diff options
author | unknown <sasha@mysql.sashanet.com> | 2002-01-19 19:16:52 -0700 |
---|---|---|
committer | unknown <sasha@mysql.sashanet.com> | 2002-01-19 19:16:52 -0700 |
commit | 5df61c3cdc4499197e420a76b25b942dce0f3ccc (patch) | |
tree | 87da2fd65f79c28f4b97c4619f95b07797107d82 | |
parent | 0831ce1c616296196eff82065da469156b4def82 (diff) | |
download | mariadb-git-5df61c3cdc4499197e420a76b25b942dce0f3ccc.tar.gz |
Here comes a nasty patch, although I am not ready to push it yet. I will
first pull, merge, test, and get it to work.
The main change is the new replication code - now we have two slave threads:
the SQL thread and the I/O thread. I have also re-written a lot of the code to
prepare for multi-master implementation.
I also documented IO_CACHE quite extensively and, to some extent, the THD class.
Makefile.am:
moved tags target script into a separate file
include/my_sys.h:
fixes in IO_CACHE for SEQ_READ_APPEND + some documentation
libmysqld/lib_sql.cc:
updated replication locks, but now I see I did it wrong and it won't compile. Will fix
before the push.
mysql-test/r/rpl000014.result:
test result update
mysql-test/r/rpl000015.result:
test result update
mysql-test/r/rpl000016.result:
test result update
mysql-test/r/rpl_log.result:
test result update
mysql-test/t/rpl000016-slave.sh:
remove relay logs
mysql-test/t/rpl000017-slave.sh:
remove relay logs
mysql-test/t/rpl_log.test:
updated test
mysys/mf_iocache.c:
IO_CACHE updates to make replication work
mysys/mf_iocache2.c:
IO_CACHE update to make replication work
mysys/thr_mutex.c:
cosmetic change
sql/item_func.cc:
new replication code
sql/lex.h:
new replication
sql/log.cc:
new replication
sql/log_event.cc:
new replication
sql/log_event.h:
new replication
sql/mini_client.cc:
new replication
sql/mini_client.h:
new replication
sql/mysql_priv.h:
new replication
sql/mysqld.cc:
new replication
sql/repl_failsafe.cc:
new replication
sql/slave.cc:
new replication
sql/slave.h:
new replication
sql/sql_class.cc:
new replication
sql/sql_class.h:
new replication
sql/sql_lex.h:
new replication
sql/sql_parse.cc:
new replication
sql/sql_repl.cc:
new replication
sql/sql_repl.h:
new replication
sql/sql_show.cc:
new replication
sql/sql_yacc.yy:
new replication
sql/stacktrace.c:
more robust stack tracing
sql/structs.h:
new replication code
BitKeeper/etc/ignore:
Added mysql-test/r/rpl000002.eval mysql-test/r/rpl000014.eval mysql-test/r/rpl000015.eval mysql-test/r/rpl000016.eval mysql-test/r/slave-running.eval mysql-test/r/slave-stopped.eval to the ignore list
39 files changed, 2467 insertions, 1035 deletions
diff --git a/.bzrignore b/.bzrignore index b741024bb95..41f886b0102 100644 --- a/.bzrignore +++ b/.bzrignore @@ -446,3 +446,9 @@ vio/test-sslclient vio/test-sslserver vio/viotest-ssl sql-bench/test-transactions +mysql-test/r/rpl000002.eval +mysql-test/r/rpl000014.eval +mysql-test/r/rpl000015.eval +mysql-test/r/rpl000016.eval +mysql-test/r/slave-running.eval +mysql-test/r/slave-stopped.eval diff --git a/Makefile.am b/Makefile.am index 7257617b8e6..9b8381e63d5 100644 --- a/Makefile.am +++ b/Makefile.am @@ -74,14 +74,11 @@ bin-dist: all $(top_builddir)/scripts/make_binary_distribution tags: - rm -f TAGS - find -not -path \*SCCS\* -and \ - \( -name \*.cc -or -name \*.h -or -name \*.yy -or -name \*.c \) \ - -print -exec etags -o TAGS --append {} \; - + support-files/build-tags .PHONY: init-db bin-dist # Test installation test: cd mysql-test ; ./mysql-test-run + diff --git a/include/my_sys.h b/include/my_sys.h index 2a5dfb4d184..3fd939154d2 100644 --- a/include/my_sys.h +++ b/include/my_sys.h @@ -296,35 +296,105 @@ typedef int (*IO_CACHE_CALLBACK)(struct st_io_cache*); typedef struct st_io_cache /* Used when cacheing files */ { + /* pos_in_file is offset in file corresponding to the first byte of + byte* buffer. end_of_file is the offset of end of file for READ_CACHE + and WRITE_CACHE. For SEQ_READ_APPEND it the maximum of the actual + end of file and the position represented by read_end. + */ my_off_t pos_in_file,end_of_file; + /* read_pos points to current read position in the buffer + read_end is the non-inclusive boundary in the buffer for the currently + valid read area + buffer is the read buffer + not sure about request_pos except that it is used in async_io + */ byte *read_pos,*read_end,*buffer,*request_pos; + /* write_buffer is used only in WRITE caches and in SEQ_READ_APPEND to + buffer writes + append_read_pos is only used in SEQ_READ_APPEND, and points to the + current read position in the write buffer. 
Note that reads in + SEQ_READ_APPEND caches can happen from both read buffer (byte* buffer), + and write buffer (byte* write_buffer). + write_pos points to current write position in the write buffer and + write_end is the non-inclusive boundary of the valid write area + */ byte *write_buffer, *append_read_pos, *write_pos, *write_end; + /* current_pos and current_end are convenience variables used by + my_b_tell() and other routines that need to know the current offset + current_pos points to &write_pos, and current_end to &write_end in a + WRITE_CACHE, and &read_pos and &read_end respectively otherwise + */ byte **current_pos, **current_end; -/* The lock is for append buffer used in READ_APPEND cache */ +/* The lock is for append buffer used in SEQ_READ_APPEND cache */ #ifdef THREAD pthread_mutex_t append_buffer_lock; /* need mutex copying from append buffer to read buffer */ -#endif +#endif + /* a caller will use my_b_read() macro to read from the cache + if the data is already in cache, it will be simply copied with + memcpy() and internal variables will be accordinging updated with + no functions invoked. However, if the data is not fully in the cache, + my_b_read() will call read_function to fetch the data. read_function + must never be invoked directly + */ int (*read_function)(struct st_io_cache *,byte *,uint); + /* same idea as in the case of read_function, except my_b_write() needs to + be replaced with my_b_append() for a SEQ_READ_APPEND cache + */ int (*write_function)(struct st_io_cache *,const byte *,uint); + /* specifies the type of the cache. Depending on the type of the cache + certain operations might not be available and yield unpredicatable + results. Details to be documented later + */ enum cache_type type; - /* callbacks when the actual read I/O happens */ + /* callbacks when the actual read I/O happens. 
These were added and + are currently used for binary logging of LOAD DATA INFILE - when a + block is read from the file, we create a block create/append event, and + when IO_CACHE is closed, we create an end event. These functions could, + of course be used for other things + */ IO_CACHE_CALLBACK pre_read; IO_CACHE_CALLBACK post_read; IO_CACHE_CALLBACK pre_close; void* arg; /* for use by pre/post_read */ char *file_name; /* if used with 'open_cached_file' */ char *dir,*prefix; - File file; + File file; /* file descriptor */ + /* seek_not_done is set by my_b_seek() to inform the upcoming read/write + operation that a seek needs to be preformed prior to the actual I/O + error is 0 if the cache operation was successful, -1 if there was a + "hard" error, and the actual number of I/O-ed bytes if the read/write was + partial + */ int seek_not_done,error; + /* buffer_length is the size of memory allocated for buffer or write_buffer + read_length is the same as buffer_length except when we use async io + not sure why we need it + */ uint buffer_length,read_length; myf myflags; /* Flags used to my_read/my_write */ /* + alloced_buffer is 1 if the buffer was allocated by init_io_cache() and + 0 if it was supplied by the user Currently READ_NET is the only one that will use a buffer allocated somewhere else */ my_bool alloced_buffer; + /* init_count is incremented every time we call init_io_cache() + It is not reset in end_io_cache(). This variable + was introduced for slave relay logs - RELAY_LOG_INFO stores a pointer + to IO_CACHE that could in some cases refer to the IO_CACHE of the + currently active relay log. The IO_CACHE then could be closed, + re-opened and start pointing to a different log file. 
In that case, + we could not know reliably if this happened without init_count + one must be careful with bzero() prior to the subsequent init_io_cache() + call + */ + int init_count; #ifdef HAVE_AIOWAIT + /* as inidicated by ifdef, this is for async I/O, we will have + Sinisa comment this some time + */ uint inited; my_off_t aio_read_pos; my_aio_result aio_result; @@ -369,6 +439,8 @@ typedef int (*qsort2_cmp)(const void *, const void *, const void *); #define my_b_tell(info) ((info)->pos_in_file + \ (uint) (*(info)->current_pos - (info)->request_pos)) +#define my_b_append_tell(info) ((info)->end_of_file + \ + (uint) ((info)->write_pos - (info)->write_buffer)) #define my_b_bytes_in_cache(info) (uint) (*(info)->current_end - \ *(info)->current_pos) diff --git a/libmysqld/lib_sql.cc b/libmysqld/lib_sql.cc index d241ceaada0..f22cc1b6101 100644 --- a/libmysqld/lib_sql.cc +++ b/libmysqld/lib_sql.cc @@ -396,8 +396,8 @@ int STDCALL mysql_server_init(int argc, char **argv, char **groups) (void) pthread_mutex_init(&LOCK_bytes_sent,MY_MUTEX_INIT_FAST); (void) pthread_mutex_init(&LOCK_bytes_received,MY_MUTEX_INIT_FAST); (void) pthread_mutex_init(&LOCK_timezone,MY_MUTEX_INIT_FAST); - (void) pthread_mutex_init(&LOCK_binlog_update, MY_MUTEX_INIT_FAST); // QQ NOT USED - (void) pthread_mutex_init(&LOCK_slave, MY_MUTEX_INIT_FAST); + (void) pthread_mutex_init(&LOCK_slave_io, MY_MUTEX_INIT_FAST); + (void) pthread_mutex_init(&LOCK_slave_sql, MY_MUTEX_INIT_FAST); (void) pthread_mutex_init(&LOCK_server_id, MY_MUTEX_INIT_FAST); (void) pthread_mutex_init(&LOCK_user_conn, MY_MUTEX_INIT_FAST); (void) pthread_cond_init(&COND_thread_count,NULL); @@ -406,8 +406,11 @@ int STDCALL mysql_server_init(int argc, char **argv, char **groups) (void) pthread_cond_init(&COND_flush_thread_cache,NULL); (void) pthread_cond_init(&COND_manager,NULL); (void) pthread_cond_init(&COND_binlog_update, NULL); - (void) pthread_cond_init(&COND_slave_stopped, NULL); - (void) pthread_cond_init(&COND_slave_start, NULL); 
+ (void) pthread_cond_init(&COND_slave_log_update, NULL); + (void) pthread_cond_init(&COND_slave_sql_stop, NULL); + (void) pthread_cond_init(&COND_slave_sql_start, NULL); + (void) pthread_cond_init(&COND_slave_sql_stop, NULL); + (void) pthread_cond_init(&COND_slave_sql_start, NULL); if (set_default_charset_by_name(default_charset, MYF(MY_WME))) { diff --git a/mysql-test/r/rpl000014.result b/mysql-test/r/rpl000014.result index e5130792d78..a0646b9c39f 100644 --- a/mysql-test/r/rpl000014.result +++ b/mysql-test/r/rpl000014.result @@ -7,22 +7,22 @@ show master status; File Position Binlog_do_db Binlog_ignore_db master-bin.001 79 show slave status; -Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq -127.0.0.1 root $MASTER_MYPORT 1 master-bin.001 79 Yes 0 0 1 +Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos +127.0.0.1 root 9306 1 master-bin.001 79 mysql-relay-bin.002 120 master-bin.001 Yes Yes 0 0 79 change master to master_log_pos=73; slave stop; change master to master_log_pos=73; show slave status; -Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq -127.0.0.1 root $MASTER_MYPORT 1 master-bin.001 73 No 0 0 1 +Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos +127.0.0.1 root 9306 1 master-bin.001 73 mysql-relay-bin.001 4 master-bin.001 No No 0 0 73 slave start; show slave status; -Master_Host Master_User Master_Port Connect_retry 
Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq -127.0.0.1 root $MASTER_MYPORT 1 master-bin.001 73 Yes 0 0 1 +Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos +127.0.0.1 root 9306 1 master-bin.001 73 mysql-relay-bin.001 4 master-bin.001 Yes Yes 0 0 73 change master to master_log_pos=173; show slave status; -Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq -127.0.0.1 root $MASTER_MYPORT 1 master-bin.001 173 Yes 0 0 1 +Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos +127.0.0.1 root 9306 1 master-bin.001 173 mysql-relay-bin.001 4 master-bin.001 Yes Yes 0 0 173 show master status; File Position Binlog_do_db Binlog_ignore_db master-bin.001 79 diff --git a/mysql-test/r/rpl000015.result b/mysql-test/r/rpl000015.result index e3bde4a5abd..571c4c0b1e3 100644 --- a/mysql-test/r/rpl000015.result +++ b/mysql-test/r/rpl000015.result @@ -4,21 +4,21 @@ File Position Binlog_do_db Binlog_ignore_db master-bin.001 79 reset slave; show slave status; -Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq - 0 0 0 No 0 0 0 +Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos + 0 0 0 0 
No No 0 0 0 change master to master_host='127.0.0.1'; show slave status; -Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq -127.0.0.1 test 3306 60 4 No 0 0 0 +Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos +127.0.0.1 test 3306 60 4 mysql-relay-bin.001 4 No No 0 0 0 change master to master_host='127.0.0.1',master_user='root', master_password='',master_port=9306; show slave status; -Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq -127.0.0.1 root 9306 60 4 No 0 0 0 +Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos +127.0.0.1 root 9306 60 4 mysql-relay-bin.001 4 No No 0 0 0 slave start; show slave status; -Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq -127.0.0.1 root 9306 60 master-bin.001 79 Yes 0 0 1 +Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos +127.0.0.1 root 9306 60 master-bin.001 79 mysql-relay-bin.001 120 master-bin.001 Yes Yes 0 0 79 drop table if exists t1; create table t1 (n int); insert into t1 values (10),(45),(90); diff --git a/mysql-test/r/rpl000016.result 
b/mysql-test/r/rpl000016.result index 405e0302161..e3510ec2d94 100644 --- a/mysql-test/r/rpl000016.result +++ b/mysql-test/r/rpl000016.result @@ -14,8 +14,8 @@ drop table if exists t1; create table t1 (s text); insert into t1 values('Could not break slave'),('Tried hard'); show slave status; -Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq -127.0.0.1 root 9306 60 master-bin.001 234 Yes 0 0 3 +Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos +127.0.0.1 root 9306 60 master-bin.001 234 mysql-relay-bin.001 275 master-bin.001 Yes Yes 0 0 234 select * from t1; s Could not break slave @@ -42,8 +42,8 @@ Log_name master-bin.003 insert into t2 values (65); show slave status; -Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq -127.0.0.1 root 9306 60 master-bin.003 155 Yes 0 0 3 +Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos +127.0.0.1 root 9306 60 master-bin.003 155 mysql-relay-bin.001 793 master-bin.003 Yes Yes 0 0 155 select * from t2; m 34 @@ -65,8 +65,8 @@ master-bin.006 445 slave stop; slave start; show slave status; -Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq -127.0.0.1 root 9306 60 master-bin.006 445 Yes 0 0 7 +Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File 
Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos +127.0.0.1 root 9306 60 master-bin.006 445 mysql-relay-bin.004 1376 master-bin.006 Yes Yes 0 0 445 lock tables t3 read; select count(*) from t3 where n >= 4; count(*) diff --git a/mysql-test/r/rpl_log.result b/mysql-test/r/rpl_log.result index 27a19f49874..4e6588d39f2 100644 --- a/mysql-test/r/rpl_log.result +++ b/mysql-test/r/rpl_log.result @@ -15,48 +15,48 @@ create table t1 (word char(20) not null); load data infile '../../std_data/words.dat' into table t1; drop table t1; show binlog events; -Log_name Pos Event_type Server_id Log_seq Info -master-bin.001 4 Start 1 1 Server ver: $VERSION, Binlog ver: 2 -master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key) -master-bin.001 172 Intvar 1 3 INSERT_ID=1 -master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL) -master-bin.001 263 Query 1 5 use test; drop table t1 -master-bin.001 311 Query 1 6 use test; create table t1 (word char(20) not null) -master-bin.001 386 Create_file 1 7 db=test;table=t1;file_id=1;block_len=81 -master-bin.001 556 Exec_load 1 8 ;file_id=1 -master-bin.001 579 Query 1 9 use test; drop table t1 +Log_name Pos Event_type Server_id Orig_log_pos Info +master-bin.001 4 Start 1 4 Server ver: 4.0.1-alpha-debug-log, Binlog ver: 3 +master-bin.001 79 Query 1 79 use test; create table t1(n int not null auto_increment primary key) +master-bin.001 172 Intvar 1 172 INSERT_ID=1 +master-bin.001 200 Query 1 200 use test; insert into t1 values (NULL) +master-bin.001 263 Query 1 263 use test; drop table t1 +master-bin.001 311 Query 1 311 use test; create table t1 (word char(20) not null) +master-bin.001 386 Create_file 1 386 db=test;table=t1;file_id=1;block_len=81 +master-bin.001 556 Exec_load 1 556 ;file_id=1 +master-bin.001 579 Query 1 579 use test; drop table t1 show binlog events from 79 limit 1; 
-Log_name Pos Event_type Server_id Log_seq Info -master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key) +Log_name Pos Event_type Server_id Orig_log_pos Info +master-bin.001 79 Query 1 79 use test; create table t1(n int not null auto_increment primary key) show binlog events from 79 limit 2; -Log_name Pos Event_type Server_id Log_seq Info -master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key) -master-bin.001 172 Intvar 1 3 INSERT_ID=1 +Log_name Pos Event_type Server_id Orig_log_pos Info +master-bin.001 79 Query 1 79 use test; create table t1(n int not null auto_increment primary key) +master-bin.001 172 Intvar 1 172 INSERT_ID=1 show binlog events from 79 limit 2,1; -Log_name Pos Event_type Server_id Log_seq Info -master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL) +Log_name Pos Event_type Server_id Orig_log_pos Info +master-bin.001 200 Query 1 200 use test; insert into t1 values (NULL) flush logs; create table t1 (n int); insert into t1 values (1); drop table t1; show binlog events; -Log_name Pos Event_type Server_id Log_seq Info -master-bin.001 4 Start 1 1 Server ver: $VERSION, Binlog ver: 2 -master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key) -master-bin.001 172 Intvar 1 3 INSERT_ID=1 -master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL) -master-bin.001 263 Query 1 5 use test; drop table t1 -master-bin.001 311 Query 1 6 use test; create table t1 (word char(20) not null) -master-bin.001 386 Create_file 1 7 db=test;table=t1;file_id=1;block_len=81 -master-bin.001 556 Exec_load 1 8 ;file_id=1 -master-bin.001 579 Query 1 9 use test; drop table t1 -master-bin.001 627 Rotate 1 10 master-bin.002;pos=4 -master-bin.001 668 Stop 1 11 +Log_name Pos Event_type Server_id Orig_log_pos Info +master-bin.001 4 Start 1 4 Server ver: 4.0.1-alpha-debug-log, Binlog ver: 3 +master-bin.001 79 Query 1 79 use test; create table 
t1(n int not null auto_increment primary key) +master-bin.001 172 Intvar 1 172 INSERT_ID=1 +master-bin.001 200 Query 1 200 use test; insert into t1 values (NULL) +master-bin.001 263 Query 1 263 use test; drop table t1 +master-bin.001 311 Query 1 311 use test; create table t1 (word char(20) not null) +master-bin.001 386 Create_file 1 386 db=test;table=t1;file_id=1;block_len=81 +master-bin.001 556 Exec_load 1 556 ;file_id=1 +master-bin.001 579 Query 1 579 use test; drop table t1 +master-bin.001 627 Rotate 1 627 master-bin.002;pos=4 +master-bin.001 668 Stop 1 668 show binlog events in 'master-bin.002'; -Log_name Pos Event_type Server_id Log_seq Info -master-bin.002 4 Query 1 1 use test; create table t1 (n int) -master-bin.002 62 Query 1 2 use test; insert into t1 values (1) -master-bin.002 122 Query 1 3 use test; drop table t1 +Log_name Pos Event_type Server_id Orig_log_pos Info +master-bin.002 4 Query 1 4 use test; create table t1 (n int) +master-bin.002 62 Query 1 62 use test; insert into t1 values (1) +master-bin.002 122 Query 1 122 use test; drop table t1 show master logs; Log_name master-bin.001 @@ -67,45 +67,45 @@ Log_name slave-bin.001 slave-bin.002 show binlog events in 'slave-bin.001' from 4; -Log_name Pos Event_type Server_id Log_seq Info -slave-bin.001 4 Start 2 1 Server ver: $VERSION, Binlog ver: 2 -slave-bin.001 79 Slave 2 3 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.001,pos=4 -slave-bin.001 132 Query 1 2 use test; create table t1(n int not null auto_increment primary key) -slave-bin.001 225 Intvar 1 3 INSERT_ID=1 -slave-bin.001 253 Query 1 4 use test; insert into t1 values (NULL) -slave-bin.001 316 Query 1 5 use test; drop table t1 -slave-bin.001 364 Query 1 6 use test; create table t1 (word char(20) not null) -slave-bin.001 439 Create_file 1 7 db=test;table=t1;file_id=1;block_len=81 -slave-bin.001 618 Exec_load 1 8 ;file_id=1 -slave-bin.001 641 Query 1 9 use test; drop table t1 -slave-bin.001 689 Rotate 1 4 slave-bin.002;pos=4; forced by master 
-slave-bin.001 729 Stop 2 5 +Log_name Pos Event_type Server_id Orig_log_pos Info +slave-bin.001 4 Start 2 4 Server ver: 4.0.1-alpha-debug-log, Binlog ver: 3 +slave-bin.001 79 Slave 2 79 host=127.0.0.1,port=9306,log=master-bin.001,pos=4 +slave-bin.001 132 Query 1 79 use test; create table t1(n int not null auto_increment primary key) +slave-bin.001 225 Intvar 1 200 INSERT_ID=1 +slave-bin.001 253 Query 1 200 use test; insert into t1 values (NULL) +slave-bin.001 316 Query 1 263 use test; drop table t1 +slave-bin.001 364 Query 1 311 use test; create table t1 (word char(20) not null) +slave-bin.001 439 Create_file 1 386 db=test;table=t1;file_id=1;block_len=81 +slave-bin.001 618 Exec_load 1 556 ;file_id=1 +slave-bin.001 641 Query 1 579 use test; drop table t1 +slave-bin.001 689 Rotate 1 627 slave-bin.002;pos=4; forced by master +slave-bin.001 729 Stop 2 729 show binlog events in 'slave-bin.002' from 4; -Log_name Pos Event_type Server_id Log_seq Info -slave-bin.002 4 Slave 2 10 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.002,pos=4 -slave-bin.002 57 Query 1 1 use test; create table t1 (n int) -slave-bin.002 115 Query 1 2 use test; insert into t1 values (1) -slave-bin.002 175 Query 1 3 use test; drop table t1 +Log_name Pos Event_type Server_id Orig_log_pos Info +slave-bin.002 4 Slave 2 627 host=127.0.0.1,port=9306,log=master-bin.002,pos=4 +slave-bin.002 57 Query 1 4 use test; create table t1 (n int) +slave-bin.002 115 Query 1 62 use test; insert into t1 values (1) +slave-bin.002 175 Query 1 122 use test; drop table t1 show slave status; -Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq -127.0.0.1 root $MASTER_MYPORT 1 master-bin.002 170 Yes 0 0 3 +Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db 
Last_errno Last_error Skip_counter Exec_master_log_pos +127.0.0.1 root 9306 1 master-bin.002 170 mysql-relay-bin.002 935 master-bin.002 Yes Yes 0 0 170 show new master for slave with master_log_file='master-bin.001' and -master_log_pos=4 and master_log_seq=1 and master_server_id=1; +master_log_pos=4 and master_server_id=1; Log_name Log_pos slave-bin.001 132 show new master for slave with master_log_file='master-bin.001' and -master_log_pos=79 and master_log_seq=2 and master_server_id=1; +master_log_pos=79 and master_server_id=1; Log_name Log_pos slave-bin.001 225 show new master for slave with master_log_file='master-bin.001' and -master_log_pos=311 and master_log_seq=6 and master_server_id=1; +master_log_pos=311 and master_server_id=1; Log_name Log_pos slave-bin.001 439 show new master for slave with master_log_file='master-bin.002' and -master_log_pos=4 and master_log_seq=1 and master_server_id=1; +master_log_pos=4 and master_server_id=1; Log_name Log_pos slave-bin.002 57 show new master for slave with master_log_file='master-bin.002' and -master_log_pos=137 and master_log_seq=3 and master_server_id=1; +master_log_pos=122 and master_server_id=1; Log_name Log_pos slave-bin.002 223 diff --git a/mysql-test/resolve-stack b/mysql-test/resolve-stack new file mode 100755 index 00000000000..cdbe362c752 --- /dev/null +++ b/mysql-test/resolve-stack @@ -0,0 +1,8 @@ +#! 
/bin/sh +# A shortcut for resolving stacks when debugging when +# we cannot duplicate the crash in a debugger and have to +# resort to using stack traces + +nm --numeric-sort ../sql/mysqld > var/tmp/mysqld.sym +echo "Please type or paste the numeric stack trace,Ctrl-C to quit:" +../extra/resolve_stack_dump -s var/tmp/mysqld.sym diff --git a/mysql-test/t/rpl000016-slave.opt b/mysql-test/t/rpl000016-slave.opt new file mode 100644 index 00000000000..f27601e0d7d --- /dev/null +++ b/mysql-test/t/rpl000016-slave.opt @@ -0,0 +1 @@ +-O max_binlog_size=2048 diff --git a/mysql-test/t/rpl000016-slave.sh b/mysql-test/t/rpl000016-slave.sh index 62748605af1..9259f593e54 100755 --- a/mysql-test/t/rpl000016-slave.sh +++ b/mysql-test/t/rpl000016-slave.sh @@ -1 +1,2 @@ rm -f $MYSQL_TEST_DIR/var/slave-data/master.info +rm -f $MYSQL_TEST_DIR/var/slave-data/*relay* diff --git a/mysql-test/t/rpl000017-slave.sh b/mysql-test/t/rpl000017-slave.sh index c717500ae4a..555427a2376 100755 --- a/mysql-test/t/rpl000017-slave.sh +++ b/mysql-test/t/rpl000017-slave.sh @@ -1,3 +1,4 @@ +rm -f $MYSQL_TEST_DIR/var/slave-data/*relay* cat > $MYSQL_TEST_DIR/var/slave-data/master.info <<EOF master-bin.001 4 diff --git a/mysql-test/t/rpl_log.test b/mysql-test/t/rpl_log.test index 841524d57e6..0a5c62ed8d5 100644 --- a/mysql-test/t/rpl_log.test +++ b/mysql-test/t/rpl_log.test @@ -37,13 +37,13 @@ show binlog events in 'slave-bin.001' from 4; show binlog events in 'slave-bin.002' from 4; show slave status; show new master for slave with master_log_file='master-bin.001' and - master_log_pos=4 and master_log_seq=1 and master_server_id=1; + master_log_pos=4 and master_server_id=1; show new master for slave with master_log_file='master-bin.001' and - master_log_pos=79 and master_log_seq=2 and master_server_id=1; + master_log_pos=79 and master_server_id=1; show new master for slave with master_log_file='master-bin.001' and - master_log_pos=311 and master_log_seq=6 and master_server_id=1; + master_log_pos=311 and 
master_server_id=1; show new master for slave with master_log_file='master-bin.002' and - master_log_pos=4 and master_log_seq=1 and master_server_id=1; + master_log_pos=4 and master_server_id=1; show new master for slave with master_log_file='master-bin.002' and - master_log_pos=137 and master_log_seq=3 and master_server_id=1; + master_log_pos=122 and master_server_id=1; diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c index b0868851177..c492a295cc1 100644 --- a/mysys/mf_iocache.c +++ b/mysys/mf_iocache.c @@ -123,6 +123,8 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, info->pos_in_file= seek_offset; info->pre_close = info->pre_read = info->post_read = 0; info->arg = 0; + info->init_count++; /* we assume the user had set it to 0 prior to + first call */ info->alloced_buffer = 0; info->buffer=0; info->seek_not_done= test(file >= 0); @@ -447,11 +449,13 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) info->end_of_file) goto read_append_buffer; - if (info->seek_not_done) - { /* File touched, do seek */ - VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0))); - info->seek_not_done=0; - } + /* + With read-append cache we must always do a seek before we read, + because the write could have moved the file pointer astray + */ + VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0))); + info->seek_not_done=0; + diff_length=(uint) (pos_in_file & (IO_SIZE-1)); /* now the second stage begins - read from file descriptor */ @@ -507,6 +511,13 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) memcpy(Buffer,info->buffer,(size_t) length); Count -= length; Buffer += length; + + /* + added the line below to make + DBUG_ASSERT(pos_in_file==info->end_of_file) pass. 
+ otherwise this does not appear to be needed + */ + pos_in_file += length; goto read_append_buffer; } } @@ -528,10 +539,13 @@ read_append_buffer: /* First copy the data to Count */ uint len_in_buff = (uint) (info->write_pos - info->append_read_pos); uint copy_len; + uint transfer_len; DBUG_ASSERT(info->append_read_pos <= info->write_pos); - DBUG_ASSERT(pos_in_file == info->end_of_file); - + /* + TODO: figure out if the below assert is needed or correct. + */ + DBUG_ASSERT(pos_in_file == info->end_of_file); copy_len=min(Count, len_in_buff); memcpy(Buffer, info->append_read_pos, copy_len); info->append_read_pos += copy_len; @@ -541,11 +555,12 @@ read_append_buffer: /* Fill read buffer with data from write buffer */ memcpy(info->buffer, info->append_read_pos, - (size_t) (len_in_buff - copy_len)); + (size_t) (transfer_len=len_in_buff - copy_len)); info->read_pos= info->buffer; - info->read_end= info->buffer+(len_in_buff - copy_len); + info->read_end= info->buffer+transfer_len; info->append_read_pos=info->write_pos; - info->pos_in_file+=len_in_buff; + info->pos_in_file=pos_in_file+copy_len; + info->end_of_file+=len_in_buff; } unlock_append_buffer(info); return Count ? 
1 : 0; @@ -887,12 +902,10 @@ int flush_io_cache(IO_CACHE *info) if ((length=(uint) (info->write_pos - info->write_buffer))) { pos_in_file=info->pos_in_file; - if (append_cache) - { - pos_in_file=info->end_of_file; - info->seek_not_done=1; - } - if (info->seek_not_done) + /* if we have append cache, we always open the file with + O_APPEND which moves the pos to EOF automatically on every write + */ + if (!append_cache && info->seek_not_done) { /* File touched, do seek */ if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) == MY_FILEPOS_ERROR) @@ -902,20 +915,24 @@ int flush_io_cache(IO_CACHE *info) if (!append_cache) info->seek_not_done=0; } - info->write_pos= info->write_buffer; if (!append_cache) info->pos_in_file+=length; info->write_end= (info->write_buffer+info->buffer_length- ((pos_in_file+length) & (IO_SIZE-1))); - /* Set this to be used if we are using SEQ_READ_APPEND */ - info->append_read_pos = info->write_buffer; if (my_write(info->file,info->write_buffer,length, info->myflags | MY_NABP)) info->error= -1; else info->error= 0; - set_if_bigger(info->end_of_file,(pos_in_file+length)); + if (!append_cache) + { + set_if_bigger(info->end_of_file,(pos_in_file+length)); + } + else + info->end_of_file+=(info->write_pos-info->append_read_pos); + + info->append_read_pos=info->write_pos=info->write_buffer; DBUG_RETURN(info->error); } } diff --git a/mysys/mf_iocache2.c b/mysys/mf_iocache2.c index a2f10ca3b9f..269e9892308 100644 --- a/mysys/mf_iocache2.c +++ b/mysys/mf_iocache2.c @@ -32,12 +32,26 @@ void my_b_seek(IO_CACHE *info,my_off_t pos) { - my_off_t offset = (pos - info->pos_in_file); + my_off_t offset; DBUG_ENTER("my_b_seek"); DBUG_PRINT("enter",("pos: %lu", (ulong) pos)); - if (info->type == READ_CACHE) + /* + TODO: verify that it is OK to do seek in the non-append + area in SEQ_READ_APPEND cache + */ + /* TODO: + a) see if this always works + b) see if there is a better way to make it work + */ + if (info->type == SEQ_READ_APPEND) + flush_io_cache(info); + 
+ offset=(pos - info->pos_in_file); + + if (info->type == READ_CACHE || info->type == SEQ_READ_APPEND) { + /* TODO: explain why this works if pos < info->pos_in_file */ if ((ulonglong) offset < (ulonglong) (info->read_end - info->buffer)) { /* The read is in the current buffer; Reuse it */ diff --git a/mysys/thr_mutex.c b/mysys/thr_mutex.c index 340f1461823..f3d17e6a5fd 100644 --- a/mysys/thr_mutex.c +++ b/mysys/thr_mutex.c @@ -70,7 +70,8 @@ int safe_mutex_lock(safe_mutex_t *mp,const char *file, uint line) } if (mp->count++) { - fprintf(stderr,"safe_mutex: Error in thread libray: Got mutex at %s, line %d more than 1 time\n", file,line); + fprintf(stderr,"safe_mutex: Error in thread libray: Got mutex at %s, \ +line %d more than 1 time\n", file,line); fflush(stderr); abort(); } diff --git a/sql/item_func.cc b/sql/item_func.cc index fe68d8f47c2..ff6a102fe1f 100644 --- a/sql/item_func.cc +++ b/sql/item_func.cc @@ -1453,11 +1453,13 @@ longlong Item_master_pos_wait::val_int() return 0; } ulong pos = (ulong)args[1]->val_int(); - if ((event_count = glob_mi.wait_for_pos(thd, log_name, pos)) == -1) + LOCK_ACTIVE_MI; + if ((event_count = active_mi->rli.wait_for_pos(thd, log_name, pos)) == -1) { null_value = 1; event_count=0; } + UNLOCK_ACTIVE_MI; return event_count; } diff --git a/sql/lex.h b/sql/lex.h index 37fe38b76a1..894fdfb5362 100644 --- a/sql/lex.h +++ b/sql/lex.h @@ -218,7 +218,6 @@ static SYMBOL symbols[] = { { "MASTER_HOST", SYM(MASTER_HOST_SYM),0,0}, { "MASTER_LOG_FILE", SYM(MASTER_LOG_FILE_SYM),0,0}, { "MASTER_LOG_POS", SYM(MASTER_LOG_POS_SYM),0,0}, - { "MASTER_LOG_SEQ", SYM(MASTER_LOG_SEQ_SYM),0,0}, { "MASTER_PASSWORD", SYM(MASTER_PASSWORD_SYM),0,0}, { "MASTER_PORT", SYM(MASTER_PORT_SYM),0,0}, { "MASTER_SERVER_ID", SYM(MASTER_SERVER_ID_SYM),0,0}, diff --git a/sql/log.cc b/sql/log.cc index fa45d938b24..ec7396bda3c 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -29,6 +29,7 @@ #include <my_dir.h> #include <stdarg.h> #include <m_ctype.h> // For test_if_number +#include 
<assert.h> MYSQL_LOG mysql_log,mysql_update_log,mysql_slow_log,mysql_bin_log; extern I_List<i_string> binlog_do_db, binlog_ignore_db; @@ -81,7 +82,7 @@ static int find_uniq_filename(char *name) MYSQL_LOG::MYSQL_LOG(): last_time(0), query_start(0),index_file(-1), name(0), log_type(LOG_CLOSED),write_error(0), - inited(0), log_seq(1), file_id(1),no_rotate(0), + inited(0), file_id(1),no_rotate(0), need_start_event(1) { /* @@ -138,15 +139,18 @@ bool MYSQL_LOG::open_index( int options) } void MYSQL_LOG::init(enum_log_type log_type_arg, - enum cache_type io_cache_type_arg) + enum cache_type io_cache_type_arg, + bool no_auto_events_arg) { log_type = log_type_arg; io_cache_type = io_cache_type_arg; + no_auto_events = no_auto_events_arg; if (!inited) { inited=1; (void) pthread_mutex_init(&LOCK_log,MY_MUTEX_INIT_SLOW); (void) pthread_mutex_init(&LOCK_index, MY_MUTEX_INIT_SLOW); + (void) pthread_cond_init(&update_cond, 0); } } @@ -160,16 +164,17 @@ void MYSQL_LOG::close_index() } void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, - const char *new_name) + const char *new_name, enum cache_type io_cache_type_arg, + bool no_auto_events_arg) { MY_STAT tmp_stat; char buff[512]; File file= -1; bool do_magic; - + int open_flags = O_CREAT | O_APPEND | O_BINARY; if (!inited && log_type_arg == LOG_BIN && *fn_ext(log_name)) no_rotate = 1; - init(log_type_arg); + init(log_type_arg,io_cache_type_arg,no_auto_events_arg); if (!(name=my_strdup(log_name,MYF(MY_WME)))) goto err; @@ -177,7 +182,12 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, strmov(log_file_name,new_name); else if (generate_new_name(log_file_name, name)) goto err; - + + if (io_cache_type == SEQ_READ_APPEND) + open_flags |= O_RDWR; + else + open_flags |= O_WRONLY; + if (log_type == LOG_BIN && !index_file_name[0]) fn_format(index_file_name, name, mysql_data_home, ".index", 6); @@ -185,7 +195,7 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, do_magic = 
((log_type == LOG_BIN) && !my_stat(log_file_name, &tmp_stat, MYF(0))); - if ((file=my_open(log_file_name,O_CREAT | O_APPEND | O_WRONLY | O_BINARY, + if ((file=my_open(log_file_name,open_flags, MYF(MY_WME | ME_WAITTANG))) < 0 || init_io_cache(&log_file, file, IO_SIZE, io_cache_type, my_tell(file,MYF(MY_WME)), 0, MYF(MY_WME | MY_NABP))) @@ -235,11 +245,10 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, open_index(O_APPEND | O_RDWR | O_CREAT)) goto err; - log_seq = 1; - if (need_start_event) + if (need_start_event && !no_auto_events) { Start_log_event s; - s.set_log_seq(0, this); + s.set_log_pos(this); s.write(&log_file); need_start_event=0; } @@ -264,9 +273,7 @@ err: end_io_cache(&log_file); x_free(name); name=0; log_type=LOG_CLOSED; - return; - } int MYSQL_LOG::get_current_log(LOG_INFO* linfo) @@ -279,7 +286,8 @@ int MYSQL_LOG::get_current_log(LOG_INFO* linfo) } // if log_name is "" we stop at the first entry -int MYSQL_LOG::find_first_log(LOG_INFO* linfo, const char* log_name) +int MYSQL_LOG::find_first_log(LOG_INFO* linfo, const char* log_name, + bool need_mutex) { if (index_file < 0) return LOG_INFO_INVALID; @@ -290,7 +298,8 @@ int MYSQL_LOG::find_first_log(LOG_INFO* linfo, const char* log_name) // mutex needed because we need to make sure the file pointer does not move // from under our feet - pthread_mutex_lock(&LOCK_index); + if (need_mutex) + pthread_mutex_lock(&LOCK_index); if (init_io_cache(&io_cache, index_file, IO_SIZE, READ_CACHE, (my_off_t) 0, 0, MYF(MY_WME))) { @@ -319,14 +328,15 @@ int MYSQL_LOG::find_first_log(LOG_INFO* linfo, const char* log_name) error = 0; err: - pthread_mutex_unlock(&LOCK_index); + if (need_mutex) + pthread_mutex_unlock(&LOCK_index); end_io_cache(&io_cache); return error; } -int MYSQL_LOG::find_next_log(LOG_INFO* linfo) +int MYSQL_LOG::find_next_log(LOG_INFO* linfo, bool need_lock) { // mutex needed because we need to make sure the file pointer does not move // from under our feet @@ -335,8 +345,8 @@ int 
MYSQL_LOG::find_next_log(LOG_INFO* linfo) char* fname = linfo->log_file_name; IO_CACHE io_cache; uint length; - - pthread_mutex_lock(&LOCK_index); + if (need_lock) + pthread_mutex_lock(&LOCK_index); if (init_io_cache(&io_cache, index_file, IO_SIZE, READ_CACHE, (my_off_t) linfo->index_file_offset, 0, MYF(MY_WME))) @@ -354,12 +364,126 @@ int MYSQL_LOG::find_next_log(LOG_INFO* linfo) error = 0; err: - pthread_mutex_unlock(&LOCK_index); + if (need_lock) + pthread_mutex_unlock(&LOCK_index); end_io_cache(&io_cache); return error; } - +int MYSQL_LOG::reset_logs(THD* thd) +{ + LOG_INFO linfo; + int error=0; + const char* save_name; + enum_log_type save_log_type; + pthread_mutex_lock(&LOCK_log); + if (find_first_log(&linfo,"")) + { + error=1; + goto err; + } + + for(;;) + { + my_delete(linfo.log_file_name, MYF(MY_WME)); + if (find_next_log(&linfo)) + break; + } + save_name=name; + name=0; + save_log_type=log_type; + close(1); + my_delete(index_file_name, MYF(MY_WME)); + if (thd && !thd->slave_thread) + need_start_event=1; + open(save_name,save_log_type,0,io_cache_type,no_auto_events); + my_free((gptr)save_name,MYF(0)); +err: + pthread_mutex_unlock(&LOCK_log); + return error; +} + +int MYSQL_LOG::purge_first_log(struct st_relay_log_info* rli) +{ + // pre-conditions + DBUG_ASSERT(is_open()); + DBUG_ASSERT(index_file >= 0); + DBUG_ASSERT(rli->slave_running == 1); + DBUG_ASSERT(!strcmp(rli->linfo.log_file_name,rli->relay_log_name)); + // assume that we have previously read the first log and + // stored it in rli->relay_log_name + DBUG_ASSERT(rli->linfo.index_file_offset == + strlen(rli->relay_log_name) + 1); + + int tmp_fd; + + + char* fname, *io_buf; + int error = 0; + if (!(fname = (char*)my_malloc(IO_SIZE+FN_REFLEN, MYF(MY_WME)))) + return 1; + pthread_mutex_lock(&LOCK_index); + my_seek(index_file,rli->linfo.index_file_offset, + MY_SEEK_SET, MYF(MY_WME)); + io_buf = fname + FN_REFLEN; + strxmov(fname,rli->relay_log_name,".tmp",NullS); + + if ((tmp_fd = 
my_open(fname,O_CREAT|O_BINARY|O_RDWR, MYF(MY_WME))) < 0) + { + error = 1; + goto err; + } + for (;;) + { + int bytes_read; + bytes_read = my_read(index_file, io_buf, IO_SIZE, MYF(0)); + if (bytes_read < 0) // error + { + error=1; + goto err; + } + if (!bytes_read) + break; // end of file + // otherwise, we've read something and need to write it out + if (my_write(tmp_fd, io_buf, bytes_read, MYF(MY_WME|MY_NABP))) + { + error=1; + goto err; + } + } +err: + if (tmp_fd) + my_close(tmp_fd, MYF(MY_WME)); + if (error) + my_delete(fname, MYF(0)); // do not report error if the file is not there + else + { + my_close(index_file, MYF(MY_WME)); + if (my_rename(fname,index_file_name,MYF(MY_WME)) || + (index_file=my_open(index_file_name,O_BINARY|O_RDWR|O_APPEND, + MYF(MY_WME)))<0 || + my_delete(rli->relay_log_name, MYF(MY_WME))) + error=1; + if ((error=find_first_log(&rli->linfo,"",0/*no mutex*/))) + { + char buff[22]; + sql_print_error("next log error=%d,offset=%s,log=%s",error, + llstr(rli->linfo.index_file_offset,buff), + rli->linfo.log_file_name); + goto err2; + } + rli->relay_log_pos = 4; + strnmov(rli->relay_log_name,rli->linfo.log_file_name, + sizeof(rli->relay_log_name)); + } + // no need to free io_buf because we allocated both fname and io_buf in + // one malloc() +err2: + pthread_mutex_unlock(&LOCK_index); + my_free(fname, MYF(MY_WME)); + return error; +} + int MYSQL_LOG::purge_logs(THD* thd, const char* to_log) { if (index_file < 0) return LOG_INFO_INVALID; @@ -395,9 +519,8 @@ int MYSQL_LOG::purge_logs(THD* thd, const char* to_log) goto err; } logs_to_keep_inited = 1; - - for(;;) + for (;;) { my_off_t init_purge_offset= my_b_tell(&io_cache); if (!(fname_len=my_b_gets(&io_cache, fname, FN_REFLEN))) @@ -409,14 +532,14 @@ int MYSQL_LOG::purge_logs(THD* thd, const char* to_log) } fname[--fname_len]=0; // kill \n - if(!memcmp(fname, to_log, fname_len + 1 )) + if (!memcmp(fname, to_log, fname_len + 1 )) { found_log = 1; purge_offset = init_purge_offset; } // if one of the 
logs before the target is in use - if(!found_log && log_in_use(fname)) + if (!found_log && log_in_use(fname)) { error = LOG_INFO_IN_USE; goto err; @@ -432,13 +555,13 @@ int MYSQL_LOG::purge_logs(THD* thd, const char* to_log) } end_io_cache(&io_cache); - if(!found_log) + if (!found_log) { error = LOG_INFO_EOF; goto err; } - for(i = 0; i < logs_to_purge.elements; i++) + for (i = 0; i < logs_to_purge.elements; i++) { char* l; get_dynamic(&logs_to_purge, (gptr)&l, i); @@ -461,9 +584,9 @@ during log purge for write"); #else my_close(index_file, MYF(MY_WME)); my_delete(index_file_name, MYF(MY_WME)); - if(!(index_file = my_open(index_file_name, + if ((index_file = my_open(index_file_name, O_CREAT | O_BINARY | O_RDWR | O_APPEND, - MYF(MY_WME)))) + MYF(MY_WME)))<0) { sql_print_error("Could not re-open the binlog index file \ during log purge for write"); @@ -472,7 +595,7 @@ during log purge for write"); } #endif - for(i = 0; i < logs_to_keep.elements; i++) + for (i = 0; i < logs_to_keep.elements; i++) { char* l; get_dynamic(&logs_to_keep, (gptr)&l, i); @@ -490,15 +613,14 @@ during log purge for write"); err: pthread_mutex_unlock(&LOCK_index); - if(logs_to_purge_inited) + if (logs_to_purge_inited) delete_dynamic(&logs_to_purge); - if(logs_to_keep_inited) + if (logs_to_keep_inited) delete_dynamic(&logs_to_keep); end_io_cache(&io_cache); return error; } - // we assume that buf has at least FN_REFLEN bytes alloced void MYSQL_LOG::make_log_name(char* buf, const char* log_ident) { @@ -543,30 +665,36 @@ void MYSQL_LOG::new_file(bool inside_mutex) } if (log_type == LOG_BIN) { - /* - We log the whole file name for log file as the user may decide - to change base names at some point. 
- */ - THD* thd = current_thd; - Rotate_log_event r(thd,new_name+dirname_length(new_name)); - r.set_log_seq(0, this); - - /* - This log rotation could have been initiated by a master of - the slave running with log-bin we set the flag on rotate - event to prevent inifinite log rotation loop - */ - if (thd && slave_thd && thd == slave_thd) - r.flags |= LOG_EVENT_FORCED_ROTATE_F; - r.write(&log_file); - VOID(pthread_cond_broadcast(&COND_binlog_update)); + if (!no_auto_events) + { + /* + We log the whole file name for log file as the user may decide + to change base names at some point. + */ + THD* thd = current_thd; + Rotate_log_event r(thd,new_name+dirname_length(new_name)); + r.set_log_pos(this); + + /* + This log rotation could have been initiated by a master of + the slave running with log-bin we set the flag on rotate + event to prevent inifinite log rotation loop + */ + if (thd && thd->slave_thread) + r.flags |= LOG_EVENT_FORCED_ROTATE_F; + r.write(&log_file); + } + // update needs to be signaled even if there is no rotate event + // log rotation should give the waiting thread a signal to + // discover EOF and move on to the next log + signal_update(); } else strmov(new_name, old_name); // Reopen old file name } name=0; close(); - open(old_name, log_type, new_name); + open(old_name, log_type, new_name, io_cache_type, no_auto_events); my_free(old_name,MYF(0)); last_time=query_start=0; write_error=0; @@ -575,6 +703,31 @@ void MYSQL_LOG::new_file(bool inside_mutex) } } +bool MYSQL_LOG::appendv(const char* buf, uint len,...) 
+{ + bool error = 0; + va_list(args); + va_start(args,len); + + pthread_mutex_lock(&LOCK_log); + do + { + if (my_b_append(&log_file,buf,len)) + { + error = 1; + break; + } + if ((uint)my_b_append_tell(&log_file) > max_binlog_size) + { + new_file(1); + } + } while ((buf=va_arg(args,const char*)) && (len=va_arg(args,uint))); + + if (!error) + signal_update(); + pthread_mutex_unlock(&LOCK_log); + return error; +} bool MYSQL_LOG::write(THD *thd,enum enum_server_command command, const char *format,...) @@ -684,11 +837,12 @@ bool MYSQL_LOG::write(Log_event* event_info) return 0; } error=1; - + // no check for auto events flag here - this write method should + // never be called if auto-events are enabled if (thd && thd->last_insert_id_used) { Intvar_log_event e(thd,(uchar)LAST_INSERT_ID_EVENT,thd->last_insert_id); - e.set_log_seq(thd, this); + e.set_log_pos(this); if (thd->server_id) e.server_id = thd->server_id; if (e.write(file)) @@ -697,7 +851,7 @@ bool MYSQL_LOG::write(Log_event* event_info) if (thd && thd->insert_id_used) { Intvar_log_event e(thd,(uchar)INSERT_ID_EVENT,thd->last_insert_id); - e.set_log_seq(thd, this); + e.set_log_pos(this); if (thd->server_id) e.server_id = thd->server_id; if (e.write(file)) @@ -712,12 +866,12 @@ bool MYSQL_LOG::write(Log_event* event_info) // just in case somebody wants it later thd->query_length = (uint)(p - buf); Query_log_event e(thd, buf); - e.set_log_seq(thd, this); + e.set_log_pos(this); if (e.write(file)) goto err; thd->query_length = save_query_length; // clean up } - event_info->set_log_seq(thd, this); + event_info->set_log_pos(this); if (event_info->write(file) || file == &log_file && flush_io_cache(file)) goto err; @@ -734,7 +888,7 @@ err: write_error=1; } if (file == &log_file) - VOID(pthread_cond_broadcast(&COND_binlog_update)); + signal_update(); } if (should_rotate) new_file(1); // inside mutex @@ -765,7 +919,7 @@ bool MYSQL_LOG::write(IO_CACHE *cache) if (is_open()) { uint length; - + //QQ: this looks like a bug - 
why READ_CACHE? if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) { sql_print_error(ER(ER_ERROR_ON_WRITE), cache->file_name, errno); @@ -800,7 +954,7 @@ err: if (error) write_error=1; else - VOID(pthread_cond_broadcast(&COND_binlog_update)); + signal_update(); VOID(pthread_mutex_unlock(&LOCK_log)); @@ -930,21 +1084,37 @@ bool MYSQL_LOG::write(THD *thd,const char *query, uint query_length, return error; } +void MYSQL_LOG:: wait_for_update(THD* thd) +{ + const char* old_msg = thd->enter_cond(&update_cond, &LOCK_log, + "Slave: waiting for binlog update"); + pthread_cond_wait(&update_cond, &LOCK_log); + // this is not a bug - we unlock the mutex for the caller, and expect him + // to lock it and then not unlock it upon return. This is a rather odd + // way of doing things, but this is the cleanest way I could think of to + // solve the race deadlock caused by THD::awake() first acquiring mysys_var + // mutex and then the current mutex, while wait_for_update being called with + // the current mutex already aquired and THD::exit_cond() trying to acquire + // mysys_var mutex. We do need the mutex to be acquired prior to the + // invocation of wait_for_update in all cases, so mutex acquisition inside + // wait_for_update() is not an option + pthread_mutex_unlock(&LOCK_log); + thd->exit_cond(old_msg); +} void MYSQL_LOG::close(bool exiting) { // One can't set log_type here! if (is_open()) { - File file=log_file.file; - if (log_type == LOG_BIN) + if (log_type == LOG_BIN && !no_auto_events) { Stop_log_event s; - s.set_log_seq(0, this); + s.set_log_pos(this); s.write(&log_file); - VOID(pthread_cond_broadcast(&COND_binlog_update)); + signal_update(); } end_io_cache(&log_file); - if (my_close(file,MYF(0)) < 0 && ! write_error) + if (my_close(log_file.file,MYF(0)) < 0 && ! 
write_error) { write_error=1; sql_print_error(ER(ER_ERROR_ON_WRITE),name,errno); diff --git a/sql/log_event.cc b/sql/log_event.cc index 72198038a07..7eb7c57ae40 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -24,6 +24,8 @@ #include <my_dir.h> #endif /* MYSQL_CLIENT */ +#include <assert.h> + #ifdef MYSQL_CLIENT static void pretty_print_str(FILE* file, char* str, int len) { @@ -118,14 +120,14 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg): if (thd) { server_id = thd->server_id; - log_seq = thd->log_seq; when = thd->start_time; + log_pos = thd->log_pos; } else { server_id = ::server_id; - log_seq = 0; when = time(NULL); + log_pos=0; } } @@ -156,12 +158,12 @@ Log_event::Log_event(const char* buf, bool old_format): server_id = uint4korr(buf + SERVER_ID_OFFSET); if (old_format) { - log_seq=0; + log_pos=0; flags=0; } else { - log_seq = uint4korr(buf + LOG_SEQ_OFFSET); + log_pos = uint4korr(buf + LOG_POS_OFFSET); flags = uint2korr(buf + FLAGS_OFFSET); } #ifndef MYSQL_CLIENT @@ -172,13 +174,13 @@ Log_event::Log_event(const char* buf, bool old_format): #ifndef MYSQL_CLIENT -int Log_event::exec_event(struct st_master_info* mi) +int Log_event::exec_event(struct st_relay_log_info* rli) { - if (mi) + if (rli) { - thd->log_seq = 0; - mi->inc_pos(get_event_len(), log_seq); - flush_master_info(mi); + rli->inc_pos(get_event_len(),log_pos); + DBUG_ASSERT(rli->sql_thd != 0); + flush_relay_log_info(rli); } return 0; } @@ -193,14 +195,14 @@ void Query_log_event::pack_info(String* packet) char buf[256]; String tmp(buf, sizeof(buf)); tmp.length(0); - if(db && db_len) + if (db && db_len) { tmp.append("use "); tmp.append(db, db_len); tmp.append("; ", 2); } - if(query && q_len) + if (query && q_len) tmp.append(query, q_len); net_store_data(packet, (char*)tmp.ptr(), tmp.length()); } @@ -345,7 +347,7 @@ void Log_event::init_show_field_list(List<Item>* field_list) field_list->push_back(new Item_empty_string("Pos", 20)); field_list->push_back(new Item_empty_string("Event_type", 
20)); field_list->push_back(new Item_empty_string("Server_id", 20)); - field_list->push_back(new Item_empty_string("Log_seq", 20)); + field_list->push_back(new Item_empty_string("Orig_log_pos", 20)); field_list->push_back(new Item_empty_string("Info", 20)); } @@ -363,7 +365,7 @@ int Log_event::net_send(THD* thd, const char* log_name, my_off_t pos) event_type = get_type_str(); net_store_data(packet, event_type, strlen(event_type)); net_store_data(packet, server_id); - net_store_data(packet, log_seq); + net_store_data(packet, log_pos); pack_info(packet); return my_net_write(&thd->net, (char*)packet->ptr(), packet->length()); } @@ -392,7 +394,7 @@ int Log_event::write_header(IO_CACHE* file) long tmp=get_data_size() + LOG_EVENT_HEADER_LEN; int4store(pos, tmp); pos += 4; - int4store(pos, log_seq); + int4store(pos, log_pos); pos += 4; int2store(pos, flags); pos += 2; @@ -456,7 +458,6 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet, #define LOCK_MUTEX #endif - // allocates memory - the caller is responsible for clean-up #ifndef MYSQL_CLIENT Log_event* Log_event::read_log_event(IO_CACHE* file, @@ -501,7 +502,7 @@ Log_event* Log_event::read_log_event(IO_CACHE* file, bool old_format) } buf[data_len] = 0; memcpy(buf, head, header_size); - if(my_b_read(file, (byte*) buf + header_size, + if (my_b_read(file, (byte*) buf + header_size, data_len - header_size)) { error = "read error"; @@ -511,9 +512,10 @@ Log_event* Log_event::read_log_event(IO_CACHE* file, bool old_format) res->register_temp_buf(buf); err: UNLOCK_MUTEX; - if(error) + if (error) { - sql_print_error(error); + sql_print_error("Error in Log_event::read_log_event(): '%s', \ +data_len=%d,event_type=%d",error,data_len,head[EVENT_TYPE_OFFSET]); my_free(buf, MYF(MY_ALLOW_ZERO_PTR)); } return res; @@ -581,9 +583,11 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len, #ifdef MYSQL_CLIENT void Log_event::print_header(FILE* file) { + char llbuff[22]; fputc('#', file); print_timestamp(file); 
- fprintf(file, " server id %d ", server_id); + fprintf(file, " server id %d log_pos %s ", server_id, + llstr(log_pos,llbuff)); } void Log_event::print_timestamp(FILE* file, time_t* ts) @@ -1187,12 +1191,12 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db) #ifndef MYSQL_CLIENT -void Log_event::set_log_seq(THD* thd, MYSQL_LOG* log) +void Log_event::set_log_pos(MYSQL_LOG* log) { - log_seq = (thd && thd->log_seq) ? thd->log_seq++ : log->log_seq++; + if (!log_pos) + log_pos = my_b_tell(&log->log_file); } - void Load_log_event::set_fields(List<Item> &fields) { uint i; @@ -1205,14 +1209,20 @@ void Load_log_event::set_fields(List<Item> &fields) } -Slave_log_event::Slave_log_event(THD* thd_arg,struct st_master_info* mi): +Slave_log_event::Slave_log_event(THD* thd_arg, + struct st_relay_log_info* rli): Log_event(thd_arg),mem_pool(0),master_host(0) { - if(!mi->inited) + if(!rli->inited) return; - pthread_mutex_lock(&mi->lock); + + MASTER_INFO* mi = rli->mi; + // TODO: re-write this better without holding both + // locks at the same time + pthread_mutex_lock(&mi->data_lock); + pthread_mutex_lock(&rli->data_lock); master_host_len = strlen(mi->host); - master_log_len = strlen(mi->log_file_name); + master_log_len = strlen(rli->master_log_name); // on OOM, just do not initialize the structure and print the error if((mem_pool = (char*)my_malloc(get_data_size() + 1, MYF(MY_WME)))) @@ -1220,13 +1230,14 @@ Slave_log_event::Slave_log_event(THD* thd_arg,struct st_master_info* mi): master_host = mem_pool + SL_MASTER_HOST_OFFSET ; memcpy(master_host, mi->host, master_host_len + 1); master_log = master_host + master_host_len + 1; - memcpy(master_log, mi->log_file_name, master_log_len + 1); + memcpy(master_log, rli->master_log_name, master_log_len + 1); master_port = mi->port; - master_pos = mi->pos; + master_pos = rli->master_log_pos; } else sql_print_error("Out of memory while recording slave event"); - pthread_mutex_unlock(&mi->lock); + 
pthread_mutex_unlock(&rli->data_lock); + pthread_mutex_unlock(&mi->data_lock); } @@ -1533,7 +1544,7 @@ void Execute_load_log_event::pack_info(String* packet) #endif #ifndef MYSQL_CLIENT -int Query_log_event::exec_event(struct st_master_info* mi) +int Query_log_event::exec_event(struct st_relay_log_info* rli) { int expected_error,actual_error = 0; init_sql_alloc(&thd->mem_root, 8192,0); @@ -1553,7 +1564,7 @@ int Query_log_event::exec_event(struct st_master_info* mi) // sanity check to make sure the master did not get a really bad // error on the query - if (!check_expected_error(thd, (expected_error = error_code))) + if (!check_expected_error(thd,rli,(expected_error = error_code))) { mysql_parse(thd, thd->query, q_len); if (expected_error != @@ -1570,8 +1581,8 @@ int Query_log_event::exec_event(struct st_master_info* mi) else if (expected_error == actual_error) { thd->query_error = 0; - *last_slave_error = 0; - last_slave_errno = 0; + *rli->last_slave_error = 0; + rli->last_slave_errno = 0; } } else @@ -1592,17 +1603,17 @@ int Query_log_event::exec_event(struct st_master_info* mi) if (thd->query_error || thd->fatal_error) { - slave_print_error(actual_error, "error '%s' on query '%s'", + slave_print_error(rli,actual_error, "error '%s' on query '%s'", actual_error ? 
thd->net.last_error : "unexpected success or fatal error", query); free_root(&thd->mem_root,0); return 1; } free_root(&thd->mem_root,0); - return Log_event::exec_event(mi); + return Log_event::exec_event(rli); } -int Load_log_event::exec_event(NET* net, struct st_master_info* mi) +int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli) { init_sql_alloc(&thd->mem_root, 8192,0); thd->db = rewrite_db((char*)db); @@ -1625,6 +1636,7 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi) // the table will be opened in mysql_load if(table_rules_on && !tables_ok(thd, &tables)) { + // TODO: this is a bug - this needs to be moved to the I/O thread if (net) skip_load_data_infile(net); } @@ -1632,7 +1644,7 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi) { char llbuff[22]; enum enum_duplicates handle_dup = DUP_IGNORE; - if(sql_ex.opt_flags && REPLACE_FLAG) + if (sql_ex.opt_flags && REPLACE_FLAG) handle_dup = DUP_REPLACE; sql_exchange ex((char*)fname, sql_ex.opt_flags && DUMPFILE_FLAG ); @@ -1663,7 +1675,7 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi) thd->query_error = 1; if(thd->cuted_fields) sql_print_error("Slave: load data infile at position %s in log \ -'%s' produced %d warning(s)", llstr(mi->pos,llbuff), RPL_LOG_NAME, +'%s' produced %d warning(s)", llstr(rli->master_log_pos,llbuff), RPL_LOG_NAME, thd->cuted_fields ); if(net) net->pkt_nr = thd->net.pkt_nr; @@ -1673,6 +1685,7 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi) { // we will just ask the master to send us /dev/null if we do not // want to load the data + // TODO: this a bug - needs to be done in I/O thread if (net) skip_load_data_infile(net); } @@ -1683,10 +1696,11 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi) if(thd->query_error) { int sql_error = thd->net.last_errno; - if(!sql_error) + if (!sql_error) sql_error = ER_UNKNOWN_ERROR; - slave_print_error(sql_error, "Slave: Error '%s' 
running load data infile ", + slave_print_error(rli,sql_error, + "Slave: Error '%s' running load data infile ", ER_SAFE(sql_error)); free_root(&thd->mem_root,0); return 1; @@ -1699,38 +1713,43 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi) return 1; } - return Log_event::exec_event(mi); + return Log_event::exec_event(rli); } -int Start_log_event::exec_event(struct st_master_info* mi) +int Start_log_event::exec_event(struct st_relay_log_info* rli) { - if (!mi->old_format) + if (!rli->mi->old_format) { close_temporary_tables(thd); cleanup_load_tmpdir(); } - return Log_event::exec_event(mi); + return Log_event::exec_event(rli); } -int Stop_log_event::exec_event(struct st_master_info* mi) +int Stop_log_event::exec_event(struct st_relay_log_info* rli) { - if (mi->pos > 4) // stop event should be ignored after rotate event + // do not clean up immediately after rotate event + if (rli->master_log_pos > 4) { close_temporary_tables(thd); cleanup_load_tmpdir(); - mi->inc_pos(get_event_len(), log_seq); - flush_master_info(mi); } - thd->log_seq = 0; + // we do not want to update master_log pos because we get a rotate event + // before stop, so by now master_log_name is set to the next log + // if we updated it, we will have incorrect master coordinates and this + // could give false triggers in MASTER_POS_WAIT() that we have reached + // the targed position when in fact we have not + rli->inc_pos(get_event_len(), 0); + flush_relay_log_info(rli); return 0; } -int Rotate_log_event::exec_event(struct st_master_info* mi) +int Rotate_log_event::exec_event(struct st_relay_log_info* rli) { bool rotate_binlog = 0, write_slave_event = 0; - char* log_name = mi->log_file_name; - pthread_mutex_lock(&mi->lock); - + char* log_name = rli->master_log_name; + pthread_mutex_lock(&rli->data_lock); + // TODO: probably needs re-write // rotate local binlog only if the name of remote has changed if (!*log_name || !(log_name[ident_len] == 0 && !memcmp(log_name, new_log_ident, 
ident_len))) @@ -1738,41 +1757,38 @@ int Rotate_log_event::exec_event(struct st_master_info* mi) write_slave_event = (!(flags & LOG_EVENT_FORCED_ROTATE_F) && mysql_bin_log.is_open()); rotate_binlog = (*log_name && write_slave_event); - memcpy(log_name, new_log_ident,ident_len ); + if (ident_len >= sizeof(rli->master_log_name)) + return 1; + memcpy(log_name, new_log_ident,ident_len); log_name[ident_len] = 0; } - mi->pos = pos; - mi->last_log_seq = log_seq; -#ifndef DBUG_OFF - if (abort_slave_event_count) - ++events_till_abort; -#endif + rli->master_log_pos = pos; + rli->relay_log_pos += get_event_len(); if (rotate_binlog) { mysql_bin_log.new_file(); - mi->last_log_seq = 0; + rli->master_log_pos = 4; } - pthread_cond_broadcast(&mi->cond); - pthread_mutex_unlock(&mi->lock); - flush_master_info(mi); + pthread_cond_broadcast(&rli->data_cond); + pthread_mutex_unlock(&rli->data_lock); + flush_relay_log_info(rli); if (write_slave_event) { - Slave_log_event s(thd, mi); + Slave_log_event s(thd, rli); if (s.master_host) { - s.set_log_seq(0, &mysql_bin_log); + s.set_log_pos(&mysql_bin_log); s.server_id = ::server_id; mysql_bin_log.write(&s); } } - thd->log_seq = 0; return 0; } -int Intvar_log_event::exec_event(struct st_master_info* mi) +int Intvar_log_event::exec_event(struct st_relay_log_info* rli) { - switch(type) + switch (type) { case LAST_INSERT_ID_EVENT: thd->last_insert_id_used = 1; @@ -1782,18 +1798,18 @@ int Intvar_log_event::exec_event(struct st_master_info* mi) thd->next_insert_id = val; break; } - mi->inc_pending(get_event_len()); + rli->inc_pending(get_event_len()); return 0; } -int Slave_log_event::exec_event(struct st_master_info* mi) +int Slave_log_event::exec_event(struct st_relay_log_info* rli) { if(mysql_bin_log.is_open()) mysql_bin_log.write(this); - return Log_event::exec_event(mi); + return Log_event::exec_event(rli); } -int Create_file_log_event::exec_event(struct st_master_info* mi) +int Create_file_log_event::exec_event(struct st_relay_log_info* rli) 
{ char fname_buf[FN_REFLEN+10]; char *p; @@ -1809,7 +1825,7 @@ int Create_file_log_event::exec_event(struct st_master_info* mi) init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0, MYF(MY_WME|MY_NABP))) { - slave_print_error(my_errno, "Could not open file '%s'", fname_buf); + slave_print_error(rli,my_errno, "Could not open file '%s'", fname_buf); goto err; } @@ -1820,7 +1836,7 @@ int Create_file_log_event::exec_event(struct st_master_info* mi) if (write_base(&file)) { strmov(p, ".info"); // to have it right in the error message - slave_print_error(my_errno, "Could not write to file '%s'", fname_buf); + slave_print_error(rli,my_errno, "Could not write to file '%s'", fname_buf); goto err; } end_io_cache(&file); @@ -1830,12 +1846,12 @@ int Create_file_log_event::exec_event(struct st_master_info* mi) if ((fd = my_open(fname_buf, O_WRONLY|O_CREAT|O_BINARY|O_TRUNC, MYF(MY_WME))) < 0) { - slave_print_error(my_errno, "Could not open file '%s'", fname_buf); + slave_print_error(rli,my_errno, "Could not open file '%s'", fname_buf); goto err; } if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP))) { - slave_print_error(my_errno, "Write to '%s' failed", fname_buf); + slave_print_error(rli,my_errno, "Write to '%s' failed", fname_buf); goto err; } if (mysql_bin_log.is_open()) @@ -1846,10 +1862,10 @@ err: end_io_cache(&file); if (fd >= 0) my_close(fd, MYF(0)); - return error ? 1 : Log_event::exec_event(mi); + return error ? 
1 : Log_event::exec_event(rli); } -int Delete_file_log_event::exec_event(struct st_master_info* mi) +int Delete_file_log_event::exec_event(struct st_relay_log_info* rli) { char fname[FN_REFLEN+10]; char* p; @@ -1860,10 +1876,10 @@ int Delete_file_log_event::exec_event(struct st_master_info* mi) (void)my_delete(fname, MYF(MY_WME)); if (mysql_bin_log.is_open()) mysql_bin_log.write(this); - return Log_event::exec_event(mi); + return Log_event::exec_event(rli); } -int Append_block_log_event::exec_event(struct st_master_info* mi) +int Append_block_log_event::exec_event(struct st_relay_log_info* rli) { char fname[FN_REFLEN+10]; char* p; @@ -1873,12 +1889,12 @@ int Append_block_log_event::exec_event(struct st_master_info* mi) memcpy(p, ".data", 6); if ((fd = my_open(fname, O_WRONLY|O_APPEND|O_BINARY, MYF(MY_WME))) < 0) { - slave_print_error(my_errno, "Could not open file '%s'", fname); + slave_print_error(rli,my_errno, "Could not open file '%s'", fname); goto err; } if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP))) { - slave_print_error(my_errno, "Write to '%s' failed", fname); + slave_print_error(rli,my_errno, "Write to '%s' failed", fname); goto err; } if (mysql_bin_log.is_open()) @@ -1887,10 +1903,10 @@ int Append_block_log_event::exec_event(struct st_master_info* mi) err: if (fd >= 0) my_close(fd, MYF(0)); - return error ? error : Log_event::exec_event(mi); + return error ? 
error : Log_event::exec_event(rli); } -int Execute_load_log_event::exec_event(struct st_master_info* mi) +int Execute_load_log_event::exec_event(struct st_relay_log_info* rli) { char fname[FN_REFLEN+10]; char* p; @@ -1906,7 +1922,7 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi) init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0, MYF(MY_WME|MY_NABP))) { - slave_print_error(my_errno, "Could not open file '%s'", fname); + slave_print_error(rli,my_errno, "Could not open file '%s'", fname); goto err; } if (!(lev = (Load_log_event*)Log_event::read_log_event(&file, @@ -1914,7 +1930,7 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi) (bool)0)) || lev->get_type_code() != NEW_LOAD_EVENT) { - slave_print_error(0, "File '%s' appears corrupted", fname); + slave_print_error(rli,0, "File '%s' appears corrupted", fname); goto err; } // we want to disable binary logging in slave thread @@ -1927,7 +1943,7 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi) lev->thd = thd; if (lev->exec_event(0,0)) { - slave_print_error(my_errno, "Failed executing load from '%s'", fname); + slave_print_error(rli,my_errno, "Failed executing load from '%s'", fname); thd->options = save_options; goto err; } @@ -1943,7 +1959,7 @@ err: end_io_cache(&file); if (fd >= 0) my_close(fd, MYF(0)); - return error ? error : Log_event::exec_event(mi); + return error ? 
error : Log_event::exec_event(rli); } diff --git a/sql/log_event.h b/sql/log_event.h index 329d748025d..686d353d8bb 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -34,7 +34,7 @@ #define LOG_READ_TOO_LARGE -7 #define LOG_EVENT_OFFSET 4 -#define BINLOG_VERSION 2 +#define BINLOG_VERSION 3 /* we could have used SERVER_VERSION_LENGTH, but this introduces an obscure dependency - if somebody decided to change SERVER_VERSION_LENGTH @@ -120,7 +120,7 @@ struct sql_ex_info #define EVENT_TYPE_OFFSET 4 #define SERVER_ID_OFFSET 5 #define EVENT_LEN_OFFSET 9 -#define LOG_SEQ_OFFSET 13 +#define LOG_POS_OFFSET 13 #define FLAGS_OFFSET 17 /* start event post-header */ @@ -206,7 +206,7 @@ class THD; extern uint32 server_id; -struct st_master_info; +struct st_relay_log_info; class Log_event { @@ -214,7 +214,7 @@ public: time_t when; ulong exec_time; uint32 server_id; - uint32 log_seq; + uint32 log_pos; uint16 flags; int cached_event_len; char* temp_buf; @@ -282,11 +282,11 @@ public: #ifndef MYSQL_CLIENT static int read_log_event(IO_CACHE* file, String* packet, pthread_mutex_t* log_lock); - void set_log_seq(THD* thd, MYSQL_LOG* log); + void set_log_pos(MYSQL_LOG* log); virtual void pack_info(String* packet); int net_send(THD* thd, const char* log_name, my_off_t pos); static void init_show_field_list(List<Item>* field_list); - virtual int exec_event(struct st_master_info* mi); + virtual int exec_event(struct st_relay_log_info* rli); virtual const char* get_db() { return thd ? 
thd->db : 0; @@ -316,7 +316,7 @@ public: bool using_trans=0); const char* get_db() { return db; } void pack_info(String* packet); - int exec_event(struct st_master_info* mi); + int exec_event(struct st_relay_log_info* rli); bool get_cache_stmt() { return cache_stmt; } #endif @@ -359,9 +359,9 @@ public: ulonglong master_pos; #ifndef MYSQL_CLIENT - Slave_log_event(THD* thd_arg, struct st_master_info* mi); + Slave_log_event(THD* thd_arg, struct st_relay_log_info* rli); void pack_info(String* packet); - int exec_event(struct st_master_info* mi); + int exec_event(struct st_relay_log_info* rli); #endif Slave_log_event(const char* buf, int event_len); @@ -407,11 +407,11 @@ public: void set_fields(List<Item> &fields_arg); void pack_info(String* packet); const char* get_db() { return db; } - int exec_event(struct st_master_info* mi) + int exec_event(struct st_relay_log_info* rli) { - return exec_event(thd->slave_net,mi); + return exec_event(thd->slave_net,rli); } - int exec_event(NET* net, struct st_master_info* mi); + int exec_event(NET* net, struct st_relay_log_info* rli); #endif Load_log_event(const char* buf, int event_len, bool old_format); @@ -465,7 +465,7 @@ public: } #ifndef MYSQL_CLIENT void pack_info(String* packet); - int exec_event(struct st_master_info* mi); + int exec_event(struct st_relay_log_info* rli); #endif #ifdef MYSQL_CLIENT void print(FILE* file, bool short_form = 0, char* last_db = 0); @@ -491,7 +491,7 @@ public: bool is_valid() { return 1; } #ifndef MYSQL_CLIENT void pack_info(String* packet); - int exec_event(struct st_master_info* mi); + int exec_event(struct st_relay_log_info* rli); #endif #ifdef MYSQL_CLIENT @@ -517,7 +517,7 @@ public: void print(FILE* file, bool short_form = 0, char* last_db = 0); #endif #ifndef MYSQL_CLIENT - int exec_event(struct st_master_info* mi); + int exec_event(struct st_relay_log_info* rli); #endif }; @@ -553,7 +553,7 @@ public: #endif #ifndef MYSQL_CLIENT void pack_info(String* packet); - int exec_event(struct 
st_master_info* mi); + int exec_event(struct st_relay_log_info* rli); #endif }; @@ -602,7 +602,7 @@ public: #endif #ifndef MYSQL_CLIENT void pack_info(String* packet); - int exec_event(struct st_master_info* mi); + int exec_event(struct st_relay_log_info* rli); #endif }; @@ -616,7 +616,7 @@ public: #ifndef MYSQL_CLIENT Append_block_log_event(THD* thd, char* block_arg, uint block_len_arg); - int exec_event(struct st_master_info* mi); + int exec_event(struct st_relay_log_info* rli); #endif Append_block_log_event(const char* buf, int event_len); @@ -659,7 +659,7 @@ public: #endif #ifndef MYSQL_CLIENT void pack_info(String* packet); - int exec_event(struct st_master_info* mi); + int exec_event(struct st_relay_log_info* rli); #endif }; @@ -686,7 +686,7 @@ public: #endif #ifndef MYSQL_CLIENT void pack_info(String* packet); - int exec_event(struct st_master_info* mi); + int exec_event(struct st_relay_log_info* rli); #endif }; diff --git a/sql/mini_client.cc b/sql/mini_client.cc index f75f5b09cef..a43e5710e60 100644 --- a/sql/mini_client.cc +++ b/sql/mini_client.cc @@ -101,7 +101,7 @@ static MYSQL_FIELD *unpack_fields(MYSQL_DATA *data,MEM_ROOT *alloc,uint fields, my_bool default_value, my_bool long_flag_protocol); -static void mc_end_server(MYSQL *mysql); +void mc_end_server(MYSQL *mysql); static int mc_sock_connect(File s, const struct sockaddr *name, uint namelen, uint to); static void mc_free_old_query(MYSQL *mysql); static int mc_send_file_to_server(MYSQL *mysql, const char *filename); @@ -206,8 +206,7 @@ HANDLE create_named_pipe(NET *net, uint connect_timeout, char **arg_host, ** Init MySQL structure or allocate one ****************************************************************************/ -MYSQL * STDCALL -mc_mysql_init(MYSQL *mysql) +MYSQL *mc_mysql_init(MYSQL *mysql) { init_client_errs(); if (!mysql) @@ -229,7 +228,7 @@ mc_mysql_init(MYSQL *mysql) ** Shut down connection **************************************************************************/ -static void 
+void mc_end_server(MYSQL *mysql) { DBUG_ENTER("mc_end_server"); @@ -351,7 +350,7 @@ static int mc_sock_connect(my_socket s, const struct sockaddr *name, ** or packet is an error message *****************************************************************************/ -ulong STDCALL +ulong mc_net_safe_read(MYSQL *mysql) { NET *net= &mysql->net; @@ -412,17 +411,17 @@ max_allowed_packet on this server"); } -char * STDCALL mc_mysql_error(MYSQL *mysql) +char * mc_mysql_error(MYSQL *mysql) { return (mysql)->net.last_error; } -int STDCALL mc_mysql_errno(MYSQL *mysql) +int mc_mysql_errno(MYSQL *mysql) { return (mysql)->net.last_errno; } -my_bool STDCALL mc_mysql_reconnect(MYSQL *mysql) +my_bool mc_mysql_reconnect(MYSQL *mysql) { MYSQL tmp_mysql; DBUG_ENTER("mc_mysql_reconnect"); @@ -452,7 +451,7 @@ my_bool STDCALL mc_mysql_reconnect(MYSQL *mysql) -int STDCALL +int mc_simple_command(MYSQL *mysql,enum enum_server_command command, const char *arg, uint length, my_bool skipp_check) { @@ -505,7 +504,7 @@ mc_simple_command(MYSQL *mysql,enum enum_server_command command, } -MYSQL * STDCALL +MYSQL * mc_mysql_connect(MYSQL *mysql,const char *host, const char *user, const char *passwd, const char *db, uint port, const char *unix_socket,uint client_flag) @@ -856,7 +855,7 @@ error: ** NB! Errors are not reported until you do mysql_real_connect. ************************************************************************** */ -int STDCALL +int mysql_ssl_clear(MYSQL *mysql) { my_free(mysql->options.ssl_key, MYF(MY_ALLOW_ZERO_PTR)); @@ -879,7 +878,7 @@ mysql_ssl_clear(MYSQL *mysql) ** If handle is alloced by mysql connect free it. 
*************************************************************************/ -void STDCALL +void mc_mysql_close(MYSQL *mysql) { DBUG_ENTER("mysql_close"); @@ -910,7 +909,7 @@ mc_mysql_close(MYSQL *mysql) DBUG_VOID_RETURN; } -void STDCALL mc_mysql_free_result(MYSQL_RES *result) +void mc_mysql_free_result(MYSQL_RES *result) { DBUG_ENTER("mc_mysql_free_result"); DBUG_PRINT("enter",("mysql_res: %lx",result)); @@ -988,13 +987,13 @@ mc_unpack_fields(MYSQL_DATA *data,MEM_ROOT *alloc,uint fields, DBUG_RETURN(result); } -int STDCALL +int mc_mysql_send_query(MYSQL* mysql, const char* query, uint length) { return mc_simple_command(mysql, COM_QUERY, query, length, 1); } -int STDCALL mc_mysql_read_query_result(MYSQL *mysql) +int mc_mysql_read_query_result(MYSQL *mysql) { uchar *pos; ulong field_count; @@ -1042,7 +1041,7 @@ get_info: DBUG_RETURN(0); } -int STDCALL mc_mysql_query(MYSQL *mysql, const char *query, uint length) +int mc_mysql_query(MYSQL *mysql, const char *query, uint length) { DBUG_ENTER("mysql_real_query"); DBUG_PRINT("enter",("handle: %lx",mysql)); @@ -1289,17 +1288,17 @@ static int mc_read_one_row(MYSQL *mysql,uint fields,MYSQL_ROW row, return 0; } -my_ulonglong STDCALL mc_mysql_num_rows(MYSQL_RES *res) +my_ulonglong mc_mysql_num_rows(MYSQL_RES *res) { return res->row_count; } -unsigned int STDCALL mc_mysql_num_fields(MYSQL_RES *res) +unsigned int mc_mysql_num_fields(MYSQL_RES *res) { return res->field_count; } -void STDCALL mc_mysql_data_seek(MYSQL_RES *result, my_ulonglong row) +void mc_mysql_data_seek(MYSQL_RES *result, my_ulonglong row) { MYSQL_ROWS *tmp=0; DBUG_PRINT("info",("mysql_data_seek(%ld)",(long) row)); @@ -1309,7 +1308,7 @@ void STDCALL mc_mysql_data_seek(MYSQL_RES *result, my_ulonglong row) result->data_cursor = tmp; } -MYSQL_ROW STDCALL mc_mysql_fetch_row(MYSQL_RES *res) +MYSQL_ROW mc_mysql_fetch_row(MYSQL_RES *res) { DBUG_ENTER("mc_mysql_fetch_row"); if (!res->data) @@ -1344,7 +1343,7 @@ MYSQL_ROW STDCALL mc_mysql_fetch_row(MYSQL_RES *res) } } 
-int STDCALL mc_mysql_select_db(MYSQL *mysql, const char *db) +int mc_mysql_select_db(MYSQL *mysql, const char *db) { int error; DBUG_ENTER("mysql_select_db"); @@ -1358,7 +1357,7 @@ int STDCALL mc_mysql_select_db(MYSQL *mysql, const char *db) } -MYSQL_RES * STDCALL mc_mysql_store_result(MYSQL *mysql) +MYSQL_RES *mc_mysql_store_result(MYSQL *mysql) { MYSQL_RES *result; DBUG_ENTER("mysql_store_result"); diff --git a/sql/mini_client.h b/sql/mini_client.h index de78da06eec..1e17d34244e 100644 --- a/sql/mini_client.h +++ b/sql/mini_client.h @@ -18,41 +18,35 @@ #define _MINI_CLIENT_H -MYSQL* STDCALL -mc_mysql_connect(MYSQL *mysql,const char *host, const char *user, +MYSQL* mc_mysql_connect(MYSQL *mysql,const char *host, const char *user, const char *passwd, const char *db, uint port, const char *unix_socket,uint client_flag); -int STDCALL -mc_simple_command(MYSQL *mysql,enum enum_server_command command, const char *arg, +int mc_simple_command(MYSQL *mysql,enum enum_server_command command, const char *arg, uint length, my_bool skipp_check); -void STDCALL -mc_mysql_close(MYSQL *mysql); - -MYSQL * STDCALL -mc_mysql_init(MYSQL *mysql); - -void STDCALL -mc_mysql_debug(const char *debug); - -ulong STDCALL -mc_net_safe_read(MYSQL *mysql); - -char * STDCALL mc_mysql_error(MYSQL *mysql); -int STDCALL mc_mysql_errno(MYSQL *mysql); -my_bool STDCALL mc_mysql_reconnect(MYSQL* mysql); - -int STDCALL mc_mysql_send_query(MYSQL* mysql, const char* query, uint length); -int STDCALL mc_mysql_read_query_result(MYSQL *mysql); -int STDCALL mc_mysql_query(MYSQL *mysql, const char *query, uint length); -MYSQL_RES * STDCALL mc_mysql_store_result(MYSQL *mysql); -void STDCALL mc_mysql_free_result(MYSQL_RES *result); -void STDCALL mc_mysql_data_seek(MYSQL_RES *result, my_ulonglong row); -my_ulonglong STDCALL mc_mysql_num_rows(MYSQL_RES *res); -unsigned int STDCALL mc_mysql_num_fields(MYSQL_RES *res); -MYSQL_ROW STDCALL mc_mysql_fetch_row(MYSQL_RES *res); -int STDCALL mc_mysql_select_db(MYSQL 
*mysql, const char *db); +void mc_mysql_close(MYSQL *mysql); + +MYSQL * mc_mysql_init(MYSQL *mysql); + +void mc_mysql_debug(const char *debug); +ulong mc_net_safe_read(MYSQL *mysql); + +char * mc_mysql_error(MYSQL *mysql); +int mc_mysql_errno(MYSQL *mysql); +my_bool mc_mysql_reconnect(MYSQL* mysql); + +int mc_mysql_send_query(MYSQL* mysql, const char* query, uint length); +int mc_mysql_read_query_result(MYSQL *mysql); +int mc_mysql_query(MYSQL *mysql, const char *query, uint length); +MYSQL_RES * mc_mysql_store_result(MYSQL *mysql); +void mc_mysql_free_result(MYSQL_RES *result); +void mc_mysql_data_seek(MYSQL_RES *result, my_ulonglong row); +my_ulonglong mc_mysql_num_rows(MYSQL_RES *res); +unsigned int mc_mysql_num_fields(MYSQL_RES *res); +MYSQL_ROW STDCALL mc_mysql_fetch_row(MYSQL_RES *res); +int mc_mysql_select_db(MYSQL *mysql, const char *db); +void mc_end_server(MYSQL *mysql); #endif diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h index e0e85145c52..12a043250a5 100644 --- a/sql/mysql_priv.h +++ b/sql/mysql_priv.h @@ -519,6 +519,10 @@ void sql_print_error(const char *format,...) 
__attribute__ ((format (printf, 1, 2))); bool fn_format_relative_to_data_home(my_string to, const char *name, const char *dir, const char *extension); +void open_log(MYSQL_LOG *log, const char *hostname, + const char *opt_name, const char *extension, + enum_log_type type, bool read_append = 0, + bool no_auto_events = 0); extern uint32 server_id; extern char *mysql_data_home,server_version[SERVER_VERSION_LENGTH], @@ -550,9 +554,8 @@ extern pthread_mutex_t LOCK_mysql_create_db,LOCK_Acl,LOCK_open, LOCK_thread_count,LOCK_mapped_file,LOCK_user_locks, LOCK_status, LOCK_grant, LOCK_error_log, LOCK_delayed_insert, LOCK_delayed_status, LOCK_delayed_create, LOCK_crypt, LOCK_timezone, - LOCK_binlog_update, LOCK_slave, LOCK_server_id, LOCK_slave_list; -extern pthread_cond_t COND_refresh,COND_thread_count, COND_binlog_update, - COND_slave_stopped, COND_slave_start; + LOCK_server_id, LOCK_slave_list, LOCK_active_mi; +extern pthread_cond_t COND_refresh,COND_thread_count; extern pthread_attr_t connection_attrib; extern bool opt_endinfo, using_udf_functions, locked_in_memory, opt_using_transactions, use_temp_pool, mysql_embedded; @@ -588,6 +591,7 @@ extern struct show_var_st init_vars[]; extern struct show_var_st status_vars[]; extern enum db_type default_table_type; extern enum enum_tx_isolation default_tx_isolation; +extern char glob_hostname[FN_REFLEN]; #ifndef __WIN__ extern pthread_t signal_thread; diff --git a/sql/mysqld.cc b/sql/mysqld.cc index c4af3366806..6d99db62f12 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -206,7 +206,7 @@ SHOW_COMP_OPTION have_openssl=SHOW_OPTION_NO; SHOW_COMP_OPTION have_symlink=SHOW_OPTION_YES; -static bool opt_skip_slave_start = 0; // If set, slave is not autostarted +bool opt_skip_slave_start = 0; // If set, slave is not autostarted static bool opt_do_pstack = 0; static ulong opt_specialflag=SPECIAL_ENGLISH; static ulong back_log,connect_timeout,concurrency; @@ -225,8 +225,6 @@ bool opt_sql_bin_update = 0, opt_log_slave_updates = 0, 
opt_safe_show_db=0, opt_safe_user_create = 0, opt_no_mix_types = 0; FILE *bootstrap_file=0; int segfaulted = 0; // ensure we do not enter SIGSEGV handler twice -extern MASTER_INFO glob_mi; -extern int init_master_info(MASTER_INFO* mi); /* If sql_bin_update is true, SQL_LOG_UPDATE and SQL_LOG_BIN are kept in sync, @@ -238,7 +236,7 @@ static struct rand_struct sql_rand; static int cleanup_done; static char **defaults_argv,time_zone[30]; static const char *default_table_type_name; -static char glob_hostname[FN_REFLEN]; +char glob_hostname[FN_REFLEN]; #include "sslopt-vars.h" #ifdef HAVE_OPENSSL @@ -274,7 +272,9 @@ volatile ulong cached_thread_count=0; // replication parameters, if master_host is not NULL, we are a slave my_string master_user = (char*) "test", master_password = 0, master_host=0, - master_info_file = (char*) "master.info", master_ssl_key=0, master_ssl_cert=0; + master_info_file = (char*) "master.info", + relay_log_info_file = (char*) "relay-log.info", + master_ssl_key=0, master_ssl_cert=0; my_string report_user = 0, report_password = 0, report_host=0; const char *localhost=LOCAL_HOST; @@ -321,6 +321,7 @@ bool mysql_embedded=1; #endif char *opt_bin_logname = 0; // this one needs to be seen in sql_parse.cc +char *opt_relay_logname = 0, *opt_relaylog_index_name=0; char server_version[SERVER_VERSION_LENGTH]=MYSQL_SERVER_VERSION; const char *first_keyword="first"; const char **errmesg; /* Error messages */ @@ -356,11 +357,10 @@ pthread_mutex_t LOCK_mysql_create_db, LOCK_Acl, LOCK_open, LOCK_thread_count, LOCK_error_log, LOCK_delayed_insert, LOCK_delayed_status, LOCK_delayed_create, LOCK_crypt, LOCK_bytes_sent, LOCK_bytes_received, - LOCK_binlog_update, LOCK_slave, LOCK_server_id, - LOCK_user_conn, LOCK_slave_list; + LOCK_server_id, + LOCK_user_conn, LOCK_slave_list, LOCK_active_mi; -pthread_cond_t COND_refresh,COND_thread_count,COND_binlog_update, - COND_slave_stopped, COND_slave_start; +pthread_cond_t COND_refresh,COND_thread_count; pthread_cond_t 
COND_thread_cache,COND_flush_thread_cache; pthread_t signal_thread; pthread_attr_t connection_attrib; @@ -759,6 +759,7 @@ void clean_up(bool print_message) my_free(allocated_mysql_tmpdir,MYF(MY_ALLOW_ZERO_PTR)); my_free(slave_load_tmpdir,MYF(MY_ALLOW_ZERO_PTR)); x_free(opt_bin_logname); + x_free(opt_relay_logname); bitmap_free(&temp_pool); free_max_user_conn(); end_slave_list(); @@ -1095,6 +1096,8 @@ void end_thread(THD *thd, bool put_in_cache) DBUG_PRINT("info", ("sending a broadcast")) /* Tell main we are ready */ + // TODO: explain why we broadcast outside of the lock or + // fix the bug - Sasha (void) pthread_mutex_unlock(&LOCK_thread_count); (void) pthread_cond_broadcast(&COND_thread_count); DBUG_PRINT("info", ("unlocked thread_count mutex")) @@ -1266,6 +1269,7 @@ the thread stack. Please read http://www.mysql.com/doc/L/i/Linux.html\n\n", #ifdef HAVE_STACKTRACE if(!(test_flags & TEST_NO_STACKTRACE)) { + fprintf(stderr,"thd=%p\n",thd); print_stacktrace(thd ? (gptr) thd->thread_stack : (gptr) 0, thread_stack); } @@ -1594,9 +1598,10 @@ const char *load_default_groups[]= { "mysqld","server",0 }; char *libwrapName=NULL; #endif -static void open_log(MYSQL_LOG *log, const char *hostname, +void open_log(MYSQL_LOG *log, const char *hostname, const char *opt_name, const char *extension, - enum_log_type type) + enum_log_type type, bool read_append, + bool no_auto_events) { char tmp[FN_REFLEN]; if (!opt_name || !opt_name[0]) @@ -1620,7 +1625,8 @@ static void open_log(MYSQL_LOG *log, const char *hostname, opt_name=tmp; } } - log->open(opt_name,type); + log->open(opt_name,type,0,(read_append) ? 
SEQ_READ_APPEND : WRITE_CACHE, + no_auto_events); } @@ -1716,19 +1722,15 @@ int main(int argc, char **argv) (void) pthread_mutex_init(&LOCK_bytes_sent,MY_MUTEX_INIT_FAST); (void) pthread_mutex_init(&LOCK_bytes_received,MY_MUTEX_INIT_FAST); (void) pthread_mutex_init(&LOCK_timezone,MY_MUTEX_INIT_FAST); - (void) pthread_mutex_init(&LOCK_binlog_update, MY_MUTEX_INIT_FAST); // QQ NOT USED - (void) pthread_mutex_init(&LOCK_slave, MY_MUTEX_INIT_FAST); (void) pthread_mutex_init(&LOCK_server_id, MY_MUTEX_INIT_FAST); (void) pthread_mutex_init(&LOCK_user_conn, MY_MUTEX_INIT_FAST); (void) pthread_mutex_init(&LOCK_rpl_status, MY_MUTEX_INIT_FAST); + (void) pthread_mutex_init(&LOCK_active_mi, MY_MUTEX_INIT_FAST); (void) pthread_cond_init(&COND_thread_count,NULL); (void) pthread_cond_init(&COND_refresh,NULL); (void) pthread_cond_init(&COND_thread_cache,NULL); (void) pthread_cond_init(&COND_flush_thread_cache,NULL); (void) pthread_cond_init(&COND_manager,NULL); - (void) pthread_cond_init(&COND_binlog_update, NULL); - (void) pthread_cond_init(&COND_slave_stopped, NULL); - (void) pthread_cond_init(&COND_slave_start, NULL); (void) pthread_cond_init(&COND_rpl_status, NULL); init_signals(); @@ -1825,20 +1827,9 @@ int main(int argc, char **argv) LOG_NEW); using_update_log=1; } - - /* - make sure slave thread gets started if server_id is set, - valid master.info is present, and master_host has not been specified - */ - if (server_id && !master_host) - { - char fname[FN_REFLEN+128]; - MY_STAT stat_area; - fn_format(fname, master_info_file, mysql_data_home, "", 4+16+32); - if (my_stat(fname, &stat_area, MYF(0)) && !init_master_info(&glob_mi)) - master_host = glob_mi.host; - } - + + init_slave(); + if (opt_bin_log && !server_id) { server_id= !master_host ? 
1 : 2; @@ -1991,17 +1982,6 @@ The server will not act as a slave."); sql_print_error("Warning: Can't create thread to manage maintenance"); } - // slave thread - if (master_host) - { - pthread_t hThread; - if (!opt_skip_slave_start && - pthread_create(&hThread, &connection_attrib, handle_slave, 0)) - sql_print_error("Warning: Can't create thread to handle slave"); - else if(opt_skip_slave_start) - init_master_info(&glob_mi); - } - printf(ER(ER_READY),my_progname,server_version,""); fflush(stdout); @@ -2665,7 +2645,8 @@ enum options { OPT_REPORT_USER, OPT_REPORT_PASSWORD, OPT_REPORT_PORT, OPT_SHOW_SLAVE_AUTH_INFO, OPT_OLD_RPL_COMPAT, OPT_SLAVE_LOAD_TMPDIR, OPT_NO_MIX_TYPE, - OPT_RPL_RECOVERY_RANK,OPT_INIT_RPL_ROLE + OPT_RPL_RECOVERY_RANK,OPT_INIT_RPL_ROLE, + OPT_RELAY_LOG, OPT_RELAY_LOG_INDEX, OPT_RELAY_LOG_INFO_FILE }; static struct option long_options[] = { @@ -2790,6 +2771,8 @@ static struct option long_options[] = { {"report-password", required_argument, 0, (int) OPT_REPORT_PASSWORD}, {"report-port", required_argument, 0, (int) OPT_REPORT_PORT}, {"rpl-recovery-rank", required_argument, 0, (int) OPT_RPL_RECOVERY_RANK}, + {"relay-log", required_argument, 0, (int) OPT_RELAY_LOG}, + {"relay-log-index", required_argument, 0, (int) OPT_RELAY_LOG_INDEX}, {"safe-mode", no_argument, 0, (int) OPT_SAFE}, {"safe-show-database", no_argument, 0, (int) OPT_SAFE_SHOW_DB}, {"safe-user-create", no_argument, 0, (int) OPT_SAFE_USER_CREATE}, @@ -2813,6 +2796,8 @@ static struct option long_options[] = { {"skip-stack-trace", no_argument, 0, (int) OPT_SKIP_STACK_TRACE}, {"skip-symlink", no_argument, 0, (int) OPT_SKIP_SYMLINKS}, {"skip-thread-priority", no_argument, 0, (int) OPT_SKIP_PRIOR}, + {"relay-log-info-file", required_argument, 0, + (int) OPT_RELAY_LOG_INFO_FILE}, {"slave-load-tmpdir", required_argument, 0, (int) OPT_SLAVE_LOAD_TMPDIR}, {"socket", required_argument, 0, (int) OPT_SOCKET}, {"sql-bin-update-same", no_argument, 0, (int) OPT_SQL_BIN_UPDATE_SAME}, @@ -3151,8 +3136,8 
@@ struct show_var_st status_vars[]= { {"Select_range", (char*) &select_range_count, SHOW_LONG}, {"Select_range_check", (char*) &select_range_check_count, SHOW_LONG}, {"Select_scan", (char*) &select_scan_count, SHOW_LONG}, - {"Slave_running", (char*) &slave_running, SHOW_BOOL}, {"Slave_open_temp_tables", (char*) &slave_open_temp_tables, SHOW_LONG}, + {"Slave_running", (char*) 0, SHOW_SLAVE_RUNNING}, {"Slow_launch_threads", (char*) &slow_launch_threads, SHOW_LONG}, {"Slow_queries", (char*) &long_query_count, SHOW_LONG}, {"Sort_merge_passes", (char*) &filesort_merge_passes, SHOW_LONG}, @@ -3548,6 +3533,14 @@ static void get_options(int argc,char **argv) opt_update_log=1; opt_update_logname=optarg; // Use hostname.# if null break; + case (int) OPT_RELAY_LOG_INDEX: + opt_relaylog_index_name = optarg; + break; + case (int) OPT_RELAY_LOG: + x_free(opt_relay_logname); + if (optarg && optarg[0]) + opt_relay_logname=my_strdup(optarg,MYF(0)); + break; case (int) OPT_BIN_LOG_INDEX: opt_binlog_index_name = optarg; break; @@ -4007,6 +4000,9 @@ static void get_options(int argc,char **argv) case OPT_MASTER_INFO_FILE: master_info_file=optarg; break; + case OPT_RELAY_LOG_INFO_FILE: + relay_log_info_file=optarg; + break; case OPT_MASTER_PORT: master_port= atoi(optarg); break; diff --git a/sql/repl_failsafe.cc b/sql/repl_failsafe.cc index d846662947d..705ef58d0e7 100644 --- a/sql/repl_failsafe.cc +++ b/sql/repl_failsafe.cc @@ -199,7 +199,7 @@ void end_slave_list() static int find_target_pos(LEX_MASTER_INFO* mi, IO_CACHE* log, char* errmsg) { - uint32 log_seq = mi->last_log_seq; + uint32 log_pos = mi->pos; uint32 target_server_id = mi->server_id; for (;;) @@ -217,7 +217,7 @@ static int find_target_pos(LEX_MASTER_INFO* mi, IO_CACHE* log, char* errmsg) return 1; } - if (ev->log_seq == log_seq && ev->server_id == target_server_id) + if (ev->log_pos == log_pos && ev->server_id == target_server_id) { delete ev; mi->pos = my_b_tell(log); @@ -527,7 +527,7 @@ 
pthread_handler_decl(handle_failsafe_rpl,arg) const char* msg = thd->enter_cond(&COND_rpl_status, &LOCK_rpl_status, "Waiting for request"); pthread_cond_wait(&COND_rpl_status, &LOCK_rpl_status); - thd->proc_info="Processling request"; + thd->proc_info="Processing request"; while (!break_req_chain) { switch (rpl_status) @@ -630,10 +630,9 @@ static inline void cleanup_mysql_results(MYSQL_RES* db_res, static inline int fetch_db_tables(THD* thd, MYSQL* mysql, const char* db, - MYSQL_RES* table_res) + MYSQL_RES* table_res, MASTER_INFO* mi) { MYSQL_ROW row; - for( row = mc_mysql_fetch_row(table_res); row; row = mc_mysql_fetch_row(table_res)) { @@ -649,11 +648,9 @@ static inline int fetch_db_tables(THD* thd, MYSQL* mysql, const char* db, if (!tables_ok(thd, &table)) continue; } - - if ((error = fetch_nx_table(thd, db, table_name, &glob_mi, mysql))) + if ((error = fetch_master_table(thd, db, table_name, mi, mysql))) return error; } - return 0; } @@ -664,25 +661,26 @@ int load_master_data(THD* thd) MYSQL_RES* master_status_res = 0; bool slave_was_running = 0; int error = 0; - + const char* errmsg=0; + int restart_thread_mask; mc_mysql_init(&mysql); // we do not want anyone messing with the slave at all for the entire // duration of the data load; - pthread_mutex_lock(&LOCK_slave); - - // first, kill the slave - if ((slave_was_running = slave_running)) + LOCK_ACTIVE_MI; + lock_slave_threads(active_mi); + init_thread_mask(&restart_thread_mask,active_mi,0 /*not inverse*/); + if (restart_thread_mask && + (error=terminate_slave_threads(active_mi,restart_thread_mask, + 1 /*skip lock*/))) { - abort_slave = 1; - KICK_SLAVE; - thd->proc_info = "waiting for slave to die"; - while (slave_running) - pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done + send_error(&thd->net,error); + unlock_slave_threads(active_mi); + UNLOCK_ACTIVE_MI; + return 1; } - - - if (connect_to_master(thd, &mysql, &glob_mi)) + + if (connect_to_master(thd, &mysql, active_mi)) { 
net_printf(&thd->net, error = ER_CONNECT_TO_MASTER, mc_mysql_error(&mysql)); @@ -744,7 +742,7 @@ int load_master_data(THD* thd) mess up and not exclude mysql database with the rules when he actually means to - in this case, he is up for a surprise if his priv tables get dropped and downloaded from master - TO DO - add special option, not enabled + TODO - add special option, not enabled by default, to allow inclusion of mysql database into load data from master */ @@ -774,7 +772,7 @@ int load_master_data(THD* thd) goto err; } - if ((error = fetch_db_tables(thd, &mysql, db, *cur_table_res))) + if ((error = fetch_db_tables(thd,&mysql,db,*cur_table_res,active_mi))) { // we do not report the error - fetch_db_tables handles it cleanup_mysql_results(db_res, cur_table_res, table_res); @@ -797,14 +795,15 @@ int load_master_data(THD* thd) */ if (row[0] && row[1]) { - strmake(glob_mi.log_file_name, row[0], sizeof(glob_mi.log_file_name)); - glob_mi.pos = atoi(row[1]); // atoi() is ok, since offset is <= 1GB - if (glob_mi.pos < 4) - glob_mi.pos = 4; // don't hit the magic number - glob_mi.pending = 0; - flush_master_info(&glob_mi); + strmake(active_mi->master_log_name, row[0], + sizeof(active_mi->master_log_name)); + // atoi() is ok, since offset is <= 1GB + active_mi->master_log_pos = atoi(row[1]); + if (active_mi->master_log_pos < 4) + active_mi->master_log_pos = 4; // don't hit the magic number + active_mi->rli.pending = 0; + flush_master_info(active_mi); } - mc_mysql_free_result(master_status_res); } @@ -815,11 +814,35 @@ int load_master_data(THD* thd) goto err; } } + thd->proc_info="purging old relay logs"; + if (purge_relay_logs(&active_mi->rli,0 /* not only reset, but also reinit*/, + &errmsg)) + { + send_error(&thd->net, 0, "Failed purging old relay logs"); + unlock_slave_threads(active_mi); + UNLOCK_ACTIVE_MI; + return 1; + } + pthread_mutex_lock(&active_mi->rli.data_lock); + active_mi->rli.master_log_pos = active_mi->master_log_pos; + 
strnmov(active_mi->rli.master_log_name,active_mi->master_log_name, + sizeof(active_mi->rli.master_log_name)); + pthread_cond_broadcast(&active_mi->rli.data_cond); + pthread_mutex_unlock(&active_mi->rli.data_lock); + thd->proc_info = "starting slave"; + if (restart_thread_mask) + { + error=start_slave_threads(0 /* mutex not needed*/, + 1 /* wait for start*/, + active_mi,master_info_file,relay_log_info_file, + restart_thread_mask); + } err: - pthread_mutex_unlock(&LOCK_slave); - if (slave_was_running) - start_slave(0, 0); + unlock_slave_threads(active_mi); + UNLOCK_ACTIVE_MI; + thd->proc_info = 0; + mc_mysql_close(&mysql); // safe to call since we always do mc_mysql_init() if (!error) send_ok(&thd->net); diff --git a/sql/slave.cc b/sql/slave.cc index 700838d7cd7..e68741e7434 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -24,24 +24,23 @@ #include "repl_failsafe.h" #include <thr_alarm.h> #include <my_dir.h> +#include <assert.h> -volatile bool slave_running = 0; +volatile bool slave_sql_running = 0, slave_io_running = 0; char* slave_load_tmpdir = 0; -pthread_t slave_real_id; -MASTER_INFO glob_mi; +MASTER_INFO main_mi; +MASTER_INFO* active_mi; +volatile int active_mi_in_use = 0; HASH replicate_do_table, replicate_ignore_table; DYNAMIC_ARRAY replicate_wild_do_table, replicate_wild_ignore_table; bool do_table_inited = 0, ignore_table_inited = 0; bool wild_do_table_inited = 0, wild_ignore_table_inited = 0; bool table_rules_on = 0; -uint32 slave_skip_counter = 0; static TABLE* save_temporary_tables = 0; -THD* slave_thd = 0; // when slave thread exits, we need to remember the temporary tables so we // can re-use them on slave start -int last_slave_errno = 0; -char last_slave_error[MAX_SLAVE_ERRMSG] = ""; +// TODO: move the vars below under MASTER_INFO #ifndef DBUG_OFF int disconnect_slave_event_count = 0, abort_slave_event_count = 0; static int events_till_disconnect = -1; @@ -49,15 +48,17 @@ int events_till_abort = -1; static int stuck_count = 0; #endif +typedef enum { 
SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE; void skip_load_data_infile(NET* net); -inline bool slave_killed(THD* thd); -static int init_slave_thread(THD* thd); +static inline bool slave_killed(THD* thd,MASTER_INFO* mi); +static inline bool slave_killed(THD* thd,RELAY_LOG_INFO* rli); +static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type); static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi, bool reconnect); -static int safe_sleep(THD* thd, int sec); +static int safe_sleep(THD* thd, MASTER_INFO* mi, int sec); static int request_table_dump(MYSQL* mysql, const char* db, const char* table); static int create_table_from_dump(THD* thd, NET* net, const char* db, const char* table_name); @@ -65,6 +66,79 @@ static int check_master_version(MYSQL* mysql, MASTER_INFO* mi); char* rewrite_db(char* db); +void init_thread_mask(int* mask,MASTER_INFO* mi,bool inverse) +{ + bool set_io = mi->slave_running, set_sql = mi->rli.slave_running; + if (inverse) + { + /* This makes me think of the Russian idiom "I am not I, and this is + not my horse", which is used to deny reponsibility for + one's actions. 
+ */ + set_io = !set_io; + set_sql = !set_sql; + } + register int tmp_mask=0; + if (set_io) + tmp_mask |= SLAVE_IO; + if (set_sql) + tmp_mask |= SLAVE_SQL; + *mask = tmp_mask; +} + +void lock_slave_threads(MASTER_INFO* mi) +{ + //TODO: see if we can do this without dual mutex + pthread_mutex_lock(&mi->run_lock); + pthread_mutex_lock(&mi->rli.run_lock); +} + +void unlock_slave_threads(MASTER_INFO* mi) +{ + //TODO: see if we can do this without dual mutex + pthread_mutex_unlock(&mi->rli.run_lock); + pthread_mutex_unlock(&mi->run_lock); +} + +int init_slave() +{ + // TODO (multi-master): replace this with list initialization + active_mi = &main_mi; + + // TODO: the code below is a copy-paste mess - clean it up + /* + make sure slave thread gets started if server_id is set, + valid master.info is present, and master_host has not been specified + */ + if (server_id && !master_host) + { + // TODO: re-write this to interate through the list of files + // for multi-master + char fname[FN_REFLEN+128]; + MY_STAT stat_area; + fn_format(fname, master_info_file, mysql_data_home, "", 4+16+32); + if (my_stat(fname, &stat_area, MYF(0)) && + !init_master_info(active_mi,master_info_file,relay_log_info_file)) + master_host = active_mi->host; + } + // slave thread + if (master_host) + { + if (!opt_skip_slave_start && start_slave_threads(1 /* need mutex */, + 0 /* no wait for start*/, + active_mi, + master_info_file, + relay_log_info_file, + SLAVE_IO|SLAVE_SQL + )) + sql_print_error("Warning: Can't create threads to handle slave"); + else if (opt_skip_slave_start) + if (init_master_info(active_mi, master_info_file, relay_log_info_file)) + sql_print_error("Warning: failed to initialized master info"); + } + return 0; +} + static void free_table_ent(TABLE_RULE_ENT* e) { my_free((gptr) e, MYF(0)); @@ -77,6 +151,285 @@ static byte* get_table_key(TABLE_RULE_ENT* e, uint* len, return (byte*)e->db; } +// TODO: check proper initialization of master_log_name/master_log_pos +int 
init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log, + ulonglong pos, bool need_data_lock, + const char** errmsg) +{ + if (rli->log_pos_current) + return 0; + pthread_mutex_t *log_lock=rli->relay_log.get_log_lock(); + pthread_mutex_lock(log_lock); + if (need_data_lock) + pthread_mutex_lock(&rli->data_lock); + + if (rli->cur_log_fd >= 0) + { + end_io_cache(&rli->cache_buf); + my_close(rli->cur_log_fd, MYF(MY_WME)); + rli->cur_log_fd = -1; + } + + if (!log) + log = rli->relay_log_name; // already inited + if (!pos) + pos = rli->relay_log_pos; // already inited + else + rli->relay_log_pos = pos; + if (rli->relay_log.find_first_log(&rli->linfo,log)) + { + *errmsg="Could not find first log during relay log initialization"; + goto err; + } + strnmov(rli->relay_log_name,rli->linfo.log_file_name, + sizeof(rli->relay_log_name)); + // to make end_io_cache(&rli->cache_buf) safe in all cases + if (!rli->inited) + bzero((char*) &rli->cache_buf, sizeof(IO_CACHE)); + if (rli->relay_log.is_active(rli->linfo.log_file_name)) + { + if (my_b_tell((rli->cur_log=rli->relay_log.get_log_file())) == 0 && + check_binlog_magic(rli->cur_log,errmsg)) + { + goto err; + } + rli->cur_log_init_count=rli->cur_log->init_count; + } + else + { + if (rli->inited) + end_io_cache(&rli->cache_buf); + if (rli->cur_log_fd>=0) + my_close(rli->cur_log_fd,MYF(MY_WME)); + if ((rli->cur_log_fd=open_binlog(&rli->cache_buf, + rli->linfo.log_file_name,errmsg)) < 0) + { + goto err; + } + rli->cur_log = &rli->cache_buf; + } + if (pos > 4) + my_b_seek(rli->cur_log,(off_t)pos); + rli->log_pos_current=1; +err: + pthread_cond_broadcast(&rli->data_cond); + if (need_data_lock) + pthread_mutex_unlock(&rli->data_lock); + pthread_mutex_unlock(log_lock); + return (*errmsg) ? 
1 : 0; +} + +// we assume we have a run lock on rli and that the both slave thread +// are not running +int purge_relay_logs(RELAY_LOG_INFO* rli, bool just_reset, const char** errmsg) +{ + if (!rli->inited) + return 0; /* successfully do nothing */ + DBUG_ASSERT(rli->slave_running == 0); + DBUG_ASSERT(rli->mi->slave_running == 0); + int error=0; + rli->slave_skip_counter=0; + pthread_mutex_lock(&rli->data_lock); + rli->pending=0; + rli->master_log_name[0]=0; + rli->master_log_pos=0; // 0 means uninitialized + if (rli->relay_log.reset_logs(rli->sql_thd) || + rli->relay_log.find_first_log(&rli->linfo,"")) + { + *errmsg = "Failed during log reset"; + error=1; + goto err; + } + strnmov(rli->relay_log_name,rli->linfo.log_file_name, + sizeof(rli->relay_log_name)); + rli->relay_log_pos=4; + rli->log_pos_current=0; + if (!just_reset) + error = init_relay_log_pos(rli,0,0,0/*do not need data lock*/,errmsg); +err: + pthread_mutex_unlock(&rli->data_lock); + return error; +} + +int terminate_slave_threads(MASTER_INFO* mi,int thread_mask,bool skip_lock) +{ + if (!mi->inited) + return 0; /* successfully do nothing */ + int error,force_all = (thread_mask & SLAVE_FORCE_ALL); + pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock; + pthread_mutex_t *sql_cond_lock,*io_cond_lock; + + sql_cond_lock=sql_lock; + io_cond_lock=io_lock; + + if (skip_lock) + { + sql_lock = io_lock = 0; + } + if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)) && mi->slave_running) + { + mi->abort_slave=1; + if ((error=terminate_slave_thread(mi->io_thd,io_lock, + io_cond_lock, + &mi->stop_cond, + &mi->slave_running)) && + !force_all) + return error; + } + if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)) && mi->rli.slave_running) + { + DBUG_ASSERT(mi->rli.sql_thd != 0) ; + mi->rli.abort_slave=1; + if ((error=terminate_slave_thread(mi->rli.sql_thd,sql_lock, + sql_cond_lock, + &mi->rli.stop_cond, + &mi->rli.slave_running)) && + !force_all) + return error; + } + return 0; +} + +int 
terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock, + pthread_mutex_t *cond_lock, + pthread_cond_t* term_cond, + volatile bool* slave_running) +{ + if (term_lock) + { + pthread_mutex_lock(term_lock); + if (!*slave_running) + { + pthread_mutex_unlock(term_lock); + return ER_SLAVE_NOT_RUNNING; + } + } + DBUG_ASSERT(thd != 0); + KICK_SLAVE(thd); + while (*slave_running) + { + /* there is a small chance that slave thread might miss the first + alarm. To protect againts it, resend the signal until it reacts + */ + struct timespec abstime; +#ifdef HAVE_TIMESPEC_TS_SEC + abstime.ts_sec=time(NULL)+2; + abstime.ts_nsec=0; +#elif defined(__WIN__) + abstime.tv_sec=time((time_t*) 0)+2; + abstime.tv_nsec=0; +#else + struct timeval tv; + gettimeofday(&tv,0); + abstime.tv_sec=tv.tv_sec+2; + abstime.tv_nsec=tv.tv_usec*1000; +#endif + pthread_cond_timedwait(term_cond, cond_lock, &abstime); + if (*slave_running) + KICK_SLAVE(thd); + } + if (term_lock) + pthread_mutex_unlock(term_lock); + return 0; +} + +int start_slave_thread(pthread_handler h_func, pthread_mutex_t* start_lock, + pthread_mutex_t *cond_lock, + pthread_cond_t* start_cond, + volatile bool* slave_running, + MASTER_INFO* mi) +{ + pthread_t th; + DBUG_ASSERT(mi->inited); + if (start_lock) + pthread_mutex_lock(start_lock); + if (!server_id) + { + if (start_cond) + pthread_cond_broadcast(start_cond); + if (start_lock) + pthread_mutex_unlock(start_lock); + sql_print_error("Server id not set, will not start slave"); + return ER_BAD_SLAVE; + } + + if (*slave_running) + { + if (start_cond) + pthread_cond_broadcast(start_cond); + if (start_lock) + pthread_mutex_unlock(start_lock); + return ER_SLAVE_MUST_STOP; + } + if (pthread_create(&th, &connection_attrib, h_func, (void*)mi)) + { + if (start_lock) + pthread_mutex_unlock(start_lock); + return ER_SLAVE_THREAD; + } + if (start_cond && cond_lock) + { + THD* thd = current_thd; + while (!*slave_running) + { + const char* old_msg = thd->enter_cond(start_cond,cond_lock, + 
"Waiting for slave thread to start"); + pthread_cond_wait(start_cond,cond_lock); + thd->exit_cond(old_msg); + // TODO: in a very rare case of init_slave_thread failing, it is + // possible that we can get stuck here since slave_running will not + // be set. We need to change slave_running to int and have -1 as + // error code + if (thd->killed) + { + pthread_mutex_unlock(cond_lock); + return ER_SERVER_SHUTDOWN; + } + } + } + if (start_lock) + pthread_mutex_unlock(start_lock); + return 0; +} +/* SLAVE_FORCE_ALL is not implemented here on purpose since it does not make + sense to do that for starting a slave - we always care if it actually + started the threads that were not previously running +*/ +int start_slave_threads(bool need_slave_mutex, bool wait_for_start, + MASTER_INFO* mi, const char* master_info_fname, + const char* slave_info_fname, int thread_mask) +{ + pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0; + pthread_cond_t* cond_io=0,*cond_sql=0; + int error=0; + + if (need_slave_mutex) + { + lock_io = &mi->run_lock; + lock_sql = &mi->rli.run_lock; + } + if (wait_for_start) + { + cond_io = &mi->start_cond; + cond_sql = &mi->rli.start_cond; + lock_cond_io = &mi->run_lock; + lock_cond_sql = &mi->rli.run_lock; + } + if (init_master_info(mi,master_info_fname,slave_info_fname)) + return ER_MASTER_INFO; + + if ((thread_mask & SLAVE_IO) && + (error=start_slave_thread(handle_slave_io,lock_io,lock_cond_io, + cond_io,&mi->slave_running, + mi))) + return error; + if ((thread_mask & SLAVE_SQL) && + (error=start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql, + cond_sql, + &mi->rli.slave_running,mi))) + return error; + return 0; +} void init_table_rule_hash(HASH* h, bool* h_inited) { @@ -98,11 +451,11 @@ static TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len) uint i; const char* key_end = key + len; - for(i = 0; i < a->elements; i++) + for (i = 0; i < a->elements; i++) { TABLE_RULE_ENT* e ; get_dynamic(a, (gptr)&e, i); - 
if(!wild_case_compare(key, key_end, (const char*)e->db, + if (!wild_case_compare(key, key_end, (const char*)e->db, (const char*)(e->db + e->key_len),'\\')) return e; } @@ -126,7 +479,7 @@ int tables_ok(THD* thd, TABLE_LIST* tables) if (hash_search(&replicate_do_table, (byte*) hash_key, len)) return 1; } - if (ignore_table_inited) // if there are any do's + if (ignore_table_inited) // if there are any ignores { if (hash_search(&replicate_ignore_table, (byte*) hash_key, len)) return 0; @@ -191,44 +544,52 @@ static void free_string_array(DYNAMIC_ARRAY *a) delete_dynamic(a); } +static int end_slave_on_walk(MASTER_INFO* mi, gptr /*unused*/) +{ + end_master_info(mi); + return 0; +} + void end_slave() { - pthread_mutex_lock(&LOCK_slave); - if (slave_running) - { - abort_slave = 1; - thr_alarm_kill(slave_real_id); -#ifdef SIGNAL_WITH_VIO_CLOSE - slave_thd->close_active_vio(); -#endif - while (slave_running) - pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); - } - pthread_mutex_unlock(&LOCK_slave); - - end_master_info(&glob_mi); - if(do_table_inited) + // TODO: replace the line below with + // list_walk(&master_list, (list_walk_action)end_slave_on_walk,0); + // once multi-master code is ready + terminate_slave_threads(active_mi,SLAVE_FORCE_ALL); + end_master_info(active_mi); + if (do_table_inited) hash_free(&replicate_do_table); - if(ignore_table_inited) + if (ignore_table_inited) hash_free(&replicate_ignore_table); - if(wild_do_table_inited) + if (wild_do_table_inited) free_string_array(&replicate_wild_do_table); - if(wild_ignore_table_inited) + if (wild_ignore_table_inited) free_string_array(&replicate_wild_ignore_table); } -inline bool slave_killed(THD* thd) +static inline bool slave_killed(THD* thd, MASTER_INFO* mi) { - return abort_slave || abort_loop || thd->killed; + DBUG_ASSERT(mi->io_thd == thd); + DBUG_ASSERT(mi->slave_running == 1); // tracking buffer overrun + return mi->abort_slave || abort_loop || thd->killed; } -void slave_print_error(int err_code, const 
char* msg, ...) +static inline bool slave_killed(THD* thd, RELAY_LOG_INFO* rli) +{ + DBUG_ASSERT(rli->sql_thd == thd); + DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun + return rli->abort_slave || abort_loop || thd->killed; +} + +void slave_print_error(RELAY_LOG_INFO* rli, int err_code, const char* msg, ...) { va_list args; va_start(args,msg); - my_vsnprintf(last_slave_error, sizeof(last_slave_error), msg, args); - sql_print_error("Slave: %s, error_code=%d", last_slave_error, err_code); - last_slave_errno = err_code; + my_vsnprintf(rli->last_slave_error, + sizeof(rli->last_slave_error), msg, args); + sql_print_error("Slave: %s, error_code=%d", rli->last_slave_error, + err_code); + rli->last_slave_errno = err_code; } void skip_load_data_infile(NET* net) @@ -476,16 +837,16 @@ err: return error; } -int fetch_nx_table(THD* thd, const char* db_name, const char* table_name, +int fetch_master_table(THD* thd, const char* db_name, const char* table_name, MASTER_INFO* mi, MYSQL* mysql) { int error = 1; - int nx_errno = 0; + int fetch_errno = 0; bool called_connected = (mysql != NULL); if (!called_connected && !(mysql = mc_mysql_init(NULL))) { - sql_print_error("fetch_nx_table: Error in mysql_init()"); - nx_errno = ER_GET_ERRNO; + sql_print_error("fetch_master_table: Error in mysql_init()"); + fetch_errno = ER_GET_ERRNO; goto err; } @@ -495,17 +856,17 @@ int fetch_nx_table(THD* thd, const char* db_name, const char* table_name, { sql_print_error("Could not connect to master while fetching table\ '%-64s.%-64s'", db_name, table_name); - nx_errno = ER_CONNECT_TO_MASTER; + fetch_errno = ER_CONNECT_TO_MASTER; goto err; } } - if (slave_killed(thd)) + if (thd->killed) goto err; if (request_table_dump(mysql, db_name, table_name)) { - nx_errno = ER_GET_ERRNO; - sql_print_error("fetch_nx_table: failed on table dump request "); + fetch_errno = ER_GET_ERRNO; + sql_print_error("fetch_master_table: failed on table dump request "); goto err; } @@ -513,24 +874,25 @@ int 
fetch_nx_table(THD* thd, const char* db_name, const char* table_name, table_name)) { // create_table_from_dump will have sent the error alread - sql_print_error("fetch_nx_table: failed on create table "); + sql_print_error("fetch_master_table: failed on create table "); goto err; } - error = 0; - err: if (mysql && !called_connected) mc_mysql_close(mysql); - if (nx_errno && thd->net.vio) - send_error(&thd->net, nx_errno, "Error in fetch_nx_table"); + if (fetch_errno && thd->net.vio) + send_error(&thd->net, fetch_errno, "Error in fetch_master_table"); thd->net.no_send_ok = 0; // Clear up garbage after create_table_from_dump return error; } void end_master_info(MASTER_INFO* mi) { - if(mi->fd >= 0) + if (!mi->inited) + return; + end_relay_log_info(&mi->rli); + if (mi->fd >= 0) { end_io_cache(&mi->file); (void)my_close(mi->fd, MYF(MY_WME)); @@ -539,21 +901,136 @@ void end_master_info(MASTER_INFO* mi) mi->inited = 0; } -int init_master_info(MASTER_INFO* mi) +int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) +{ + if (rli->inited) + return 0; + MY_STAT stat_area; + char fname[FN_REFLEN+128]; + int info_fd; + const char* msg = 0; + int error = 0; + + fn_format(fname, info_fname, + mysql_data_home, "", 4+32); + pthread_mutex_lock(&rli->data_lock); + info_fd = rli->info_fd; + rli->pending = 0; + rli->cur_log_fd = -1; + rli->slave_skip_counter=0; + rli->log_pos_current=0; + // TODO: make this work with multi-master + if (!opt_relay_logname) + { + char tmp[FN_REFLEN]; + /* TODO: The following should be using fn_format(); We just need to + first change fn_format() to cut the file name if it's too long. 
+ */ + strmake(tmp,glob_hostname,FN_REFLEN-5); + strmov(strcend(tmp,'.'),"-relay-bin"); + opt_relay_logname=my_strdup(tmp,MYF(MY_WME)); + } + rli->relay_log.set_index_file_name(opt_relaylog_index_name); + open_log(&rli->relay_log, glob_hostname, opt_relay_logname, "-relay-bin", + LOG_BIN, 1 /* read_append cache */, + 1 /* no auto events*/); + + /* if file does not exist */ + if (!my_stat(fname, &stat_area, MYF(0))) + { + // if someone removed the file from underneath our feet, just close + // the old descriptor and re-create the old file + if (info_fd >= 0) + my_close(info_fd, MYF(MY_WME)); + if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0 + || init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0, + MYF(MY_WME))) + { + if(info_fd >= 0) + my_close(info_fd, MYF(0)); + rli->info_fd=-1; + pthread_mutex_unlock(&rli->data_lock); + return 1; + } + if (init_relay_log_pos(rli,"",4,0/*no data mutex*/,&msg)) + goto err; + rli->master_log_pos = 0; // uninitialized + rli->info_fd = info_fd; + } + else // file exists + { + if(info_fd >= 0) + reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0); + else if((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 + || init_io_cache(&rli->info_file, info_fd, + IO_SIZE*2, READ_CACHE, 0L, + 0, MYF(MY_WME))) + { + if (info_fd >= 0) + my_close(info_fd, MYF(0)); + rli->info_fd=-1; + pthread_mutex_unlock(&rli->data_lock); + return 1; + } + + rli->info_fd = info_fd; + if (init_strvar_from_file(rli->relay_log_name, + sizeof(rli->relay_log_name), &rli->info_file, + (char*)"") || + init_intvar_from_file((int*)&rli->relay_log_pos, + &rli->info_file, 4) || + init_strvar_from_file(rli->master_log_name, + sizeof(rli->master_log_name), &rli->info_file, + (char*)"") || + init_intvar_from_file((int*)&rli->master_log_pos, + &rli->info_file, 0)) + { + msg="Error reading slave log configuration"; + goto err; + } + if (init_relay_log_pos(rli,0 /*log already inited*/, + 0 /*pos already inited*/, + 0 /* no data 
lock*/, + &msg)) + goto err; + } + DBUG_ASSERT(rli->relay_log_pos >= 4); + DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos); + rli->inited = 1; + // now change the cache from READ to WRITE - must do this + // before flush_relay_log_info + reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1); + error=test(flush_relay_log_info(rli)); + pthread_mutex_unlock(&rli->data_lock); + return error; + +err: + sql_print_error(msg); + end_io_cache(&rli->info_file); + my_close(info_fd, MYF(0)); + rli->info_fd=-1; + pthread_mutex_unlock(&rli->data_lock); + return 1; +} + +int init_master_info(MASTER_INFO* mi, const char* master_info_fname, + const char* slave_info_fname) { if (mi->inited) return 0; + if (init_relay_log_info(&mi->rli, slave_info_fname)) + return 1; + mi->rli.mi = mi; int fd,length,error; MY_STAT stat_area; char fname[FN_REFLEN+128]; const char *msg; - fn_format(fname, master_info_file, mysql_data_home, "", 4+32); + fn_format(fname, master_info_fname, mysql_data_home, "", 4+32); // we need a mutex while we are changing master info parameters to // keep other threads from reading bogus info - pthread_mutex_lock(&mi->lock); - mi->pending = 0; + pthread_mutex_lock(&mi->data_lock); fd = mi->fd; // we do not want any messages if the file does not exist @@ -569,11 +1046,13 @@ int init_master_info(MASTER_INFO* mi) { if(fd >= 0) my_close(fd, MYF(0)); - pthread_mutex_unlock(&mi->lock); + mi->fd=-1; + end_relay_log_info(&mi->rli); + pthread_mutex_unlock(&mi->data_lock); return 1; } - mi->log_file_name[0] = 0; - mi->pos = 4; // skip magic number + mi->master_log_name[0] = 0; + mi->master_log_pos = 4; // skip magic number mi->fd = fd; if (master_host) @@ -595,28 +1074,19 @@ int init_master_info(MASTER_INFO* mi) { if(fd >= 0) my_close(fd, MYF(0)); - pthread_mutex_unlock(&mi->lock); + mi->fd=-1; + end_relay_log_info(&mi->rli); + pthread_mutex_unlock(&mi->data_lock); return 1; } - - if ((length=my_b_gets(&mi->file, mi->log_file_name, - sizeof(mi->log_file_name))) < 1) - 
{ - msg="Error reading log file name from master info file "; - goto error; - } - - mi->log_file_name[length-1]= 0; // kill \n - /* Reuse fname buffer */ - if(!my_b_gets(&mi->file, fname, sizeof(fname))) - { - msg="Error reading log file position from master info file"; - goto error; - } - mi->pos = strtoull(fname,(char**) 0, 10); mi->fd = fd; - if(init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file, + if (init_strvar_from_file(mi->master_log_name, + sizeof(mi->master_log_name), &mi->file, + (char*)"") || + init_intvar_from_file((int*)&mi->master_log_pos, &mi->file, 4) + || + init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file, master_host) || init_strvar_from_file(mi->user, sizeof(mi->user), &mi->file, master_user) || @@ -624,12 +1094,11 @@ int init_master_info(MASTER_INFO* mi) master_password) || init_intvar_from_file((int*)&mi->port, &mi->file, master_port) || init_intvar_from_file((int*)&mi->connect_retry, &mi->file, - master_connect_retry) || - init_intvar_from_file((int*)&mi->last_log_seq, &mi->file, 0) + master_connect_retry) ) { msg="Error reading master configuration"; - goto error; + goto err; } } @@ -638,14 +1107,17 @@ int init_master_info(MASTER_INFO* mi) // before flush_master_info reinit_io_cache(&mi->file, WRITE_CACHE,0L,0,1); error=test(flush_master_info(mi)); - pthread_mutex_unlock(&mi->lock); + pthread_mutex_unlock(&mi->data_lock); return error; -error: +err: sql_print_error(msg); end_io_cache(&mi->file); + end_relay_log_info(&mi->rli); + DBUG_ASSERT(fd>=0); my_close(fd, MYF(0)); - pthread_mutex_unlock(&mi->lock); + mi->fd=-1; + pthread_mutex_unlock(&mi->data_lock); return 1; } @@ -654,14 +1126,14 @@ int register_slave_on_master(MYSQL* mysql) String packet; char buf[4]; - if(!report_host) + if (!report_host) return 0; int4store(buf, server_id); packet.append(buf, 4); net_store_data(&packet, report_host); - if(report_user) + if (report_user) net_store_data(&packet, report_user); else packet.append((char)0); @@ -678,7 +1150,7 @@ int 
register_slave_on_master(MYSQL* mysql) int4store(buf, 0); /* tell the master will fill in master_id */ packet.append(buf, 4); - if(mc_simple_command(mysql, COM_REGISTER_SLAVE, (char*)packet.ptr(), + if (mc_simple_command(mysql, COM_REGISTER_SLAVE, (char*)packet.ptr(), packet.length(), 0)) { sql_print_error("Error on COM_REGISTER_SLAVE: '%s'", @@ -689,52 +1161,61 @@ int register_slave_on_master(MYSQL* mysql) return 0; } - -int show_master_info(THD* thd) +int show_master_info(THD* thd, MASTER_INFO* mi) { + // TODO: fix this for multi-master DBUG_ENTER("show_master_info"); List<Item> field_list; field_list.push_back(new Item_empty_string("Master_Host", - sizeof(glob_mi.host))); + sizeof(mi->host))); field_list.push_back(new Item_empty_string("Master_User", - sizeof(glob_mi.user))); + sizeof(mi->user))); field_list.push_back(new Item_empty_string("Master_Port", 6)); field_list.push_back(new Item_empty_string("Connect_retry", 6)); - field_list.push_back( new Item_empty_string("Log_File", + field_list.push_back(new Item_empty_string("Master_Log_File", FN_REFLEN)); - field_list.push_back(new Item_empty_string("Pos", 12)); - field_list.push_back(new Item_empty_string("Slave_Running", 3)); + field_list.push_back(new Item_empty_string("Read_Master_Log_Pos", 12)); + field_list.push_back(new Item_empty_string("Relay_Log_File", + FN_REFLEN)); + field_list.push_back(new Item_empty_string("Relay_Log_Pos", 12)); + field_list.push_back(new Item_empty_string("Relay_Master_Log_File", + FN_REFLEN)); + field_list.push_back(new Item_empty_string("Slave_IO_Running", 3)); + field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3)); field_list.push_back(new Item_empty_string("Replicate_do_db", 20)); field_list.push_back(new Item_empty_string("Replicate_ignore_db", 20)); field_list.push_back(new Item_empty_string("Last_errno", 4)); field_list.push_back(new Item_empty_string("Last_error", 20)); field_list.push_back(new Item_empty_string("Skip_counter", 12)); - 
field_list.push_back(new Item_empty_string("Last_log_seq", 12)); + field_list.push_back(new Item_empty_string("Exec_master_log_pos", 12)); if(send_fields(thd, field_list, 1)) DBUG_RETURN(-1); String* packet = &thd->packet; - uint32 last_log_seq; packet->length(0); - pthread_mutex_lock(&glob_mi.lock); - net_store_data(packet, glob_mi.host); - net_store_data(packet, glob_mi.user); - net_store_data(packet, (uint32) glob_mi.port); - net_store_data(packet, (uint32) glob_mi.connect_retry); - net_store_data(packet, glob_mi.log_file_name); - net_store_data(packet, (longlong) glob_mi.pos); - last_log_seq = glob_mi.last_log_seq; - pthread_mutex_unlock(&glob_mi.lock); - pthread_mutex_lock(&LOCK_slave); // QQ; This is not needed - net_store_data(packet, slave_running ? "Yes":"No"); - pthread_mutex_unlock(&LOCK_slave); // QQ; This is not needed + pthread_mutex_lock(&mi->data_lock); + pthread_mutex_lock(&mi->rli.data_lock); + net_store_data(packet, mi->host); + net_store_data(packet, mi->user); + net_store_data(packet, (uint32) mi->port); + net_store_data(packet, (uint32) mi->connect_retry); + net_store_data(packet, mi->master_log_name); + net_store_data(packet, (longlong) mi->master_log_pos); + net_store_data(packet, mi->rli.relay_log_name + + dirname_length(mi->rli.relay_log_name)); + net_store_data(packet, (longlong) mi->rli.relay_log_pos); + net_store_data(packet, mi->rli.master_log_name); + net_store_data(packet, mi->slave_running ? "Yes":"No"); + net_store_data(packet, mi->rli.slave_running ? 
"Yes":"No"); net_store_data(packet, &replicate_do_db); net_store_data(packet, &replicate_ignore_db); - net_store_data(packet, (uint32)last_slave_errno); - net_store_data(packet, last_slave_error); - net_store_data(packet, slave_skip_counter); - net_store_data(packet, last_log_seq); + net_store_data(packet, (uint32)mi->rli.last_slave_errno); + net_store_data(packet, mi->rli.last_slave_error); + net_store_data(packet, mi->rli.slave_skip_counter); + net_store_data(packet, (longlong)mi->rli.master_log_pos); + pthread_mutex_unlock(&mi->rli.data_lock); + pthread_mutex_unlock(&mi->data_lock); if (my_net_write(&thd->net, (char*)thd->packet.ptr(), packet->length())) DBUG_RETURN(-1); @@ -747,58 +1228,64 @@ int flush_master_info(MASTER_INFO* mi) { IO_CACHE* file = &mi->file; char lbuf[22]; - char lbuf1[22]; my_b_seek(file, 0L); my_b_printf(file, "%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n", - mi->log_file_name, llstr(mi->pos, lbuf), mi->host, mi->user, - mi->password, mi->port, mi->connect_retry, - llstr(mi->last_log_seq, lbuf1)); + mi->master_log_name, llstr(mi->master_log_pos, lbuf), + mi->host, mi->user, + mi->password, mi->port, mi->connect_retry + ); flush_io_cache(file); return 0; } -int st_master_info::wait_for_pos(THD* thd, String* log_name, ulonglong log_pos) +/* TODO: the code below needs to be re-written almost from scratch + Main issue is how to find out if we have reached a certain position + in the master log my knowing the offset in the relay log. 
+ */ +int st_relay_log_info::wait_for_pos(THD* thd, String* log_name, + ulonglong log_pos) { if (!inited) return -1; - bool pos_reached; + bool pos_reached = 0; int event_count = 0; - pthread_mutex_lock(&lock); - while(!thd->killed) + pthread_mutex_lock(&data_lock); + while (!thd->killed) { int cmp_result; - if (*log_file_name) + DBUG_ASSERT(*master_log_name || master_log_pos == 0); + if (*master_log_name) { /* We should use dirname_length() here when we have a version of this that doesn't modify the argument */ - char *basename = strrchr(log_file_name, FN_LIBCHAR); + char *basename = strrchr(master_log_name, FN_LIBCHAR); if (basename) ++basename; else - basename = log_file_name; + basename = master_log_name; cmp_result = strncmp(basename, log_name->ptr(), log_name->length()); } else cmp_result = 0; - pos_reached = ((!cmp_result && pos >= log_pos) || cmp_result > 0); + pos_reached = ((!cmp_result && master_log_pos >= log_pos) || + cmp_result > 0); if (pos_reached || thd->killed) break; - const char* msg = thd->enter_cond(&cond, &lock, + const char* msg = thd->enter_cond(&data_cond, &data_lock, "Waiting for master update"); - pthread_cond_wait(&cond, &lock); + pthread_cond_wait(&data_cond, &data_lock); thd->exit_cond(msg); event_count++; } - pthread_mutex_unlock(&lock); + pthread_mutex_unlock(&data_lock); return thd->killed ? -1 : event_count; } - -static int init_slave_thread(THD* thd) +static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type) { DBUG_ENTER("init_slave_thread"); thd->system_thread = thd->bootstrap = 1; @@ -812,7 +1299,7 @@ static int init_slave_thread(THD* thd) thd->options = (((opt_log_slave_updates) ? 
OPTION_BIN_LOG:0) | OPTION_AUTO_IS_NULL) ; thd->system_thread = 1; thd->client_capabilities = CLIENT_LOCAL_FILES; - slave_real_id=thd->real_id=pthread_self(); + thd->real_id=pthread_self(); pthread_mutex_lock(&LOCK_thread_count); thd->thread_id = thread_id++; pthread_mutex_unlock(&LOCK_thread_count); @@ -822,7 +1309,6 @@ static int init_slave_thread(THD* thd) my_pthread_setspecific_ptr(THR_MALLOC, &thd->mem_root) || my_pthread_setspecific_ptr(THR_NET, &thd->net)) { - close_connection(&thd->net,ER_OUT_OF_RESOURCES); // is this needed? end_thread(thd,0); DBUG_RETURN(-1); } @@ -839,14 +1325,21 @@ static int init_slave_thread(THD* thd) if (thd->max_join_size == (ulong) ~0L) thd->options |= OPTION_BIG_SELECTS; - thd->proc_info="Waiting for master update"; + if (thd_type == SLAVE_THD_SQL) + { + thd->proc_info = "Waiting for the next event in slave queue"; + } + else + { + thd->proc_info="Waiting for master update"; + } thd->version=refresh_version; thd->set_time(); DBUG_RETURN(0); } -static int safe_sleep(THD* thd, int sec) +static int safe_sleep(THD* thd, MASTER_INFO* mi, int sec) { thr_alarm_t alarmed; thr_alarm_init(&alarmed); @@ -869,21 +1362,20 @@ static int safe_sleep(THD* thd, int sec) if (thr_alarm_in_use(&alarmed)) thr_end_alarm(&alarmed); - if (slave_killed(thd)) + if (slave_killed(thd,mi)) return 1; start_time=time((time_t*) 0); } return 0; } - static int request_dump(MYSQL* mysql, MASTER_INFO* mi) { char buf[FN_REFLEN + 10]; int len; int binlog_flags = 0; // for now - char* logname = mi->log_file_name; - int4store(buf, mi->pos); + char* logname = mi->master_log_name; + int4store(buf, mi->master_log_pos); int2store(buf + 4, binlog_flags); int4store(buf + 6, server_id); len = (uint) strlen(logname); @@ -929,7 +1421,6 @@ command"); return 0; } - static ulong read_event(MYSQL* mysql, MASTER_INFO *mi) { ulong len = packet_error; @@ -944,13 +1435,13 @@ static ulong read_event(MYSQL* mysql, MASTER_INFO *mi) return packet_error; #endif - while (!abort_loop && 
!abort_slave && len == packet_error && + while (!abort_loop && !mi->abort_slave && len == packet_error && read_errno == EINTR ) { len = mc_net_safe_read(mysql); read_errno = errno; } - if (abort_loop || abort_slave) + if (abort_loop || mi->abort_slave) return packet_error; if (len == packet_error || (long) len < 1) { @@ -973,65 +1464,74 @@ server_errno=%d)", return len - 1; } -int check_expected_error(THD* thd, int expected_error) +int check_expected_error(THD* thd, RELAY_LOG_INFO* rli, int expected_error) { - switch(expected_error) + switch (expected_error) { case ER_NET_READ_ERROR: case ER_NET_ERROR_ON_WRITE: case ER_SERVER_SHUTDOWN: case ER_NEW_ABORTING_CONNECTION: - my_snprintf(last_slave_error, sizeof(last_slave_error), + my_snprintf(rli->last_slave_error, sizeof(rli->last_slave_error), "Slave: query '%s' partially completed on the master \ and was aborted. There is a chance that your master is inconsistent at this \ point. If you are sure that your master is ok, run this query manually on the\ slave and then restart the slave with SET SQL_SLAVE_SKIP_COUNTER=1;\ SLAVE START;", thd->query); - last_slave_errno = expected_error; - sql_print_error("%s",last_slave_error); + rli->last_slave_errno = expected_error; + sql_print_error("%s",rli->last_slave_error); return 1; default: return 0; } } -static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) +static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) { const char *error_msg; - Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1, - event_len, &error_msg, - mi->old_format); + DBUG_ASSERT(rli->sql_thd==thd); + Log_event * ev = next_event(rli); + DBUG_ASSERT(rli->sql_thd==thd); + if (slave_killed(thd,rli)) + return 1; if (ev) { int type_code = ev->get_type_code(); int exec_res; + pthread_mutex_lock(&rli->data_lock); if (ev->server_id == ::server_id || - (slave_skip_counter && type_code != ROTATE_EVENT)) + (rli->slave_skip_counter && type_code != ROTATE_EVENT)) { - 
if(type_code == LOAD_EVENT) - skip_load_data_infile(net); - - mi->inc_pos(event_len, ev->log_seq); - flush_master_info(mi); - if(slave_skip_counter && /* protect against common user error of + /* + TODO: I/O thread must handle skipping file delivery for + old load data infile events + */ + /* TODO: I/O thread should not even log events with the same server id */ + rli->inc_pos(ev->get_event_len(), + type_code != STOP_EVENT ? ev->log_pos : 0, + 1/* skip lock*/); + flush_relay_log_info(rli); + if (rli->slave_skip_counter && /* protect against common user error of setting the counter to 1 instead of 2 while recovering from an failed auto-increment insert */ - !(type_code == INTVAR_EVENT && - slave_skip_counter == 1)) - --slave_skip_counter; + !((type_code == INTVAR_EVENT || type_code == STOP_EVENT) && + rli->slave_skip_counter == 1)) + --rli->slave_skip_counter; + pthread_mutex_unlock(&rli->data_lock); delete ev; return 0; // avoid infinite update loops } + pthread_mutex_unlock(&rli->data_lock); thd->server_id = ev->server_id; // use the original server id for logging thd->set_time(); // time the query - if(!thd->log_seq) - thd->log_seq = ev->log_seq; if (!ev->when) ev->when = time(NULL); ev->thd = thd; - exec_res = ev->exec_event(mi); + thd->log_pos = ev->log_pos; + exec_res = ev->exec_event(rli); + DBUG_ASSERT(rli->sql_thd==thd); delete ev; return exec_res; } @@ -1044,167 +1544,148 @@ This may also be a network problem, or just a bug in the master or slave code.\ return 1; } } - -// slave thread -pthread_handler_decl(handle_slave,arg __attribute__((unused))) +/* slave I/O thread */ +pthread_handler_decl(handle_slave_io,arg) { #ifndef DBUG_OFF slave_begin: #endif THD *thd; // needs to be first for thread_stack MYSQL *mysql = NULL ; + MASTER_INFO* mi = (MASTER_INFO*)arg; char llbuff[22]; - - pthread_mutex_lock(&LOCK_slave); - if (!server_id) - { - pthread_cond_broadcast(&COND_slave_start); - pthread_mutex_unlock(&LOCK_slave); - sql_print_error("Server id not set, will 
not start slave"); - pthread_exit((void*)1); - } + bool retried_once = 0; + ulonglong last_failed_pos = 0; // TODO: see if last_failed_pos is needed + DBUG_ASSERT(mi->inited); - if(slave_running) - { - pthread_cond_broadcast(&COND_slave_start); - pthread_mutex_unlock(&LOCK_slave); - pthread_exit((void*)1); // safety just in case - } - slave_running = 1; - abort_slave = 0; + pthread_mutex_lock(&mi->run_lock); #ifndef DBUG_OFF - events_till_abort = abort_slave_event_count; + mi->events_till_abort = abort_slave_event_count; #endif - pthread_cond_broadcast(&COND_slave_start); - pthread_mutex_unlock(&LOCK_slave); - - // int error = 1; - bool retried_once = 0; - ulonglong last_failed_pos = 0; // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff my_thread_init(); - slave_thd = thd = new THD; // note that contructor of THD uses DBUG_ ! - DBUG_ENTER("handle_slave"); + thd = new THD; // note that contructor of THD uses DBUG_ ! + DBUG_ENTER("handle_slave_io"); pthread_detach_this_thread(); - if (init_slave_thread(thd) || init_master_info(&glob_mi)) + if (init_slave_thread(thd, SLAVE_THD_IO)) { - sql_print_error("Failed during slave thread initialization"); + pthread_cond_broadcast(&mi->start_cond); + pthread_mutex_unlock(&mi->run_lock); + sql_print_error("Failed during slave I/O thread initialization"); goto err; } + mi->io_thd = thd; thd->thread_stack = (char*)&thd; // remember where our stack is - thd->temporary_tables = save_temporary_tables; // restore temp tables threads.append(thd); - glob_mi.pending = 0; //this should always be set to 0 when the slave thread - // is started + mi->slave_running = 1; + mi->abort_slave = 0; + pthread_cond_broadcast(&mi->start_cond); + pthread_mutex_unlock(&mi->run_lock); DBUG_PRINT("info",("master info: log_file_name=%s, position=%s", - glob_mi.log_file_name, llstr(glob_mi.pos,llbuff))); - + mi->master_log_name, llstr(mi->master_log_pos,llbuff))); if (!(mysql = mc_mysql_init(NULL))) { - sql_print_error("Slave 
thread: error in mc_mysql_init()"); + sql_print_error("Slave I/O thread: error in mc_mysql_init()"); goto err; } thd->proc_info = "connecting to master"; #ifndef DBUG_OFF - sql_print_error("Slave thread initialized"); + sql_print_error("Slave I/O thread initialized"); #endif // we can get killed during safe_connect - if (!safe_connect(thd, mysql, &glob_mi)) - sql_print_error("Slave: connected to master '%s@%s:%d',\ - replication started in log '%s' at position %s", glob_mi.user, - glob_mi.host, glob_mi.port, - RPL_LOG_NAME, - llstr(glob_mi.pos,llbuff)); + if (!safe_connect(thd, mysql, mi)) + sql_print_error("Slave I/O thread: connected to master '%s@%s:%d',\ + replication started in log '%s' at position %s", mi->user, + mi->host, mi->port, + IO_RPL_LOG_NAME, + llstr(mi->master_log_pos,llbuff)); else { - sql_print_error("Slave thread killed while connecting to master"); + sql_print_error("Slave I/O thread killed while connecting to master"); goto err; } connected: thd->slave_net = &mysql->net; - // register ourselves with the master - // if fails, this is not fatal - we just print the error message and go - // on with life thd->proc_info = "Checking master version"; - if (check_master_version(mysql, &glob_mi)) + if (check_master_version(mysql, mi)) { goto err; } - if (!glob_mi.old_format) + if (!mi->old_format) { + // register ourselves with the master + // if fails, this is not fatal - we just print the error message and go + // on with life thd->proc_info = "Registering slave on master"; if (register_slave_on_master(mysql) || update_slave_list(mysql)) goto err; } - while (!slave_killed(thd)) + while (!slave_killed(thd,mi)) { thd->proc_info = "Requesting binlog dump"; - if(request_dump(mysql, &glob_mi)) + if (request_dump(mysql, mi)) { sql_print_error("Failed on request_dump()"); - if(slave_killed(thd)) + if(slave_killed(thd,mi)) { - sql_print_error("Slave thread killed while requesting master \ + sql_print_error("Slave I/O thread killed while requesting master \ 
dump"); goto err; } thd->proc_info = "Waiiting to reconnect after a failed dump request"; - if(mysql->net.vio) - vio_close(mysql->net.vio); + mc_end_server(mysql); // first time retry immediately, assuming that we can recover // right away - if first time fails, sleep between re-tries // hopefuly the admin can fix the problem sometime - if(retried_once) - safe_sleep(thd, glob_mi.connect_retry); + if (retried_once) + safe_sleep(thd, mi, mi->connect_retry); else retried_once = 1; - if(slave_killed(thd)) + if (slave_killed(thd,mi)) { - sql_print_error("Slave thread killed while retrying master \ + sql_print_error("Slave I/O thread killed while retrying master \ dump"); goto err; } thd->proc_info = "Reconnecting after a failed dump request"; - last_failed_pos=glob_mi.pos; - sql_print_error("Slave: failed dump request, reconnecting to \ -try again, log '%s' at postion %s", RPL_LOG_NAME, - llstr(last_failed_pos,llbuff)); - if(safe_reconnect(thd, mysql, &glob_mi) || slave_killed(thd)) + sql_print_error("Slave I/O thread: failed dump request, \ +reconnecting to try again, log '%s' at postion %s", IO_RPL_LOG_NAME, + llstr(mi->master_log_pos,llbuff)); + if (safe_reconnect(thd, mysql, mi) || slave_killed(thd,mi)) { - sql_print_error("Slave thread killed during or after reconnect"); + sql_print_error("Slave I/O thread killed during or \ +after reconnect"); goto err; } goto connected; } - - while(!slave_killed(thd)) + while (!slave_killed(thd,mi)) { thd->proc_info = "Reading master update"; - ulong event_len = read_event(mysql, &glob_mi); - if(slave_killed(thd)) + ulong event_len = read_event(mysql, mi); + if (slave_killed(thd,mi)) { - sql_print_error("Slave thread killed while reading event"); + sql_print_error("Slave I/O thread killed while reading event"); goto err; } - if (event_len == packet_error) { - if(mc_mysql_errno(mysql) == ER_NET_PACKET_TOO_LARGE) + if (mc_mysql_errno(mysql) == ER_NET_PACKET_TOO_LARGE) { sql_print_error("Log entry on master is longer than \ 
max_allowed_packet on slave. Slave thread will be aborted. If the entry is \ @@ -1214,99 +1695,72 @@ max_allowed_packet. The current value is %ld", max_allowed_packet); } thd->proc_info = "Waiting to reconnect after a failed read"; - if(mysql->net.vio) - vio_close(mysql->net.vio); - if(retried_once) // punish repeat offender with sleep - safe_sleep(thd, glob_mi.connect_retry); + mc_end_server(mysql); + if (retried_once) // punish repeat offender with sleep + safe_sleep(thd,mi,mi->connect_retry); else retried_once = 1; - if(slave_killed(thd)) + if (slave_killed(thd,mi)) { - sql_print_error("Slave thread killed while waiting to \ + sql_print_error("Slave I/O thread killed while waiting to \ reconnect after a failed read"); goto err; } thd->proc_info = "Reconnecting after a failed read"; - last_failed_pos= glob_mi.pos; - sql_print_error("Slave: Failed reading log event, \ -reconnecting to retry, log '%s' position %s", RPL_LOG_NAME, - llstr(last_failed_pos, llbuff)); - if(safe_reconnect(thd, mysql, &glob_mi) || slave_killed(thd)) + sql_print_error("Slave I/O thread: Failed reading log event, \ +reconnecting to retry, log '%s' position %s", IO_RPL_LOG_NAME, + llstr(mi->master_log_pos, llbuff)); + if (safe_reconnect(thd, mysql, mi) || slave_killed(thd,mi)) { - sql_print_error("Slave thread killed during or after a \ + sql_print_error("Slave I/O thread killed during or after a \ reconnect done to recover from failed read"); goto err; } - goto connected; } // if(event_len == packet_error) - thd->proc_info = "Processing master log event"; - if(exec_event(thd, &mysql->net, &glob_mi, event_len)) - { - sql_print_error("\ -Error running query, slave aborted. Fix the problem, and re-start \ -the slave thread with \"mysqladmin start-slave\". 
We stopped at log \ -'%s' position %s", - RPL_LOG_NAME, llstr(glob_mi.pos, llbuff)); - goto err; - // there was an error running the query - // abort the slave thread, when the problem is fixed, the user - // should restart the slave with mysqladmin start-slave - } + thd->proc_info = "Queueing event from master"; + if (queue_event(mi,(const char*)mysql->net.read_pos + 1, + (uint)event_len)) + { + sql_print_error("Slave I/O thread could not queue event \ +from master"); + goto err; + } + // TODO: check debugging abort code #ifndef DBUG_OFF - if(abort_slave_event_count && !--events_till_abort) + if (abort_slave_event_count && !--events_till_abort) { - sql_print_error("Slave: debugging abort"); + sql_print_error("Slave I/O thread: debugging abort"); goto err; } #endif - - // successful exec with offset advance, - // the slave repents and his sins are forgiven! - if(glob_mi.pos > last_failed_pos) - { - retried_once = 0; -#ifndef DBUG_OFF - stuck_count = 0; -#endif - } -#ifndef DBUG_OFF - else - { - // show a little mercy, allow slave to read one more event - // before cutting him off - otherwise he gets stuck - // on Intvar events, since they do not advance the offset - // immediately - if (++stuck_count > 2) - events_till_disconnect++; - } -#endif - } // while(!slave_killed(thd)) - read/exec loop - } // while(!slave_killed(thd)) - slave loop + } // while(!slave_killed(thd,mi)) - read/exec loop + } // while(!slave_killed(thd,mi)) - slave loop // error = 0; err: - // print the current replication position - sql_print_error("Slave thread exiting, replication stopped in log '%s' at \ -position %s", - RPL_LOG_NAME, llstr(glob_mi.pos,llbuff)); + // print the current replication position + sql_print_error("Slave I/O thread exiting, read up to log '%s', position %s", + IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff)); thd->query = thd->db = 0; // extra safety if(mysql) mc_mysql_close(mysql); thd->proc_info = "Waiting for slave mutex on exit"; - 
pthread_mutex_lock(&LOCK_slave); - slave_running = 0; + pthread_mutex_lock(&mi->run_lock); + mi->slave_running = 0; + mi->io_thd = 0; + // TODO: make rpl_status part of MASTER_INFO change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE); - abort_slave = 0; - save_temporary_tables = thd->temporary_tables; - thd->temporary_tables = 0; // remove tempation from destructor to close them - pthread_cond_broadcast(&COND_slave_stopped); // tell the world we are done - pthread_mutex_unlock(&LOCK_slave); + mi->abort_slave = 0; // TODO: check if this is needed + DBUG_ASSERT(thd->net.buff != 0); net_end(&thd->net); // destructor will not free it, because we are weird - slave_thd = 0; + pthread_mutex_lock(&LOCK_thread_count); delete thd; + pthread_mutex_unlock(&LOCK_thread_count); + pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done + pthread_mutex_unlock(&mi->run_lock); my_thread_end(); #ifndef DBUG_OFF if(abort_slave_event_count && !events_till_abort) @@ -1316,9 +1770,185 @@ position %s", DBUG_RETURN(0); // Can't return anything here } +/* slave SQL logic thread */ -/* try to connect until successful or slave killed */ +pthread_handler_decl(handle_slave_sql,arg) +{ +#ifndef DBUG_OFF + slave_begin: +#endif + THD *thd; /* needs to be first for thread_stack */ + MYSQL *mysql = NULL ; + bool retried_once = 0; + ulonglong last_failed_pos = 0; // TODO: see if this can be removed + char llbuff[22],llbuff1[22]; + RELAY_LOG_INFO* rli = &((MASTER_INFO*)arg)->rli; + const char* errmsg=0; + DBUG_ASSERT(rli->inited); + pthread_mutex_lock(&rli->run_lock); + DBUG_ASSERT(!rli->slave_running); +#ifndef DBUG_OFF + rli->events_till_abort = abort_slave_event_count; +#endif + + + // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff + my_thread_init(); + thd = new THD; // note that contructor of THD uses DBUG_ ! 
+ DBUG_ENTER("handle_slave_sql"); + + pthread_detach_this_thread(); + if (init_slave_thread(thd, SLAVE_THD_SQL)) + { + // TODO: this is currently broken - slave start and change master + // will be stuck if we fail here + pthread_cond_broadcast(&rli->start_cond); + pthread_mutex_unlock(&rli->run_lock); + sql_print_error("Failed during slave thread initialization"); + goto err; + } + thd->thread_stack = (char*)&thd; // remember where our stack is + thd->temporary_tables = rli->save_temporary_tables; // restore temp tables + threads.append(thd); + rli->sql_thd = thd; + rli->slave_running = 1; + rli->abort_slave = 0; + pthread_cond_broadcast(&rli->start_cond); + pthread_mutex_unlock(&rli->run_lock); + rli->pending = 0; //this should always be set to 0 when the slave thread + // is started + if (init_relay_log_pos(rli,0,0,1/*need data lock*/,&errmsg)) + { + sql_print_error("Error initializing relay log position: %s", + errmsg); + goto err; + } + DBUG_ASSERT(rli->relay_log_pos >= 4); + DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos); + + DBUG_PRINT("info",("master info: log_file_name=%s, position=%s", + rli->master_log_name, llstr(rli->master_log_pos,llbuff))); + DBUG_ASSERT(rli->sql_thd == thd); + sql_print_error("Slave SQL thread initialized, starting replication in \ +log '%s' at position %s,relay log: name='%s',pos='%s'", RPL_LOG_NAME, + llstr(rli->master_log_pos,llbuff),rli->relay_log_name, + llstr(rli->relay_log_pos,llbuff1)); + while (!slave_killed(thd,rli)) + { + thd->proc_info = "Processing master log event"; + DBUG_ASSERT(rli->sql_thd == thd); + if (exec_relay_log_event(thd,rli)) + { + // do not scare the user if SQL thread was simply killed or stopped + if (!slave_killed(thd,rli)) + sql_print_error("\ +Error running query, slave SQL thread aborted. Fix the problem, and restart \ +the slave SQL thread with \"mysqladmin start-slave\". 
We stopped at log \ +'%s' position %s", + RPL_LOG_NAME, llstr(rli->master_log_pos, llbuff)); + goto err; + } + } // while(!slave_killed(thd,rli)) - read/exec loop + + // error = 0; + err: + // print the current replication position + sql_print_error("Slave SQL thread exiting, replication stopped in log \ + '%s' at position %s", + RPL_LOG_NAME, llstr(rli->master_log_pos,llbuff)); + thd->query = thd->db = 0; // extra safety + thd->proc_info = "Waiting for slave mutex on exit"; + pthread_mutex_lock(&rli->run_lock); + DBUG_ASSERT(rli->slave_running == 1); // tracking buffer overrun + rli->slave_running = 0; + rli->save_temporary_tables = thd->temporary_tables; + //TODO: see if we can do this conditionally in next_event() instead + // to avoid unneeded position re-init + rli->log_pos_current=0; + thd->temporary_tables = 0; // remove tempation from destructor to close them + DBUG_ASSERT(thd->net.buff != 0); + net_end(&thd->net); // destructor will not free it, because we are weird + DBUG_ASSERT(rli->sql_thd == thd); + rli->sql_thd = 0; + pthread_mutex_lock(&LOCK_thread_count); + delete thd; + pthread_mutex_unlock(&LOCK_thread_count); + pthread_cond_broadcast(&rli->stop_cond); + // tell the world we are done + pthread_mutex_unlock(&rli->run_lock); + my_thread_end(); +#ifndef DBUG_OFF // TODO: reconsider the code below + if (abort_slave_event_count && !rli->events_till_abort) + goto slave_begin; +#endif + pthread_exit(0); + DBUG_RETURN(0); // Can't return anything here +} +int queue_event(MASTER_INFO* mi,const char* buf,uint event_len) +{ + int error; + bool inc_pos = 1; + if (mi->old_format) + return 1; // TODO: deal with old format + + switch (buf[EVENT_TYPE_OFFSET]) + { + case ROTATE_EVENT: + { + Rotate_log_event rev(buf,event_len,0); + if (!rev.is_valid()) + return 1; + DBUG_ASSERT(rev.ident_len<sizeof(mi->master_log_name)); + memcpy(mi->master_log_name,rev.new_log_ident, + rev.ident_len); + mi->master_log_name[rev.ident_len] = 0; + mi->master_log_pos = rev.pos; + 
inc_pos = 0; +#ifndef DBUG_OFF + /* if we do not do this, we will be getting the first + rotate event forever, so + we need to not disconnect after one + */ + if (disconnect_slave_event_count) + events_till_disconnect++; +#endif + break; + } + default: + break; + } + + if (!(error = mi->rli.relay_log.appendv(buf,event_len,0))) + { + if (inc_pos) + mi->master_log_pos += event_len; + } + return error; +} + +void end_relay_log_info(RELAY_LOG_INFO* rli) +{ + if (!rli->inited) + return; + if (rli->info_fd >= 0) + { + end_io_cache(&rli->info_file); + (void)my_close(rli->info_fd, MYF(MY_WME)); + rli->info_fd = -1; + } + if (rli->cur_log_fd >= 0) + { + end_io_cache(&rli->cache_buf); + (void)my_close(rli->cur_log_fd, MYF(MY_WME)); + rli->cur_log_fd = -1; + } + rli->inited = 0; + rli->log_pos_current=0; + rli->relay_log.close(1); +} + +/* try to connect until successful or slave killed */ static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) { return connect_to_master(thd, mysql, mi, 0); @@ -1328,7 +1958,6 @@ static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) Try to connect until successful or slave killed or we have retried master_retry_count times */ - static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi, bool reconnect) { @@ -1337,15 +1966,10 @@ static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi, ulong err_count=0; char llbuff[22]; - /* - If we lost connection after reading a state set event - we will be re-reading it, so pending needs to be cleared - */ - mi->pending = 0; #ifndef DBUG_OFF events_till_disconnect = disconnect_slave_event_count; #endif - while (!(slave_was_killed = slave_killed(thd)) && + while (!(slave_was_killed = slave_killed(thd,mi)) && (reconnect ? 
mc_mysql_reconnect(mysql) : !mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0, mi->port, 0, 0))) @@ -1353,12 +1977,13 @@ static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi, /* Don't repeat last error */ if (mc_mysql_errno(mysql) != last_errno) { - sql_print_error("Slave thread: error connecting to master: \ -%s, last_errno=%d, retry in %d sec", + sql_print_error("Slave I/O thread: error connecting to master \ +'%s@%s:%d': \ +%s, last_errno=%d, retry in %d sec",mi->user,mi->host,mi->port, mc_mysql_error(mysql), last_errno=mc_mysql_errno(mysql), mi->connect_retry); } - safe_sleep(thd, mi->connect_retry); + safe_sleep(thd,mi,mi->connect_retry); /* by default we try forever. The reason is that failure will trigger master election, so if the user did not set master_retry_count we do not want to have electioin triggered on the first failure to @@ -1377,10 +2002,10 @@ static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi, { if (reconnect) sql_print_error("Slave: connected to master '%s@%s:%d',\ -replication resumed in log '%s' at position %s", glob_mi.user, - glob_mi.host, glob_mi.port, - RPL_LOG_NAME, - llstr(glob_mi.pos,llbuff)); +replication resumed in log '%s' at position %s", mi->user, + mi->host, mi->port, + IO_RPL_LOG_NAME, + llstr(mi->master_log_pos,llbuff)); else { change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE); @@ -1405,6 +2030,175 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) return connect_to_master(thd, mysql, mi, 1); } +int flush_relay_log_info(RELAY_LOG_INFO* rli) +{ + IO_CACHE* file = &rli->info_file; + char lbuf[22],lbuf1[22]; + + my_b_seek(file, 0L); + my_b_printf(file, "%s\n%s\n%s\n%s\n", + rli->relay_log_name, llstr(rli->relay_log_pos, lbuf), + rli->master_log_name, llstr(rli->master_log_pos, lbuf1) + ); + flush_io_cache(file); + flush_io_cache(rli->cur_log); + return 0; +} + +IO_CACHE* reopen_relay_log(RELAY_LOG_INFO* rli, const char** errmsg) +{ + DBUG_ASSERT(rli->cur_log != 
&rli->cache_buf); + IO_CACHE* cur_log = rli->cur_log=&rli->cache_buf; + DBUG_ASSERT(rli->cur_log_fd == -1); + if ((rli->cur_log_fd=open_binlog(cur_log,rli->relay_log_name, + errmsg))<0) + return 0; + my_b_seek(cur_log,rli->relay_log_pos); + return cur_log; +} + +Log_event* next_event(RELAY_LOG_INFO* rli) +{ + Log_event* ev; + IO_CACHE* cur_log = rli->cur_log; + pthread_mutex_t *log_lock = rli->relay_log.get_log_lock(); + const char* errmsg=0; + THD* thd = rli->sql_thd; + bool was_killed; + DBUG_ASSERT(thd != 0); + + // For most operations we need to protect rli members with data_lock, + // so we will hold it for the most of the loop below + // However, we will release it whenever it is worth the hassle, + // and in the cases when we go into a pthread_cond_wait() with the + // non-data_lock mutex + pthread_mutex_lock(&rli->data_lock); + + for (;!(was_killed=slave_killed(thd,rli));) + { + // we can have two kinds of log reading: + // hot_log - rli->cur_log points at the IO_CACHE of relay_log, which + // is actively being updated by the I/O thread. We need to be careful + // in this case and make sure that we are not looking at a stale log that + // has already been rotated. If it has been, we reopen the log + // the other case is much simpler - we just have a read only log that + // nobody else will be updating. 
+ bool hot_log; + if ((hot_log = (cur_log != &rli->cache_buf))) + { + DBUG_ASSERT(rli->cur_log_fd == -1); // foreign descriptor + pthread_mutex_lock(log_lock); + // reading cur_log->init_count here is safe because the log will only + // be rotated when we hold relay_log.LOCK_log + if (cur_log->init_count != rli->cur_log_init_count) + { + if (!(cur_log=reopen_relay_log(rli,&errmsg))) + { + pthread_mutex_unlock(log_lock); + goto err; + } + pthread_mutex_unlock(log_lock); + hot_log=0; + } + } + DBUG_ASSERT(my_b_tell(cur_log) >= 4); + DBUG_ASSERT(my_b_tell(cur_log) == rli->relay_log_pos + rli->pending); + if ((ev=Log_event::read_log_event(cur_log,0,rli->mi->old_format))) + { + DBUG_ASSERT(thd==rli->sql_thd); + if (hot_log) + pthread_mutex_unlock(log_lock); + pthread_mutex_unlock(&rli->data_lock); + return ev; + } + DBUG_ASSERT(thd==rli->sql_thd); + if (!cur_log->error) /* EOF */ + { + // on a hot log, EOF means that there are no more updates to + // process and we must block until I/O thread adds some and + // signals us to continue + if (hot_log) + { + DBUG_ASSERT(cur_log->init_count == rli->cur_log_init_count); + //we can, and should release data_lock while we are waiting for + // update. If we do not, show slave status will block + pthread_mutex_unlock(&rli->data_lock); + + // IMPORTANT: note that wait_for_update will unlock LOCK_log, but + // expects the caller to lock it + rli->relay_log.wait_for_update(rli->sql_thd); + + // re-acquire data lock since we released it earlier + pthread_mutex_lock(&rli->data_lock); + continue; + } + // if the log was not hot, we need to move to the next log in + // sequence. 
The next log could be hot or cold, we deal with both + // cases separately after doing some common initialization + else + { + end_io_cache(cur_log); + DBUG_ASSERT(rli->cur_log_fd >= 0); + my_close(rli->cur_log_fd, MYF(MY_WME)); + rli->cur_log_fd = -1; + int error; + + // purge_first_log will properly set up relay log coordinates in rli + if (rli->relay_log.purge_first_log(rli)) + { + errmsg = "Error purging processed log"; + goto err; + } + + // next log is hot + if (rli->relay_log.is_active(rli->linfo.log_file_name)) + { +#ifdef EXTRA_DEBUG + sql_print_error("next log '%s' is currently active", + rli->linfo.log_file_name); +#endif + rli->cur_log = cur_log = rli->relay_log.get_log_file(); + rli->cur_log_init_count = cur_log->init_count; + DBUG_ASSERT(rli->cur_log_fd == -1); + + // read pointer has to be at the start since we are the only + // reader + if (check_binlog_magic(cur_log,&errmsg)) + goto err; + continue; + } + // if we get here, the log was not hot, so we will have to + // open it ourselves +#ifdef EXTRA_DEBUG + sql_print_error("next log '%s' is not active", + rli->linfo.log_file_name); +#endif + // open_binlog() will check the magic header + if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name, + &errmsg))<0) + goto err; + } + } + else // read failed with a non-EOF error + { + // TODO: come up with something better to handle this error + sql_print_error("Slave SQL thread: I/O error reading \ +event(errno=%d,cur_log->error=%d)", + my_errno,cur_log->error); + // no need to hog the mutex while we sleep + pthread_mutex_unlock(&rli->data_lock); + safe_sleep(rli->sql_thd,rli->mi,1); + pthread_mutex_lock(&rli->data_lock); + } + } + if (!errmsg && was_killed) + errmsg = "slave SQL thread was killed"; +err: + pthread_mutex_unlock(&rli->data_lock); + sql_print_error("Error reading relay log event: %s", errmsg); + return 0; +} + #ifdef __GNUC__ template class I_List_iterator<i_string>; diff --git a/sql/slave.h b/sql/slave.h index 
a5bc4d61309..9ad5c75a556 100644 --- a/sql/slave.h +++ b/sql/slave.h @@ -2,67 +2,265 @@ #define SLAVE_H #include "mysql.h" +#include "my_list.h" #define SLAVE_NET_TIMEOUT 3600 +#define MAX_SLAVE_ERRMSG 1024 + +/* + The replication is accomplished by starting two threads - I/O + thread, and SQL thread. I/O thread is associated with its + MASTER_INFO struct, so MASTER_INFO can be viewed as I/O thread + descriptor. SQL thread is associated with RELAY_LOG_INFO struct. + + I/O thread maintains a connection to the master, and reads log + events from the master as they arrive, queueing them by writing them + out into the temporary slave binary log (relay log). The SQL thread, + in turn, reads the slave binary log executing each event. + + Relay log is needed to be able to handle situations when there is a large + backlog of unprocessed events from the master (eg. one particular update + takes a day to finish), and to be able to restart the slave server without + having to re-read the master updates. + */ extern ulong slave_net_timeout, master_retry_count; extern char* slave_load_tmpdir; +extern my_string master_info_file,relay_log_info_file; +extern my_string opt_relay_logname, opt_relaylog_index_name; +extern bool opt_skip_slave_start; +struct st_master_info; + +#define LOCK_ACTIVE_MI { pthread_mutex_lock(&LOCK_active_mi); \ + ++active_mi_in_use; \ + pthread_mutex_unlock(&LOCK_active_mi);} + +#define UNLOCK_ACTIVE_MI { pthread_mutex_lock(&LOCK_active_mi); \ + --active_mi_in_use; \ + pthread_mutex_unlock(&LOCK_active_mi); } + +/* + st_relay_log_info contains information on the current relay log and + relay log offset, and master log name and log sequence corresponding to the + last update. Additionally, misc information specific to the SQL thread is + included. + + st_relay_log_info is initialized from the slave.info file if such exists. + Otherwise, data members are initialized with defaults. The initialization is + done with init_relay_log_info() call. 
+ + The format of slave.info file: + + relay_log_name + relay_log_pos + master_log_name + master_log_pos + + To clean up, call end_relay_log_info() + */ +typedef struct st_relay_log_info +{ + // info_fd - file descriptor of the info file. set only during + // initialization or clean up - safe to read anytime + // cur_log_fd - file descriptor of the current read relay log, protected by + // data_lock + File info_fd,cur_log_fd; + + // IO_CACHE of the info file - set only during init or end, safe to read + // anytime + IO_CACHE info_file; + + // name of current read relay log - protected by data_lock + char relay_log_name[FN_REFLEN]; + + // master log name corresponding to current read position - protected by + // data lock + char master_log_name[FN_REFLEN]; + + // original log position of last processed event - protected by data_lock + volatile uint32 master_log_pos; + // when we restart slave thread we need to have access to the previously + // created temporary tables. Modified only on init/end and by the SQL + // thread, read only by SQL thread, need no mutex + TABLE* save_temporary_tables; + + // relay_log_pos - current offset in the relay log - protected by data_lock + // pending - in some cases we do not increment offset immediately after + // processing an event, because the following event needs to be processed + // atomically together with this one ( so far, there is only one type of + // such event - Intvar_event that sets auto_increment value). However, once + // both events have been processed, we need to increment by the cumulative + // offset. pending stored the extra offset to be added to the position. 
+ ulonglong relay_log_pos,pending; + + // standard lock acquisition order to avoid deadlocks: + // run_lock, data_lock, relay_log.LOCK_log,relay_log.LOCK_index + pthread_mutex_t data_lock,run_lock; + + // start_cond is broadcast when SQL thread is started + // stop_cond - when stopped + // data_cond - when data protected by data_lock changes + pthread_cond_t start_cond,stop_cond,data_cond; + + // if not set, the value of other members of the structure are undefined + bool inited; + + // parent master info structure + struct st_master_info *mi; + + // protected with internal locks + // must get data_lock when resetting the logs + MYSQL_LOG relay_log; + LOG_INFO linfo; + IO_CACHE cache_buf,*cur_log; + + /* needed to deal properly with cur_log getting closed and re-opened with + a different log under our feet + */ + int cur_log_init_count; + + volatile bool abort_slave, slave_running; +// needed for problems when slave stops and +// we want to restart it skipping one or more events in the master log that +// have caused errors, and have been manually applied by DBA already + volatile uint32 slave_skip_counter; +#ifndef DBUG_OFF + int events_till_abort; +#endif + int last_slave_errno; + char last_slave_error[MAX_SLAVE_ERRMSG]; + THD* sql_thd; + bool log_pos_current; + + st_relay_log_info():info_fd(-1),cur_log_fd(-1),inited(0), + cur_log_init_count(0), + log_pos_current(0) + { + relay_log_name[0] = master_log_name[0] = 0; + pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST); + pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST); + pthread_cond_init(&data_cond, NULL); + pthread_cond_init(&start_cond, NULL); + pthread_cond_init(&stop_cond, NULL); + } + ~st_relay_log_info() + { + pthread_mutex_destroy(&run_lock); + pthread_mutex_destroy(&data_lock); + pthread_cond_destroy(&data_cond); + pthread_cond_destroy(&start_cond); + pthread_cond_destroy(&stop_cond); + } + inline void inc_pending(ulonglong val) + { + pending += val; + } + // TODO: this probably needs to be fixed + 
inline void inc_pos(ulonglong val, uint32 log_pos, bool skip_lock=0) + { + if (!skip_lock) + pthread_mutex_lock(&data_lock); + relay_log_pos += val+pending; + pending = 0; + if (log_pos) + master_log_pos = log_pos+val; + pthread_cond_broadcast(&data_cond); + if (!skip_lock) + pthread_mutex_unlock(&data_lock); + } + // thread safe read of position - not needed if we are in the slave thread, + // but required otherwise + inline void read_pos(ulonglong& var) + { + pthread_mutex_lock(&data_lock); + var = relay_log_pos; + pthread_mutex_unlock(&data_lock); + } + + int wait_for_pos(THD* thd, String* log_name, ulonglong log_pos); +} RELAY_LOG_INFO; + +// reopen_relay_log() is called when we notice that the current "hot" log +// got rotated under our feet +IO_CACHE* reopen_relay_log(RELAY_LOG_INFO* rli, const char** errmsg); +Log_event* next_event(RELAY_LOG_INFO* rli); + +/* st_master_info contains information about how to connect to a master, + current master log name, and current log offset, as well as misc + control variables + + st_master_info is initialized once from the master.info file if such + exists. Otherwise, data members corresponding to master.info fields are + initialized with defaults specified by master-* options. The initialization + is done through init_master_info() call. + + The format of master.info file: + + log_name + log_pos + master_host + master_user + master_pass + master_port + master_connect_retry + + To write out the contents of master.info file to disk ( needed every + time we read and queue data from the master ), a call to + flush_master_info() is required. 
+ + To clean up, call end_master_info() + +*/ + typedef struct st_master_info { - char log_file_name[FN_REFLEN]; - ulonglong pos,pending; - File fd; // we keep the file open, so we need to remember the file pointer + char master_log_name[FN_REFLEN]; + + ulonglong master_log_pos; + File fd; IO_CACHE file; + // the variables below are needed because we can change masters on the fly char host[HOSTNAME_LENGTH+1]; char user[USERNAME_LENGTH+1]; char password[HASH_PASSWORD_LENGTH+1]; uint port; uint connect_retry; - uint32 last_log_seq; // log sequence number of last processed event - pthread_mutex_t lock; - pthread_cond_t cond; + pthread_mutex_t data_lock,run_lock; + pthread_cond_t data_cond,start_cond,stop_cond; bool inited; bool old_format; /* master binlog is in 3.23 format */ + RELAY_LOG_INFO rli; +#ifndef DBUG_OFF + int events_till_abort; +#endif + volatile bool abort_slave, slave_running; + THD* io_thd; - st_master_info():pending(0),fd(-1),last_log_seq(0),inited(0), - old_format(0) + st_master_info():fd(-1),inited(0), + old_format(0),io_thd(0) { host[0] = 0; user[0] = 0; password[0] = 0; - pthread_mutex_init(&lock, MY_MUTEX_INIT_FAST); - pthread_cond_init(&cond, NULL); + pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST); + pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST); + pthread_cond_init(&data_cond, NULL); + pthread_cond_init(&start_cond, NULL); + pthread_cond_init(&stop_cond, NULL); } ~st_master_info() { - pthread_mutex_destroy(&lock); - pthread_cond_destroy(&cond); - } - inline void inc_pending(ulonglong val) - { - pending += val; - } - inline void inc_pos(ulonglong val, uint32 log_seq) - { - pthread_mutex_lock(&lock); - pos += val + pending; - pending = 0; - last_log_seq = log_seq; - pthread_cond_broadcast(&cond); - pthread_mutex_unlock(&lock); - } - // thread safe read of position - not needed if we are in the slave thread, - // but required otherwise - inline void read_pos(ulonglong& var) - { - pthread_mutex_lock(&lock); - var = pos; - 
pthread_mutex_unlock(&lock); + pthread_mutex_destroy(&run_lock); + pthread_mutex_destroy(&data_lock); + pthread_cond_destroy(&data_cond); + pthread_cond_destroy(&start_cond); + pthread_cond_destroy(&stop_cond); } - int wait_for_pos(THD* thd, String* log_name, ulonglong log_pos); } MASTER_INFO; +int queue_event(MASTER_INFO* mi,const char* buf,uint event_len); + typedef struct st_table_rule_ent { char* db; @@ -74,22 +272,52 @@ typedef struct st_table_rule_ent #define TABLE_RULE_ARR_SIZE 16 #define MAX_SLAVE_ERRMSG 1024 -#define RPL_LOG_NAME (glob_mi.log_file_name[0] ? glob_mi.log_file_name :\ +#define RPL_LOG_NAME (rli->master_log_name[0] ? rli->master_log_name :\ + "FIRST") +#define IO_RPL_LOG_NAME (mi->master_log_name[0] ? mi->master_log_name :\ "FIRST") +/* masks for start/stop operations on io and sql slave threads */ +#define SLAVE_IO 1 +#define SLAVE_SQL 2 +#define SLAVE_FORCE_ALL 4 /* if this is set, if first gives an + error, second will be tried. Otherwise, + if first fails, we fail + */ +int init_slave(); int flush_master_info(MASTER_INFO* mi); +int flush_relay_log_info(RELAY_LOG_INFO* rli); int register_slave_on_master(MYSQL* mysql); +int terminate_slave_threads(MASTER_INFO* mi, int thread_mask, + bool skip_lock = 0); +int terminate_slave_thread(THD* thd, pthread_mutex_t* term_mutex, + pthread_mutex_t* cond_lock, + pthread_cond_t* term_cond, + volatile bool* slave_running); +int start_slave_threads(bool need_slave_mutex, bool wait_for_start, + MASTER_INFO* mi, const char* master_info_fname, + const char* slave_info_fname, int thread_mask); +/* cond_lock is usually same as start_lock. 
It is needed for the case when + start_lock is 0 which happens if start_slave_thread() is called already + inside the start_lock section, but at the same time we want a + pthread_cond_wait() on start_cond,start_lock +*/ +int start_slave_thread(pthread_handler h_func, pthread_mutex_t* start_lock, + pthread_mutex_t *cond_lock, + pthread_cond_t* start_cond, + volatile bool* slave_running, + MASTER_INFO* mi); int mysql_table_dump(THD* thd, const char* db, const char* tbl_name, int fd = -1); // if fd is -1, dump to NET -int fetch_nx_table(THD* thd, const char* db_name, const char* table_name, +int fetch_master_table(THD* thd, const char* db_name, const char* table_name, MASTER_INFO* mi, MYSQL* mysql); // retrieve non-exitent table from master -int show_master_info(THD* thd); +int show_master_info(THD* thd, MASTER_INFO* mi); int show_binlog_info(THD* thd); int tables_ok(THD* thd, TABLE_LIST* tables); @@ -105,29 +333,31 @@ int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec); void init_table_rule_hash(HASH* h, bool* h_inited); void init_table_rule_array(DYNAMIC_ARRAY* a, bool* a_inited); char* rewrite_db(char* db); -int check_expected_error(THD* thd, int error_code); +int check_expected_error(THD* thd, RELAY_LOG_INFO* rli, int error_code); void skip_load_data_infile(NET* net); -void slave_print_error(int err_code, const char* msg, ...); +void slave_print_error(RELAY_LOG_INFO* rli,int err_code, const char* msg, ...); void end_slave(); // clean up -int init_master_info(MASTER_INFO* mi); +int init_master_info(MASTER_INFO* mi, const char* master_info_fname, + const char* slave_info_fname); void end_master_info(MASTER_INFO* mi); -extern bool opt_log_slave_updates ; -pthread_handler_decl(handle_slave,arg); -extern bool volatile abort_loop, abort_slave, slave_running; -extern uint32 slave_skip_counter; -// needed for problems when slave stops and -// we want to restart it skipping one or more events in the master log that -// have caused errors, and have been 
manually applied by DBA already +int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname); +void end_relay_log_info(RELAY_LOG_INFO* rli); +void lock_slave_threads(MASTER_INFO* mi); +void unlock_slave_threads(MASTER_INFO* mi); +void init_thread_mask(int* mask,MASTER_INFO* mi,bool inverse); +int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,ulonglong pos, + bool need_data_lock, const char** errmsg); -extern int last_slave_errno; -#ifndef DBUG_OFF -extern int events_till_abort; -#endif -extern char last_slave_error[MAX_SLAVE_ERRMSG]; -extern pthread_t slave_real_id; -extern THD* slave_thd; -extern MASTER_INFO glob_mi; +int purge_relay_logs(RELAY_LOG_INFO* rli,bool just_reset,const char** errmsg); + +extern bool opt_log_slave_updates ; +pthread_handler_decl(handle_slave_io,arg); +pthread_handler_decl(handle_slave_sql,arg); +extern bool volatile abort_loop; +extern MASTER_INFO main_mi, *active_mi; // active_mi for multi-master +extern volatile int active_mi_in_use; +extern LIST master_list; extern HASH replicate_do_table, replicate_ignore_table; extern DYNAMIC_ARRAY replicate_wild_do_table, replicate_wild_ignore_table; extern bool do_table_inited, ignore_table_inited, @@ -141,7 +371,8 @@ extern int disconnect_slave_event_count, abort_slave_event_count ; // the master variables are defaults read from my.cnf or command line extern uint master_port, master_connect_retry, report_port; extern my_string master_user, master_password, master_host, - master_info_file, report_user, report_host, report_password; + master_info_file, relay_log_info_file, report_user, report_host, + report_password; extern I_List<i_string> replicate_do_db, replicate_ignore_db; extern I_List<i_string_pair> replicate_rewrite_db; diff --git a/sql/sql_class.cc b/sql/sql_class.cc index d76461b6faf..44a1dbc1720 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -98,7 +98,6 @@ THD::THD():user_time(0),fatal_error(0),last_insert_id_used(0), current_linfo = 0; slave_thread = 0; 
slave_proxy_id = 0; - log_seq = 0; file_id = 0; cond_count=0; convert_set=0; @@ -119,6 +118,7 @@ THD::THD():user_time(0),fatal_error(0),last_insert_id_used(0), where="field list"; server_id = ::server_id; slave_net = 0; + log_pos = 0; server_status=SERVER_STATUS_AUTOCOMMIT; update_lock_default= low_priority_updates ? TL_WRITE_LOW_PRIORITY : TL_WRITE; options=thd_startup_options; @@ -216,10 +216,11 @@ THD::~THD() DBUG_VOID_RETURN; } -void THD::prepare_to_die() +void THD::awake(bool prepare_to_die) { + if (prepare_to_die) + killed = 1; thr_alarm_kill(real_id); - killed = 1; #ifdef SIGNAL_WITH_VIO_CLOSE close_active_vio(); #endif @@ -228,6 +229,10 @@ void THD::prepare_to_die() pthread_mutex_lock(&mysys_var->mutex); if (!system_thread) // Don't abort locks mysys_var->abort=1; + // this broadcast could be up in the air if the victim thread + // exits the cond in the time between read and broadcast, but that is + // ok since all we want to do is to make the victim thread get out + // of waiting on current_cond if (mysys_var->current_cond) { pthread_mutex_lock(mysys_var->current_mutex); diff --git a/sql/sql_class.h b/sql/sql_class.h index 698a90c1a28..87d19381b78 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -21,6 +21,8 @@ #pragma interface /* gcc class implementation */ #endif +// TODO: create log.h and move all the log header stuff there + class Query_log_event; class Load_log_event; class Slave_log_event; @@ -40,6 +42,8 @@ enum enum_log_type { LOG_CLOSED, LOG_NORMAL, LOG_NEW, LOG_BIN }; #define LOG_INFO_FATAL -7 #define LOG_INFO_IN_USE -8 +struct st_relay_log_info; + typedef struct st_log_info { char log_file_name[FN_REFLEN]; @@ -64,8 +68,6 @@ class MYSQL_LOG { char time_buff[20],db[NAME_LEN+1]; char log_file_name[FN_REFLEN],index_file_name[FN_REFLEN]; bool write_error,inited; - uint32 log_seq; // current event sequence number - // needed this for binlog uint file_id; // current file sequence number for load data infile // binary logging bool no_rotate; // for 
binlog - if log name can never change @@ -74,36 +76,52 @@ class MYSQL_LOG { // purging enum cache_type io_cache_type; bool need_start_event; + pthread_cond_t update_cond; + bool no_auto_events; // for relay binlog friend class Log_event; public: MYSQL_LOG(); ~MYSQL_LOG(); pthread_mutex_t* get_log_lock() { return &LOCK_log; } + IO_CACHE* get_log_file() { return &log_file; } + void signal_update() { pthread_cond_broadcast(&update_cond);} + void wait_for_update(THD* thd); void set_need_start_event() { need_start_event = 1; } void set_index_file_name(const char* index_file_name = 0); void init(enum_log_type log_type_arg, - enum cache_type io_cache_type_arg = WRITE_CACHE); + enum cache_type io_cache_type_arg = WRITE_CACHE, + bool no_auto_events_arg = 0); void open(const char *log_name,enum_log_type log_type, - const char *new_name=0); + const char *new_name, enum cache_type io_cache_type_arg, + bool no_auto_events_arg); void new_file(bool inside_mutex = 0); bool open_index(int options); void close_index(); - bool write(THD *thd, enum enum_server_command command,const char *format,...); + bool write(THD *thd, enum enum_server_command command, + const char *format,...); bool write(THD *thd, const char *query, uint query_length, time_t query_start=0); bool write(Log_event* event_info); // binary log write bool write(IO_CACHE *cache); + + //v stands for vector + //invoked as appendv(buf1,len1,buf2,len2,...,bufn,lenn,0) + bool appendv(const char* buf,uint len,...); + int generate_new_name(char *new_name,const char *old_name); void make_log_name(char* buf, const char* log_ident); bool is_active(const char* log_file_name); int purge_logs(THD* thd, const char* to_log); + int purge_first_log(struct st_relay_log_info* rli); + int reset_logs(THD* thd); void close(bool exiting = 0); // if we are exiting, we also want to close the // index file // iterating through the log index file - int find_first_log(LOG_INFO* linfo, const char* log_name); - int find_next_log(LOG_INFO* linfo); + 
int find_first_log(LOG_INFO* linfo, const char* log_name, + bool need_mutex=1); + int find_next_log(LOG_INFO* linfo, bool need_mutex=1); int get_current_log(LOG_INFO* linfo); uint next_file_id(); @@ -226,33 +244,72 @@ public: }; -/**************************************************************************** -** every connection is handled by a thread with a THD -****************************************************************************/ - class delayed_insert; +/* For each client connection we create a separate thread with THD serving as + a thread/connection descriptor */ + class THD :public ilink { public: - NET net; - LEX lex; - MEM_ROOT mem_root; - HASH user_vars; - String packet; /* Room for 1 row */ - struct sockaddr_in remote; - struct rand_struct rand; + NET net; // client connection descriptor + LEX lex; // parse tree descriptor + MEM_ROOT mem_root; // memory allocation pool + HASH user_vars; // hash for user variables + String packet; // dynamic string buffer used for network I/O + struct sockaddr_in remote; // client socket address + struct rand_struct rand; // used for authentication + + /* query points to the current query, + thread_stack is a pointer to the stack frame of handle_one_connection(), + which is called first in the thread for handling a client + */ char *query,*thread_stack; + /* + host - host of the client + user - user of the client, set to NULL until the user has been read from + the connection + priv_user - not sure why we have it, but it is set to "boot" when we run + with --bootstrap + db - currently selected database + ip - client IP + */ + char *host,*user,*priv_user,*db,*ip; + /* proc_info points to a string that will show in the Info column of + SHOW PROCESSLIST output + host_or_ip points to host if host is available, otherwise points to ip + */ const char *proc_info, *host_or_ip; + + /* + client_capabilities has flags describing what the client can do + sql_mode determines if certain non-standard SQL behaviour should be + 
enabled + max_packet_length - supposed to be maximum packet length the client + can handle, but it currently appears to be assigned but never used + except for one debugging statement + */ uint client_capabilities,sql_mode,max_packet_length; + + /* + master_access - privillege descriptor mask for system threads + db_access - privillege descriptor mask for regular threads + */ uint master_access,db_access; + + /* + open_tables - list of regular tables in use by this thread + temporary_tables - list of temp tables in use by this thread + handler_tables - list of tables that were opened with HANDLER OPEN + and are still in use by this thread + */ TABLE *open_tables,*temporary_tables, *handler_tables; + // TODO: document the variables below MYSQL_LOCK *lock,*locked_tables; ULL *ull; struct st_my_thread_var *mysys_var; enum enum_server_command command; uint32 server_id; - uint32 log_seq; uint32 file_id; // for LOAD DATA INFILE const char *where; time_t start_time,time_after_lock,user_time; @@ -293,15 +350,18 @@ public: bool system_thread,in_lock_tables,global_read_lock; bool query_error, bootstrap, cleanup_done; bool volatile killed; - LOG_INFO* current_linfo; + // if we do a purge of binary logs, log index info of the threads // that are currently reading it needs to be adjusted. 
To do that // each thread that is using LOG_INFO needs to adjust the pointer to it - - ulong slave_proxy_id; // in slave thread we need to know in behalf of which + LOG_INFO* current_linfo; + + // in slave thread we need to know in behalf of which // thread the query is being run to replicate temp tables properly - + ulong slave_proxy_id; + NET* slave_net; // network connection from slave to master + uint32 log_pos; THD(); ~THD(); @@ -331,7 +391,7 @@ public: pthread_mutex_unlock(&active_vio_lock); } #endif - void prepare_to_die(); + void awake(bool prepare_to_die); inline const char* enter_cond(pthread_cond_t *cond, pthread_mutex_t* mutex, const char* msg) { diff --git a/sql/sql_lex.h b/sql/sql_lex.h index 9f9fe6c79b3..a796eca58f9 100644 --- a/sql/sql_lex.h +++ b/sql/sql_lex.h @@ -94,7 +94,6 @@ typedef struct st_lex_master_info { char* host, *user, *password,*log_file_name; uint port, connect_retry; - ulong last_log_seq; ulonglong pos; ulong server_id; } LEX_MASTER_INFO; diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 96185a174b5..a459ca0d602 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -1220,14 +1220,18 @@ mysql_execute_command(void) { if (check_access(thd, PROCESS_ACL, any_db)) goto error; - res = change_master(thd); + LOCK_ACTIVE_MI; + res = change_master(thd,active_mi); + UNLOCK_ACTIVE_MI; break; } case SQLCOM_SHOW_SLAVE_STAT: { if (check_process_priv(thd)) goto error; - res = show_master_info(thd); + LOCK_ACTIVE_MI; + res = show_master_info(thd,active_mi); + UNLOCK_ACTIVE_MI; break; } case SQLCOM_SHOW_MASTER_STAT: @@ -1245,7 +1249,7 @@ mysql_execute_command(void) break; case SQLCOM_LOAD_MASTER_TABLE: - + { if (!tables->db) tables->db=thd->db; if (check_access(thd,CREATE_ACL,tables->db,&tables->grant.privilege)) @@ -1265,12 +1269,16 @@ mysql_execute_command(void) net_printf(&thd->net,ER_WRONG_TABLE_NAME,tables->name); break; } - - if (fetch_nx_table(thd, tables->db, tables->real_name, &glob_mi, 0)) - break; // fetch_nx_table did send the error 
to the client - send_ok(&thd->net); + LOCK_ACTIVE_MI; + // fetch_master_table will send the error to the client on failure + if (!fetch_master_table(thd, tables->db, tables->real_name, + active_mi, 0)) + { + send_ok(&thd->net); + } + UNLOCK_ACTIVE_MI; break; - + } case SQLCOM_CREATE_TABLE: if (!tables->db) tables->db=thd->db; @@ -1368,12 +1376,19 @@ mysql_execute_command(void) break; case SQLCOM_SLAVE_START: - start_slave(thd); + { + LOCK_ACTIVE_MI; + start_slave(thd,active_mi,1 /* net report*/); + UNLOCK_ACTIVE_MI; break; + } case SQLCOM_SLAVE_STOP: - stop_slave(thd); + { + LOCK_ACTIVE_MI; + stop_slave(thd,active_mi,1/* net report*/); + UNLOCK_ACTIVE_MI; break; - + } case SQLCOM_ALTER_TABLE: #if defined(DONT_ALLOW_SHOW_COMMANDS) send_error(&thd->net,ER_NOT_ALLOWED_COMMAND); /* purecov: inspected */ @@ -2967,6 +2982,7 @@ bool reload_acl_and_cache(THD *thd, uint options, TABLE_LIST *tables) bool result=0; select_errors=0; /* Write if more errors */ + // TODO: figure out what's up with the commented out line below // mysql_log.flush(); // Flush log if (options & REFRESH_GRANT) { @@ -2998,10 +3014,15 @@ bool reload_acl_and_cache(THD *thd, uint options, TABLE_LIST *tables) if (options & REFRESH_THREADS) flush_thread_cache(); if (options & REFRESH_MASTER) - reset_master(); + if (reset_master(thd)) + result=1; if (options & REFRESH_SLAVE) - reset_slave(); - + { + LOCK_ACTIVE_MI; + if (reset_slave(active_mi)) + result=1; + UNLOCK_ACTIVE_MI; + } return result; } @@ -3019,7 +3040,7 @@ void kill_one_thread(THD *thd, ulong id) if ((thd->master_access & PROCESS_ACL) || !strcmp(thd->user,tmp->user)) { - tmp->prepare_to_die(); + tmp->awake(1 /*prepare to die*/); error=0; } else diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 6c738ba36b4..146490c7b87 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -23,9 +23,9 @@ #include "mini_client.h" #include <thr_alarm.h> #include <my_dir.h> +#include <assert.h> extern const char* any_db; -extern 
pthread_handler_decl(handle_slave,arg); #ifndef DBUG_OFF int max_binlog_dump_events = 0; // unlimited @@ -33,6 +33,26 @@ bool opt_sporadic_binlog_dump_fail = 0; static int binlog_dump_count = 0; #endif +int check_binlog_magic(IO_CACHE* log, const char** errmsg) +{ + char magic[4]; + DBUG_ASSERT(my_b_tell(log) == 0); + + if (my_b_read(log, (byte*) magic, sizeof(magic))) + { + *errmsg = "I/O error reading the header from the binary log"; + sql_print_error("%s, errno=%d, io cache code=%d", *errmsg, my_errno, + log->error); + return 1; + } + if (memcmp(magic, BINLOG_MAGIC, sizeof(magic))) + { + *errmsg = "Binlog has bad magic number; It's not a binary log file that can be used by this version of MySQL"; + return 1; + } + return 0; +} + static int fake_rotate_event(NET* net, String* packet, char* log_file_name, const char**errmsg) { @@ -46,7 +66,10 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name, int4store(header + SERVER_ID_OFFSET, server_id); int4store(header + EVENT_LEN_OFFSET, event_len); int2store(header + FLAGS_OFFSET, 0); - int4store(header + LOG_SEQ_OFFSET, 0); + + // TODO: check what problems this may cause and fix them + int4store(header + LOG_POS_OFFSET, 0); + packet->append(header, sizeof(header)); /* We need to split the next statement because of problem with cxx */ int4store(buf,4); // tell slave to skip magic number @@ -133,7 +156,6 @@ File open_binlog(IO_CACHE *log, const char *log_file_name, const char **errmsg) { File file; - char magic[4]; if ((file = my_open(log_file_name, O_RDONLY | O_BINARY, MYF(MY_WME))) < 0 || init_io_cache(log, file, IO_SIZE*2, READ_CACHE, 0, 0, @@ -142,19 +164,8 @@ File open_binlog(IO_CACHE *log, const char *log_file_name, *errmsg = "Could not open log file"; // This will not be sent goto err; } - - if (my_b_read(log, (byte*) magic, sizeof(magic))) - { - *errmsg = "I/O error reading the header from the binary log"; - sql_print_error("%s, errno=%d, io cache code=%d", *errmsg, my_errno, - 
log->error); + if (check_binlog_magic(log,errmsg)) goto err; - } - if (memcmp(magic, BINLOG_MAGIC, sizeof(magic))) - { - *errmsg = "Binlog has bad magic number; It's not a binary log file that can be used by this version of MySQL"; - goto err; - } return file; err: @@ -366,7 +377,8 @@ impossible position"; packet->length(0); packet->append("\0",1); } - + // TODO: now that we are logging the offset, check to make sure + // the recorded offset and the actual match if (error != LOG_READ_EOF) { switch(error) { @@ -410,13 +422,6 @@ impossible position"; // to signal us { log.error=0; - - // tell the kill thread how to wake us up - thd->mysys_var->current_mutex = log_lock; - thd->mysys_var->current_cond = &COND_binlog_update; - const char* proc_info = thd->proc_info; - thd->proc_info = "Slave connection: waiting for binlog update"; - bool read_packet = 0, fatal_error = 0; #ifndef DBUG_OFF @@ -431,32 +436,30 @@ impossible position"; // no one will update the log while we are reading // now, but we'll be quick and just read one record pthread_mutex_lock(log_lock); - switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*) 0)) + switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0)) { case 0: + pthread_mutex_unlock(log_lock); read_packet = 1; // we read successfully, so we'll need to send it to the // slave break; case LOG_READ_EOF: - DBUG_PRINT("wait",("waiting for data on binary log")); + DBUG_PRINT("wait",("waiting for data in binary log")); + // wait_for_update unlocks the log lock - needed to avoid race if (!thd->killed) - pthread_cond_wait(&COND_binlog_update, log_lock); + mysql_bin_log.wait_for_update(thd); + else + pthread_mutex_unlock(log_lock); DBUG_PRINT("wait",("binary log received update")); break; default: + pthread_mutex_unlock(log_lock); fatal_error = 1; break; } - pthread_mutex_unlock(log_lock); - - pthread_mutex_lock(&thd->mysys_var->mutex); - thd->mysys_var->current_mutex= 0; - thd->mysys_var->current_cond= 0; - thd->proc_info= 
proc_info; - pthread_mutex_unlock(&thd->mysys_var->mutex); - + if (read_packet) { thd->proc_info = "sending update to slave"; @@ -548,39 +551,37 @@ impossible position"; DBUG_VOID_RETURN; } -int start_slave(THD* thd , bool net_report) +int start_slave(THD* thd , MASTER_INFO* mi, bool net_report) { int slave_errno = 0; if (!thd) thd = current_thd; NET* net = &thd->net; - + int thread_mask; + if (check_access(thd, PROCESS_ACL, any_db)) return 1; - pthread_mutex_lock(&LOCK_slave); - if (!slave_running) + lock_slave_threads(mi); // this allows us to cleanly read slave_running + init_thread_mask(&thread_mask,mi,1 /* inverse */); + if (thread_mask) { - if (init_master_info(&glob_mi)) - slave_errno = ER_MASTER_INFO; - else if (server_id_supplied && *glob_mi.host) - { - pthread_t hThread; - if (pthread_create(&hThread, &connection_attrib, handle_slave, 0)) - { - slave_errno = ER_SLAVE_THREAD; - } - while (!slave_running) // slave might already be running by now - pthread_cond_wait(&COND_slave_start, &LOCK_slave); - } + if (server_id_supplied && (!mi->inited || (mi->inited && *mi->host))) + slave_errno = start_slave_threads(0 /*no mutex */, + 1 /* wait for start */, + mi, + master_info_file,relay_log_info_file, + thread_mask); else slave_errno = ER_BAD_SLAVE; } else slave_errno = ER_SLAVE_MUST_STOP; - - pthread_mutex_unlock(&LOCK_slave); + + unlock_slave_threads(mi); + if (slave_errno) { - if (net_report) send_error(net, slave_errno); + if (net_report) + send_error(net, slave_errno); return 1; } else if (net_report) @@ -589,8 +590,7 @@ int start_slave(THD* thd , bool net_report) return 0; } - -int stop_slave(THD* thd, bool net_report ) +int stop_slave(THD* thd, MASTER_INFO* mi, bool net_report ) { int slave_errno = 0; if (!thd) thd = current_thd; @@ -598,43 +598,14 @@ int stop_slave(THD* thd, bool net_report ) if (check_access(thd, PROCESS_ACL, any_db)) return 1; - - pthread_mutex_lock(&LOCK_slave); - if (slave_running) - { - abort_slave = 1; - KICK_SLAVE; - // do not abort 
the slave in the middle of a query, so we do not set - // thd->killed for the slave thread - thd->proc_info = "waiting for slave to die"; - while(slave_running) - { - /* there is a small chance that slave thread might miss the first - alarm. To protect againts it, resend the signal until it reacts - */ - - struct timespec abstime; -#ifdef HAVE_TIMESPEC_TS_SEC - abstime.ts_sec=time(NULL)+2; - abstime.ts_nsec=0; -#elif defined(__WIN__) - abstime.tv_sec=time((time_t*) 0)+2; - abstime.tv_nsec=0; -#else - struct timeval tv; - gettimeofday(&tv,0); - abstime.tv_sec=tv.tv_sec+2; - abstime.tv_nsec=tv.tv_usec*1000; -#endif - pthread_cond_timedwait(&COND_slave_stopped, &LOCK_slave, &abstime); - if (slave_running) - KICK_SLAVE; - } - } - else - slave_errno = ER_SLAVE_NOT_RUNNING; - - pthread_mutex_unlock(&LOCK_slave); + thd->proc_info = "Killing slave"; + int thread_mask; + lock_slave_threads(mi); + init_thread_mask(&thread_mask,mi,0 /* not inverse*/); + slave_errno = (thread_mask) ? + terminate_slave_threads(mi,thread_mask, + 1 /*skip lock */) : ER_SLAVE_NOT_RUNNING; + unlock_slave_threads(mi); thd->proc_info = 0; if (slave_errno) @@ -649,31 +620,43 @@ int stop_slave(THD* thd, bool net_report ) return 0; } - -void reset_slave() +int reset_slave(MASTER_INFO* mi) { MY_STAT stat_area; char fname[FN_REFLEN]; - bool slave_was_running ; - - pthread_mutex_lock(&LOCK_slave); - if ((slave_was_running = slave_running)) + int restart_thread_mask = 0,error=0; + const char* errmsg=0; + + lock_slave_threads(mi); + init_thread_mask(&restart_thread_mask,mi,0 /* not inverse */); + if ((error=terminate_slave_threads(mi,restart_thread_mask,1 /*skip lock*/)) + || (error=purge_relay_logs(&mi->rli,1 /*just reset*/,&errmsg))) + goto err; + + end_master_info(mi); + fn_format(fname, master_info_file, mysql_data_home, "", 4+32); + if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME))) { - pthread_mutex_unlock(&LOCK_slave); - stop_slave(0,0); + error=1; + goto err; } - else - 
pthread_mutex_unlock(&LOCK_slave); - - end_master_info(&glob_mi); - fn_format(fname, master_info_file, mysql_data_home, "", 4+32); + fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32); if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME))) - return; - if (slave_was_running) - start_slave(0,0); + { + error=1; + goto err; + } + if (restart_thread_mask) + error=start_slave_threads(0 /* mutex not needed*/, + 1 /* wait for start*/, + mi,master_info_file,relay_log_info_file, + restart_thread_mask); + // TODO: fix error messages so they get to the client +err: + unlock_slave_threads(mi); + return error; } - void kill_zombie_dump_threads(uint32 slave_server_id) { pthread_mutex_lock(&LOCK_thread_count); @@ -692,119 +675,114 @@ void kill_zombie_dump_threads(uint32 slave_server_id) make safe_mutex complain and abort. We just to do kill the thread ourselves. */ - - thr_alarm_kill(tmp->real_id); - tmp->killed = 1; - tmp->mysys_var->abort = 1; - pthread_mutex_lock(&tmp->mysys_var->mutex); - if (tmp->mysys_var->current_cond) - { - pthread_mutex_lock(tmp->mysys_var->current_mutex); - pthread_cond_broadcast(tmp->mysys_var->current_cond); - pthread_mutex_unlock(tmp->mysys_var->current_mutex); - } - pthread_mutex_unlock(&tmp->mysys_var->mutex); + tmp->awake(1/*prepare to die*/); } } pthread_mutex_unlock(&LOCK_thread_count); } -int change_master(THD* thd) +int change_master(THD* thd, MASTER_INFO* mi) { - bool slave_was_running; + int error=0,restart_thread_mask; + const char* errmsg=0; + // kill slave thread - pthread_mutex_lock(&LOCK_slave); - if ((slave_was_running = slave_running)) + lock_slave_threads(mi); + init_thread_mask(&restart_thread_mask,mi,0 /*not inverse*/); + if (restart_thread_mask && + (error=terminate_slave_threads(mi, + restart_thread_mask, + 1 /*skip lock*/))) { - abort_slave = 1; - KICK_SLAVE; - thd->proc_info = "waiting for slave to die"; - while (slave_running) - pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until 
done + send_error(&thd->net,error); + unlock_slave_threads(mi); + return 1; } - pthread_mutex_unlock(&LOCK_slave); thd->proc_info = "changing master"; LEX_MASTER_INFO* lex_mi = &thd->lex.mi; - - if (init_master_info(&glob_mi)) + // TODO: see if needs re-write + if (init_master_info(mi,master_info_file,relay_log_info_file)) { send_error(&thd->net, 0, "Could not initialize master info"); + unlock_slave_threads(mi); return 1; } - pthread_mutex_lock(&glob_mi.lock); + pthread_mutex_lock(&mi->data_lock); if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos) { // if we change host or port, we must reset the postion - glob_mi.log_file_name[0] = 0; - glob_mi.pos = 4; // skip magic number - glob_mi.pending = 0; + mi->master_log_name[0] = 0; + mi->master_log_pos = 4; // skip magic number + mi->rli.pending = 0; } if (lex_mi->log_file_name) - strmake(glob_mi.log_file_name, lex_mi->log_file_name, - sizeof(glob_mi.log_file_name)); + strmake(mi->master_log_name, lex_mi->log_file_name, + sizeof(mi->master_log_name)); if (lex_mi->pos) { - glob_mi.pos = lex_mi->pos; - glob_mi.pending = 0; + mi->master_log_pos = lex_mi->pos; + mi->rli.pending = 0; } if (lex_mi->host) - strmake(glob_mi.host, lex_mi->host, sizeof(glob_mi.host)); + strmake(mi->host, lex_mi->host, sizeof(mi->host)); if (lex_mi->user) - strmake(glob_mi.user, lex_mi->user, sizeof(glob_mi.user)); + strmake(mi->user, lex_mi->user, sizeof(mi->user)); if (lex_mi->password) - strmake(glob_mi.password, lex_mi->password, sizeof(glob_mi.password)); + strmake(mi->password, lex_mi->password, sizeof(mi->password)); if (lex_mi->port) - glob_mi.port = lex_mi->port; + mi->port = lex_mi->port; if (lex_mi->connect_retry) - glob_mi.connect_retry = lex_mi->connect_retry; + mi->connect_retry = lex_mi->connect_retry; + + flush_master_info(mi); + pthread_mutex_unlock(&mi->data_lock); + thd->proc_info="purging old relay logs"; + if (purge_relay_logs(&mi->rli,0 /* not only reset, but also reinit*/, + &errmsg)) + { + 
send_error(&thd->net, 0, "Failed purging old relay logs"); + unlock_slave_threads(mi); + return 1; + } + pthread_mutex_lock(&mi->rli.data_lock); + mi->rli.master_log_pos = mi->master_log_pos; + strnmov(mi->rli.master_log_name,mi->master_log_name, + sizeof(mi->rli.master_log_name)); + if (!mi->rli.master_log_name[0]) // uninitialized case + mi->rli.master_log_pos=0; + pthread_cond_broadcast(&mi->rli.data_cond); + pthread_mutex_unlock(&mi->rli.data_lock); - flush_master_info(&glob_mi); - pthread_mutex_unlock(&glob_mi.lock); thd->proc_info = "starting slave"; - if (slave_was_running) - start_slave(0,0); + if (restart_thread_mask) + error=start_slave_threads(0 /* mutex not needed*/, + 1 /* wait for start*/, + mi,master_info_file,relay_log_info_file, + restart_thread_mask); +err: + unlock_slave_threads(mi); thd->proc_info = 0; - - send_ok(&thd->net); + if (error) + send_error(&thd->net,error); + else + send_ok(&thd->net); return 0; } - -void reset_master() +int reset_master(THD* thd) { if (!mysql_bin_log.is_open()) { my_error(ER_FLUSH_MASTER_BINLOG_CLOSED, MYF(ME_BELL+ME_WAITTANG)); - return; - } - - LOG_INFO linfo; - pthread_mutex_t* log_lock = mysql_bin_log.get_log_lock(); - pthread_mutex_lock(log_lock); - if (mysql_bin_log.find_first_log(&linfo, "")) - { - pthread_mutex_unlock(log_lock); - return; - } - - for(;;) - { - my_delete(linfo.log_file_name, MYF(MY_WME)); - if (mysql_bin_log.find_next_log(&linfo)) - break; + return 1; } - mysql_bin_log.close(1); // exiting close - my_delete(mysql_bin_log.get_index_fname(), MYF(MY_WME)); - mysql_bin_log.set_need_start_event(); - mysql_bin_log.open(opt_bin_logname,LOG_BIN); - pthread_mutex_unlock(log_lock); + return mysql_bin_log.reset_logs(thd); } - int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1, const char* log_file_name2, ulonglong log_pos2) { @@ -891,6 +869,7 @@ int show_binlog_events(THD* thd) if (event_count < limit_end && log.error) { errmsg = "Wrong offset or I/O error"; + 
pthread_mutex_unlock(mysql_bin_log.get_log_lock()); goto err; } diff --git a/sql/sql_repl.h b/sql/sql_repl.h index 4b9f741dde7..360fd50a1e3 100644 --- a/sql/sql_repl.h +++ b/sql/sql_repl.h @@ -26,30 +26,26 @@ extern int max_binlog_dump_events; extern bool opt_sporadic_binlog_dump_fail; #endif -#ifdef SIGNAL_WITH_VIO_CLOSE -#define KICK_SLAVE { slave_thd->close_active_vio(); \ - thr_alarm_kill(slave_real_id); } -#else -#define KICK_SLAVE thr_alarm_kill(slave_real_id); -#endif +#define KICK_SLAVE(thd) thd->awake(0 /* do not prepare to die*/); File open_binlog(IO_CACHE *log, const char *log_file_name, const char **errmsg); -int start_slave(THD* thd = 0, bool net_report = 1); -int stop_slave(THD* thd = 0, bool net_report = 1); -int change_master(THD* thd); +int start_slave(THD* thd, MASTER_INFO* mi, bool net_report); +int stop_slave(THD* thd, MASTER_INFO* mi, bool net_report); +int change_master(THD* thd, MASTER_INFO* mi); int show_binlog_events(THD* thd); int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1, const char* log_file_name2, ulonglong log_pos2); -void reset_slave(); -void reset_master(); +int reset_slave(MASTER_INFO* mi); +int reset_master(THD* thd); int purge_master_logs(THD* thd, const char* to_log); bool log_in_use(const char* log_name); void adjust_linfo_offsets(my_off_t purge_offset); int show_binlogs(THD* thd); extern int init_master_info(MASTER_INFO* mi); void kill_zombie_dump_threads(uint32 slave_server_id); +int check_binlog_magic(IO_CACHE* log, const char** errmsg); typedef struct st_load_file_info { diff --git a/sql/sql_show.cc b/sql/sql_show.cc index 19c3d89caaf..1dac8033ad7 100644 --- a/sql/sql_show.cc +++ b/sql/sql_show.cc @@ -1168,6 +1168,15 @@ int mysqld_show(THD *thd, const char *wild, show_var_st *variables) case SHOW_RPL_STATUS: net_store_data(&packet2, rpl_status_type[(int)rpl_status]); break; + case SHOW_SLAVE_RUNNING: + { + LOCK_ACTIVE_MI; + net_store_data(&packet2, (active_mi->slave_running && + 
active_mi->rli.slave_running) + ? "ON" : "OFF"); + UNLOCK_ACTIVE_MI; + break; + } case SHOW_OPENTABLES: net_store_data(&packet2,(uint32) cached_tables()); break; diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy index 01ba468175e..2b62ee03575 100644 --- a/sql/sql_yacc.yy +++ b/sql/sql_yacc.yy @@ -2453,15 +2453,14 @@ show_param: YYABORT; } | NEW_SYM MASTER_SYM FOR_SYM SLAVE WITH MASTER_LOG_FILE_SYM EQ - TEXT_STRING AND MASTER_LOG_POS_SYM EQ ulonglong_num AND - MASTER_LOG_SEQ_SYM EQ ULONG_NUM AND MASTER_SERVER_ID_SYM EQ + TEXT_STRING AND MASTER_LOG_POS_SYM EQ ulonglong_num + AND MASTER_SERVER_ID_SYM EQ ULONG_NUM { Lex->sql_command = SQLCOM_SHOW_NEW_MASTER; Lex->mi.log_file_name = $8.str; Lex->mi.pos = $12; - Lex->mi.last_log_seq = $16; - Lex->mi.server_id = $20; + Lex->mi.server_id = $16; } | MASTER_SYM LOGS_SYM { @@ -3083,12 +3082,18 @@ option_value: } | SQL_SLAVE_SKIP_COUNTER equal ULONG_NUM { - pthread_mutex_lock(&LOCK_slave); - if (slave_running) + LOCK_ACTIVE_MI; + pthread_mutex_lock(&active_mi->rli.run_lock); + if (active_mi->rli.slave_running) send_error(&current_thd->net, ER_SLAVE_MUST_STOP); else - slave_skip_counter = $3; - pthread_mutex_unlock(&LOCK_slave); + { + pthread_mutex_lock(&active_mi->rli.data_lock); + active_mi->rli.slave_skip_counter = $3; + pthread_mutex_unlock(&active_mi->rli.data_lock); + } + pthread_mutex_unlock(&active_mi->rli.run_lock); + UNLOCK_ACTIVE_MI; } | ident equal DEFAULT { diff --git a/sql/stacktrace.c b/sql/stacktrace.c index bef53c71a42..dd7db5548c1 100644 --- a/sql/stacktrace.c +++ b/sql/stacktrace.c @@ -123,7 +123,7 @@ terribly wrong...\n"); } #endif /* __alpha__ */ - if (!stack_bottom) + if (!stack_bottom || (gptr) stack_bottom > (gptr) &fp) { ulong tmp= min(0x10000,thread_stack); /* Assume that the stack starts at the previous even 65K */ diff --git a/sql/structs.h b/sql/structs.h index 780057061c3..e76e628ddda 100644 --- a/sql/structs.h +++ b/sql/structs.h @@ -140,7 +140,7 @@ enum SHOW_TYPE { 
SHOW_LONG,SHOW_CHAR,SHOW_INT,SHOW_CHAR_PTR,SHOW_BOOL, ,SHOW_SSL_CTX_SESS_TIMEOUTS, SHOW_SSL_CTX_SESS_CACHE_FULL ,SHOW_SSL_GET_CIPHER_LIST #endif /* HAVE_OPENSSL */ - ,SHOW_RPL_STATUS + ,SHOW_RPL_STATUS, SHOW_SLAVE_RUNNING }; enum SHOW_COMP_OPTION { SHOW_OPTION_YES, SHOW_OPTION_NO, SHOW_OPTION_DISABLED}; diff --git a/support-files/build-tags b/support-files/build-tags new file mode 100755 index 00000000000..d5f9fbf5100 --- /dev/null +++ b/support-files/build-tags @@ -0,0 +1,9 @@ +#! /bin/sh + +rm -f TAGS +filter='\.cc$\|\.c$\|\.h$\|\.yy$' +files=`bk -r sfiles -gU | grep $filter ` +for f in $files ; +do + etags -o TAGS --append $f +done |