summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Widenius <monty@askmonty.org>2012-09-28 02:06:56 +0300
committerMichael Widenius <monty@askmonty.org>2012-09-28 02:06:56 +0300
commit1864d9596d9531427f1d63f86ada9e0cb21b51db (patch)
tree89ad647a849fea26b336faf565573a8d226ca7a9
parent620d14f8c3521f9ec7283b8690e0e16434739d33 (diff)
downloadmariadb-git-1864d9596d9531427f1d63f86ada9e0cb21b51db.tar.gz
Implementation of Multi-source replication (MDEV:253)
Documentation of the feature can be found at: http://kb.askmonty.org/en/multi-source-replication/ This code is based on code from Taobao, developed by Plinux BUILD/SETUP.sh: Added -Wno-invalid-offsetof to get rid of warning of offsetof() on C++ class (safe in the contex we use it) client/mysqltest.cc: Added support for error names starting with 'W' Added connection_name support to --sync_with_master cmake/maintainer.cmake: Added -Wno-invalid-offsetof to get rid of warning of offsetof() on C++ class (safe in the contex we use it) mysql-test/r/mysqltest.result: Updated results mysql-test/r/parser.result: Updated results mysql-test/suite/multi_source/my.cnf: Setup of multi-master tests mysql-test/suite/multi_source/simple.result: Simple basic test of multi-source functionality mysql-test/suite/multi_source/simple.test: Simple basic test of multi-source functionality mysql-test/suite/multi_source/syntax.result: Test of multi-source syntax mysql-test/suite/multi_source/syntax.test: Test of multi-source syntax mysql-test/suite/rpl/r/rpl_rotate_logs.result: Updated results because of new error messages mysql-test/t/parser.test: Updated test as master_pos_wait() now takes more arguments than before sql/event_scheduler.cc: No reason to initialize slave_thread (it's guaranteed to be zero here) sql/item_create.cc: Added connection_name argument to master_pos_wait() Simplified code sql/item_func.cc: Added connection_name argument to master_pos_wait() sql/item_func.h: Added connection_name argument to master_pos_wait() sql/log.cc: Added tag "Master 'connection_name'" to slave errors that has a connection name. sql/mysqld.cc: Added variable mysqld_server_initialized so that other functions can test if server is fully initialized. Free all slave data in one place (fewer ifdef's) Removed not needed call to close_active_mi() Initialize slaves() later in startup to ensure that everthing is really initialized when slaves start. Made status variable slave_running multi-source safe sql/mysqld.h: Added mysqld_server_initialized sql/rpl_mi.cc: Store connection name and cmp_connection_name (only used for show full slave status) in Master_info Added code for Master_info_index, which handles storage of multi-master information Don't write the empty "" connection_name to multi-master.info file. This is handled by the original code. sql/rpl_mi.h: Added connection_name and Master_info_index sql/rpl_rli.cc: Added connection_name to relay log files. sql/rpl_rli.h: Fixed type of slave_skip_counter as we now access it directly in sys_vars.cc, so it must be uint sql/share/errmsg-utf8.txt: Added new error messages needed for multi-source Added multi-source name to error ER_MASTER_INFO and WARN_NO_MASTER_INFO sql/slave.cc: Moved things a bit around to make it easier to handle error conditions. Create a global master_info_index and add the "" connection to it Ensure that new Master_info doesn't fail. Don't call terminate_slave_threads(active_mi..) on end_slave() as this is now done automaticly when deleting master_info_index. Delete not needed function close_active_mi(). One can achive same thing by calling end_slave(). Added support for SHOW FULL SLAVE STATUS (show status for all master connections with connection_name as first column) sql/slave.h: Added new prototypes sql/sql_base.cc: More DBUG_PRINT sql/sql_class.cc: Reset thd->connection_name and thd-->default_master_connection sql/sql_class.h: Added thd->connection_name and thd-->default_master_connection Added slave_skip_count to variables to make changing the @@sql_slave_skip_count variable thread safe sql/sql_const.h: Added MAX_CONNECTION_NAME sql/sql_lex.cc: Reset 'lex->verbose' (to simplify some sql_yacc.yy code) sql/sql_lex.h: Added connection_name sql/sql_parse.cc: Added support for connection_name to all SLAVE commands. - Instead of using active_mi, we now get the current Master_info from master_info_index. - Create new replication threads with CHANGE MASTER - Added support for show_all_master_info() sql/sql_reload.cc: Made reset/full slave use master_info_index->get_master_info() instead of active_mi. If one uses 'RESET SLAVE "connection_name" all' the connection is removed from master_info_index. sql/sql_repl.cc: sql_slave_skip_counter is moved to thd->variables to make it thread safe and fix some bugs with it Add connection name to relay log files. Added connection name to errors. Added some logging for multi-master if log_warnings > 1 stop_slave(): - Don't check if thd is set. It's guaranteed to always be set. change_master(): - Check for duplicate connection names in change_master() - Check for wrong arguments first in file (to simplify error handling) - Register new connections in master_info_index sql/sql_yacc.yy: Added optional connection_name to a all relevant master/slave commands sql/strfunc.cc: my_global.h shoud always be included first. sql/sys_vars.cc: Added variable default_master_connection Made variable sql_slave_skip_counter multi-source safe sql/sys_vars.h: Added Sys_var_session_lexstring (needed for default_master_connection) Added Sys_var_multi_source_uint (needed for sql_slave_skip_counter).
-rwxr-xr-xBUILD/SETUP.sh2
-rw-r--r--client/mysqltest.cc42
-rw-r--r--cmake/maintainer.cmake2
-rw-r--r--mysql-test/r/mysqltest.result2
-rw-r--r--mysql-test/r/parser.result2
-rw-r--r--mysql-test/suite/multi_source/my.cnf22
-rw-r--r--mysql-test/suite/multi_source/simple.result64
-rw-r--r--mysql-test/suite/multi_source/simple.test38
-rw-r--r--mysql-test/suite/multi_source/skip_counter.result81
-rw-r--r--mysql-test/suite/multi_source/skip_counter.test126
-rw-r--r--mysql-test/suite/multi_source/syntax.result89
-rw-r--r--mysql-test/suite/multi_source/syntax.test77
-rw-r--r--mysql-test/suite/rpl/r/rpl_rotate_logs.result4
-rw-r--r--mysql-test/t/parser.test2
-rw-r--r--sql/event_scheduler.cc1
-rw-r--r--sql/item_create.cc23
-rw-r--r--sql/item_func.cc34
-rw-r--r--sql/item_func.h3
-rw-r--r--sql/log.cc21
-rw-r--r--sql/mysqld.cc68
-rw-r--r--sql/mysqld.h2
-rw-r--r--sql/rpl_mi.cc472
-rw-r--r--sql/rpl_mi.h51
-rw-r--r--sql/rpl_rli.cc22
-rw-r--r--sql/rpl_rli.h3
-rw-r--r--sql/share/errmsg-utf8.txt19
-rw-r--r--sql/slave.cc199
-rw-r--r--sql/slave.h9
-rw-r--r--sql/sql_base.cc5
-rw-r--r--sql/sql_class.cc10
-rw-r--r--sql/sql_class.h10
-rw-r--r--sql/sql_const.h1
-rw-r--r--sql/sql_lex.cc1
-rw-r--r--sql/sql_lex.h1
-rw-r--r--sql/sql_parse.cc107
-rw-r--r--sql/sql_reload.cc17
-rw-r--r--sql/sql_repl.cc127
-rw-r--r--sql/sql_yacc.yy51
-rw-r--r--sql/strfunc.cc1
-rw-r--r--sql/sys_vars.cc104
-rw-r--r--sql/sys_vars.h142
41 files changed, 1828 insertions, 229 deletions
diff --git a/BUILD/SETUP.sh b/BUILD/SETUP.sh
index de3cb4890a9..5154f64b98f 100755
--- a/BUILD/SETUP.sh
+++ b/BUILD/SETUP.sh
@@ -139,7 +139,7 @@ else
# C warnings
c_warnings="$warnings"
# C++ warnings
- cxx_warnings="$warnings -Wno-unused-parameter"
+ cxx_warnings="$warnings -Wno-unused-parameter -Wno-invalid-offsetof"
# cxx_warnings="$cxx_warnings -Woverloaded-virtual -Wsign-promo"
cxx_warnings="$cxx_warnings -Wnon-virtual-dtor"
debug_extra_cflags="-O0 -g3 -gdwarf-2"
diff --git a/client/mysqltest.cc b/client/mysqltest.cc
index bb89db4b231..f824d53ed1b 100644
--- a/client/mysqltest.cc
+++ b/client/mysqltest.cc
@@ -253,6 +253,8 @@ static void init_re(void);
static int match_re(my_regex_t *, char *);
static void free_re(void);
+static char *get_string(char **to_ptr, char **from_ptr,
+ struct st_command *command);
static int replace(DYNAMIC_STRING *ds_str,
const char *search_str, ulong search_len,
const char *replace_str, ulong replace_len);
@@ -4580,7 +4582,8 @@ void do_wait_for_slave_to_stop(struct st_command *c __attribute__((unused)))
}
-void do_sync_with_master2(struct st_command *command, long offset)
+void do_sync_with_master2(struct st_command *command, long offset,
+ const char *connection_name)
{
MYSQL_RES *res;
MYSQL_ROW row;
@@ -4591,8 +4594,9 @@ void do_sync_with_master2(struct st_command *command, long offset)
if (!master_pos.file[0])
die("Calling 'sync_with_master' without calling 'save_master_pos'");
- sprintf(query_buf, "select master_pos_wait('%s', %ld, %d)",
- master_pos.file, master_pos.pos + offset, timeout);
+ sprintf(query_buf, "select master_pos_wait('%s', %ld, %d, '%s')",
+ master_pos.file, master_pos.pos + offset, timeout,
+ connection_name);
if (mysql_query(mysql, query_buf))
die("failed in '%s': %d: %s", query_buf, mysql_errno(mysql),
@@ -4652,16 +4656,32 @@ void do_sync_with_master(struct st_command *command)
long offset= 0;
char *p= command->first_argument;
const char *offset_start= p;
+ char *start, *buff= 0;
+ start= (char*) "";
+
if (*offset_start)
{
for (; my_isdigit(charset_info, *p); p++)
offset = offset * 10 + *p - '0';
- if(*p && !my_isspace(charset_info, *p))
+ if (*p && !my_isspace(charset_info, *p) && *p != ',')
die("Invalid integer argument \"%s\"", offset_start);
+
+ while (*p && my_isspace(charset_info, *p))
+ p++;
+ if (*p == ',')
+ {
+ p++;
+ while (*p && my_isspace(charset_info, *p))
+ p++;
+ start= buff= (char*)my_malloc(strlen(p)+1,MYF(MY_WME | MY_FAE));
+ get_string(&buff, &p, command);
+ }
command->last_argument= p;
}
- do_sync_with_master2(command, offset);
+ do_sync_with_master2(command, offset, start);
+ if (buff)
+ my_free(start);
return;
}
@@ -5286,7 +5306,7 @@ void do_get_errcodes(struct st_command *command)
{
die("The sqlstate definition must start with an uppercase S");
}
- else if (*p == 'E')
+ else if (*p == 'E' || *p == 'W')
{
/* Error name string */
@@ -5295,9 +5315,9 @@ void do_get_errcodes(struct st_command *command)
to->type= ERR_ERRNO;
DBUG_PRINT("info", ("ERR_ERRNO: %d", to->code.errnum));
}
- else if (*p == 'e')
+ else if (*p == 'e' || *p == 'w')
{
- die("The error name definition must start with an uppercase E");
+ die("The error name definition must start with an uppercase E or W");
}
else
{
@@ -5360,8 +5380,8 @@ void do_get_errcodes(struct st_command *command)
If string is a '$variable', return the value of the variable.
*/
-char *get_string(char **to_ptr, char **from_ptr,
- struct st_command *command)
+static char *get_string(char **to_ptr, char **from_ptr,
+ struct st_command *command)
{
char c, sep;
char *to= *to_ptr, *from= *from_ptr, *start=to;
@@ -9288,7 +9308,7 @@ int main(int argc, char **argv)
select_connection(command);
else
select_connection_name("slave");
- do_sync_with_master2(command, 0);
+ do_sync_with_master2(command, 0, "");
break;
}
case Q_COMMENT:
diff --git a/cmake/maintainer.cmake b/cmake/maintainer.cmake
index a70226e2b6e..d52405e8bcc 100644
--- a/cmake/maintainer.cmake
+++ b/cmake/maintainer.cmake
@@ -18,7 +18,7 @@ INCLUDE(CheckCCompilerFlag)
# Setup GCC (GNU C compiler) warning options.
MACRO(SET_MYSQL_MAINTAINER_GNU_C_OPTIONS)
SET(MY_MAINTAINER_WARNINGS
- "-Wall -Wextra -Wunused -Wwrite-strings -Wno-strict-aliasing -DFORCE_INIT_OF_VARS")
+ "-Wall -Wextra -Wunused -Wwrite-strings -Wno-strict-aliasing -Wno-invalid-offsetof -DFORCE_INIT_OF_VARS")
CHECK_C_COMPILER_FLAG("-Wno-missing-field-initializers"
HAVE_NO_MISSING_FIELD_INITIALIZERS)
diff --git a/mysql-test/r/mysqltest.result b/mysql-test/r/mysqltest.result
index fdb3029059f..d08ffd96a04 100644
--- a/mysql-test/r/mysqltest.result
+++ b/mysql-test/r/mysqltest.result
@@ -232,7 +232,7 @@ mysqltest: At line 2: Spurious text after `query` expression
mysqltest: At line 1: Missing argument(s) to 'error'
mysqltest: At line 1: Missing argument(s) to 'error'
mysqltest: At line 1: The sqlstate definition must start with an uppercase S
-mysqltest: At line 1: The error name definition must start with an uppercase E
+mysqltest: At line 1: The error name definition must start with an uppercase E or W
mysqltest: At line 1: Invalid argument to error: '9eeeee' - the errno may only consist of digits[0-9]
mysqltest: At line 1: Invalid argument to error: '1sssss' - the errno may only consist of digits[0-9]
mysqltest: At line 1: The sqlstate must be exactly 5 chars long
diff --git a/mysql-test/r/parser.result b/mysql-test/r/parser.result
index 10700d0ba73..54378f16d49 100644
--- a/mysql-test/r/parser.result
+++ b/mysql-test/r/parser.result
@@ -441,7 +441,7 @@ select master_pos_wait();
ERROR 42000: Incorrect parameter count in the call to native function 'master_pos_wait'
select master_pos_wait(1);
ERROR 42000: Incorrect parameter count in the call to native function 'master_pos_wait'
-select master_pos_wait(1, 2, 3, 4);
+select master_pos_wait(1, 2, 3, 4, 5);
ERROR 42000: Incorrect parameter count in the call to native function 'master_pos_wait'
select rand(1, 2, 3);
ERROR 42000: Incorrect parameter count in the call to native function 'rand'
diff --git a/mysql-test/suite/multi_source/my.cnf b/mysql-test/suite/multi_source/my.cnf
new file mode 100644
index 00000000000..dd57fd2199f
--- /dev/null
+++ b/mysql-test/suite/multi_source/my.cnf
@@ -0,0 +1,22 @@
+# cat t/multisource1.cnf
+!include include/default_mysqld.cnf
+!include include/default_client.cnf
+
+[mysqld.1]
+server-id=1
+log-bin=master-bin
+
+[mysqld.2]
+server-id=2
+log-bin=master-bin
+
+[mysqld.3]
+server-id=3
+
+[ENV]
+SERVER_MYPORT_1= @mysqld.1.port
+SERVER_MYSOCK_1= @mysqld.1.socket
+SERVER_MYPORT_2= @mysqld.2.port
+SERVER_MYSOCK_2= @mysqld.2.socket
+SERVER_MYPORT_3= @mysqld.3.port
+SERVER_MYSOCK_3= @mysqld.3.socket
diff --git a/mysql-test/suite/multi_source/simple.result b/mysql-test/suite/multi_source/simple.result
new file mode 100644
index 00000000000..7448e0a0113
--- /dev/null
+++ b/mysql-test/suite/multi_source/simple.result
@@ -0,0 +1,64 @@
+change master 'slave1' to master_port=MYPORT_1, master_host='127.0.0.1', master_user='root';
+change master 'slave2' to master_port=MYPORT_2, master_host='127.0.0.1', master_user='root';
+start slave 'slave1';
+start slave 'slave2';
+show full slave status;
+Connection_name Slave_IO_State Master_Host Master_User Master_Port Connect_Retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_Do_DB Replicate_Ignore_DB Replicate_Do_Table Replicate_Ignore_Table Replicate_Wild_Do_Table Replicate_Wild_Ignore_Table Last_Errno Last_Error Skip_Counter Exec_Master_Log_Pos Relay_Log_Space Until_Condition Until_Log_File Until_Log_Pos Master_SSL_Allowed Master_SSL_CA_File Master_SSL_CA_Path Master_SSL_Cert Master_SSL_Cipher Master_SSL_Key Seconds_Behind_Master Master_SSL_Verify_Server_Cert Last_IO_Errno Last_IO_Error Last_SQL_Errno Last_SQL_Error Replicate_Ignore_Server_Ids Master_Server_Id
+slave1 Waiting for master to send event 127.0.0.1 root MYPORT_1 60 master-bin.000001 286 mysqld-relay-bin-slave1.000002 572 master-bin.000001 Yes Yes 0 0 286 875 None 0 No 0 No 0 0 1
+slave2 Waiting for master to send event 127.0.0.1 root MYPORT_2 60 master-bin.000001 286 mysqld-relay-bin-slave2.000002 572 master-bin.000001 Yes Yes 0 0 286 875 None 0 No 0 No 0 0 2
+stop slave 'slave1';
+show slave 'slave1' status;
+Slave_IO_State
+Master_Host 127.0.0.1
+Master_User root
+Master_Port MYPORT_1
+Connect_Retry 60
+Master_Log_File master-bin.000001
+Read_Master_Log_Pos 286
+Relay_Log_File mysqld-relay-bin-slave1.000002
+Relay_Log_Pos 572
+Relay_Master_Log_File master-bin.000001
+Slave_IO_Running No
+Slave_SQL_Running No
+Replicate_Do_DB
+Replicate_Ignore_DB
+Replicate_Do_Table
+Replicate_Ignore_Table
+Replicate_Wild_Do_Table
+Replicate_Wild_Ignore_Table
+Last_Errno 0
+Last_Error
+Skip_Counter 0
+Exec_Master_Log_Pos 286
+Relay_Log_Space 875
+Until_Condition None
+Until_Log_File
+Until_Log_Pos 0
+Master_SSL_Allowed No
+Master_SSL_CA_File
+Master_SSL_CA_Path
+Master_SSL_Cert
+Master_SSL_Cipher
+Master_SSL_Key
+Seconds_Behind_Master NULL
+Master_SSL_Verify_Server_Cert No
+Last_IO_Errno 0
+Last_IO_Error
+Last_SQL_Errno 0
+Last_SQL_Error
+Replicate_Ignore_Server_Ids
+Master_Server_Id 1
+reset slave 'slave1';
+show full slave status;
+Connection_name Slave_IO_State Master_Host Master_User Master_Port Connect_Retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_Do_DB Replicate_Ignore_DB Replicate_Do_Table Replicate_Ignore_Table Replicate_Wild_Do_Table Replicate_Wild_Ignore_Table Last_Errno Last_Error Skip_Counter Exec_Master_Log_Pos Relay_Log_Space Until_Condition Until_Log_File Until_Log_Pos Master_SSL_Allowed Master_SSL_CA_File Master_SSL_CA_Path Master_SSL_Cert Master_SSL_Cipher Master_SSL_Key Seconds_Behind_Master Master_SSL_Verify_Server_Cert Last_IO_Errno Last_IO_Error Last_SQL_Errno Last_SQL_Error Replicate_Ignore_Server_Ids Master_Server_Id
+slave1 127.0.0.1 root MYPORT_1 60 4 mysqld-relay-bin-slave1.000001 4 No No 0 0 0 265 None 0 No NULL No 0 0 1
+slave2 Waiting for master to send event 127.0.0.1 root MYPORT_2 60 master-bin.000001 286 mysqld-relay-bin-slave2.000002 572 master-bin.000001 Yes Yes 0 0 286 875 None 0 No 0 No 0 0 2
+reset slave 'slave1' all;
+show full slave status;
+Connection_name Slave_IO_State Master_Host Master_User Master_Port Connect_Retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_Do_DB Replicate_Ignore_DB Replicate_Do_Table Replicate_Ignore_Table Replicate_Wild_Do_Table Replicate_Wild_Ignore_Table Last_Errno Last_Error Skip_Counter Exec_Master_Log_Pos Relay_Log_Space Until_Condition Until_Log_File Until_Log_Pos Master_SSL_Allowed Master_SSL_CA_File Master_SSL_CA_Path Master_SSL_Cert Master_SSL_Cipher Master_SSL_Key Seconds_Behind_Master Master_SSL_Verify_Server_Cert Last_IO_Errno Last_IO_Error Last_SQL_Errno Last_SQL_Error Replicate_Ignore_Server_Ids Master_Server_Id
+slave2 Waiting for master to send event 127.0.0.1 root MYPORT_2 60 master-bin.000001 286 mysqld-relay-bin-slave2.000002 572 master-bin.000001 Yes Yes 0 0 286 875 None 0 No 0 No 0 0 2
+stop slave 'slave2';
+show full slave status;
+Connection_name Slave_IO_State Master_Host Master_User Master_Port Connect_Retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_Do_DB Replicate_Ignore_DB Replicate_Do_Table Replicate_Ignore_Table Replicate_Wild_Do_Table Replicate_Wild_Ignore_Table Last_Errno Last_Error Skip_Counter Exec_Master_Log_Pos Relay_Log_Space Until_Condition Until_Log_File Until_Log_Pos Master_SSL_Allowed Master_SSL_CA_File Master_SSL_CA_Path Master_SSL_Cert Master_SSL_Cipher Master_SSL_Key Seconds_Behind_Master Master_SSL_Verify_Server_Cert Last_IO_Errno Last_IO_Error Last_SQL_Errno Last_SQL_Error Replicate_Ignore_Server_Ids Master_Server_Id
+slave2 127.0.0.1 root MYPORT_2 60 master-bin.000001 286 mysqld-relay-bin-slave2.000002 572 master-bin.000001 No No 0 0 286 875 None 0 No NULL No 0 0 2
+reset slave 'slave2' all;
diff --git a/mysql-test/suite/multi_source/simple.test b/mysql-test/suite/multi_source/simple.test
new file mode 100644
index 00000000000..97696c6e75f
--- /dev/null
+++ b/mysql-test/suite/multi_source/simple.test
@@ -0,0 +1,38 @@
+#
+# Simple multi-master test
+#
+
+--connect (slave,127.0.0.1,root,,,$SERVER_MYPORT_3)
+
+--replace_result $SERVER_MYPORT_1 MYPORT_1
+eval change master 'slave1' to master_port=$SERVER_MYPORT_1, master_host='127.0.0.1', master_user='root';
+--replace_result $SERVER_MYPORT_2 MYPORT_2
+eval change master 'slave2' to master_port=$SERVER_MYPORT_2, master_host='127.0.0.1', master_user='root';
+start slave 'slave1';
+start slave 'slave2';
+--sleep 5
+
+--replace_result $SERVER_MYPORT_1 MYPORT_1 $SERVER_MYPORT_2 MYPORT_2
+show full slave status;
+
+stop slave 'slave1';
+
+--replace_result $SERVER_MYPORT_1 MYPORT_1 $SERVER_MYPORT_2 MYPORT_2
+query_vertical show slave 'slave1' status;
+
+reset slave 'slave1';
+--replace_result $SERVER_MYPORT_1 MYPORT_1 $SERVER_MYPORT_2 MYPORT_2
+show full slave status;
+
+reset slave 'slave1' all;
+--replace_result $SERVER_MYPORT_1 MYPORT_1 $SERVER_MYPORT_2 MYPORT_2
+show full slave status;
+
+stop slave 'slave2';
+--replace_result $SERVER_MYPORT_1 MYPORT_1 $SERVER_MYPORT_2 MYPORT_2
+show full slave status;
+
+#
+# clean up
+#
+reset slave 'slave2' all;
diff --git a/mysql-test/suite/multi_source/skip_counter.result b/mysql-test/suite/multi_source/skip_counter.result
new file mode 100644
index 00000000000..1fb1c255d80
--- /dev/null
+++ b/mysql-test/suite/multi_source/skip_counter.result
@@ -0,0 +1,81 @@
+connect master1,127.0.0.1,root,,,$SERVER_MYPORT_1;
+drop database if exists db;
+create database db;
+create table db.t1 (i int) engine=MyISAM;
+connect master2,127.0.0.1,root,,,$SERVER_MYPORT_2;
+drop database if exists db;
+create database db;
+create table db.t2 (i int) engine=MyISAM;
+connect slave,127.0.0.1,root,,,$SERVER_MYPORT_3;
+change master 'master1' to
+master_port=MYPORT_1,
+master_host='127.0.0.1',
+master_user='root';
+start slave 'master1';
+set default_master_connection = 'master1';
+include/wait_for_slave_to_start.inc
+set default_master_connection = 'master2';
+change master 'master2' to
+master_port=MYPORT_2,
+master_host='127.0.0.1',
+master_user='root';
+set global sql_slave_skip_counter = 2;
+select @@global.sql_slave_skip_counter;
+@@global.sql_slave_skip_counter
+2
+select @@session.sql_slave_skip_counter;
+@@session.sql_slave_skip_counter
+2
+set session sql_slave_skip_counter = 3;
+select @@global.sql_slave_skip_counter;
+@@global.sql_slave_skip_counter
+3
+select @@session.sql_slave_skip_counter;
+@@session.sql_slave_skip_counter
+3
+set global sql_slave_skip_counter= default;
+select @@global.sql_slave_skip_counter;
+@@global.sql_slave_skip_counter
+0
+select @@session.sql_slave_skip_counter;
+@@session.sql_slave_skip_counter
+0
+set global sql_slave_skip_counter= 3;
+set default_master_connection = 'master1';
+select @@session.sql_slave_skip_counter;
+@@session.sql_slave_skip_counter
+0
+set default_master_connection = 'qqq';
+select @@session.sql_slave_skip_counter;
+@@session.sql_slave_skip_counter
+0
+Warnings:
+Warning 1617 There is no master connection 'qqq'
+set default_master_connection = 'master2';
+select @@session.sql_slave_skip_counter;
+@@session.sql_slave_skip_counter
+3
+start slave 'master2';
+include/wait_for_slave_to_start.inc
+set default_master_connection = '';
+connection master2;
+connection slave;
+show tables in db;
+Tables_in_db
+t1
+t2
+drop database db;
+set default_master_connection = 'master1';
+stop slave;
+include/wait_for_slave_to_stop.inc
+set default_master_connection = 'master2';
+stop slave;
+include/wait_for_slave_to_stop.inc
+set global sql_slave_skip_counter = 0;
+disconnect slave;
+connection master1;
+drop database db;
+disconnect master1;
+connection master2;
+drop database db;
+disconnect master2;
diff --git a/mysql-test/suite/multi_source/skip_counter.test b/mysql-test/suite/multi_source/skip_counter.test
new file mode 100644
index 00000000000..fe5df69097a
--- /dev/null
+++ b/mysql-test/suite/multi_source/skip_counter.test
@@ -0,0 +1,126 @@
+#
+# Test of sql_slave_skip_counter
+#
+
+--enable_connect_log
+
+# Create a schema and a table i
+# on the 1st master
+
+--connect (master1,127.0.0.1,root,,,$SERVER_MYPORT_1)
+
+--disable_warnings
+drop database if exists db;
+--enable_warnings
+create database db;
+create table db.t1 (i int) engine=MyISAM;
+--save_master_pos
+
+
+# Create the same schema and another table
+# on the 2nd master
+
+--connect (master2,127.0.0.1,root,,,$SERVER_MYPORT_2)
+
+--disable_warnings
+drop database if exists db;
+--enable_warnings
+create database db;
+create table db.t2 (i int) engine=MyISAM;
+
+--connect (slave,127.0.0.1,root,,,$SERVER_MYPORT_3)
+--disable_connect_log
+
+# Start replication from the first master
+
+--replace_result $SERVER_MYPORT_1 MYPORT_1
+eval change master 'master1' to
+master_port=$SERVER_MYPORT_1,
+master_host='127.0.0.1',
+master_user='root';
+
+start slave 'master1';
+set default_master_connection = 'master1';
+--source include/wait_for_slave_to_start.inc
+--sync_with_master 0,'master1'
+
+# Start replication from the second master
+
+set default_master_connection = 'master2';
+
+--replace_result $SERVER_MYPORT_2 MYPORT_2
+eval change master 'master2' to
+master_port=$SERVER_MYPORT_2,
+master_host='127.0.0.1',
+master_user='root';
+
+# the schema creation will be replicated from the 1st master,
+# so we want to skip it in the second replication connection.
+
+# Normally it should have been 2 events, but
+# currently Binlog_checkpoint also counts. Maybe we'll need
+# to modify the test later
+
+--let $skip_counter_saved = `select @@global.sql_slave_skip_counter`
+set global sql_slave_skip_counter = 2;
+select @@global.sql_slave_skip_counter;
+select @@session.sql_slave_skip_counter;
+set session sql_slave_skip_counter = 3;
+select @@global.sql_slave_skip_counter;
+select @@session.sql_slave_skip_counter;
+set global sql_slave_skip_counter= default;
+select @@global.sql_slave_skip_counter;
+select @@session.sql_slave_skip_counter;
+set global sql_slave_skip_counter= 3;
+set default_master_connection = 'master1';
+select @@session.sql_slave_skip_counter;
+set default_master_connection = 'qqq';
+select @@session.sql_slave_skip_counter;
+set default_master_connection = 'master2';
+select @@session.sql_slave_skip_counter;
+
+start slave 'master2';
+--source include/wait_for_slave_to_start.inc
+set default_master_connection = '';
+
+--enable_connect_log
+
+--connection master2
+--save_master_pos
+
+--connection slave
+
+--disable_connect_log
+--sync_with_master 0,'master2'
+
+# If the skip_counter worked as expected, we should
+# get here (replication shouldn't have broken)
+# and should see both tables here
+# (drop database which came from master2 shoudn't have been executed
+# so t1 should still exist)
+
+show tables in db;
+
+# Cleanup
+
+drop database db;
+set default_master_connection = 'master1';
+stop slave;
+
+--source include/wait_for_slave_to_stop.inc
+set default_master_connection = 'master2';
+stop slave;
+
+--source include/wait_for_slave_to_stop.inc
+--eval set global sql_slave_skip_counter = $skip_counter_saved
+
+--enable_connect_log
+--disconnect slave
+
+--connection master1
+drop database db;
+--disconnect master1
+
+--connection master2
+drop database db;
+--disconnect master2
diff --git a/mysql-test/suite/multi_source/syntax.result b/mysql-test/suite/multi_source/syntax.result
new file mode 100644
index 00000000000..23ea4d9fd3d
--- /dev/null
+++ b/mysql-test/suite/multi_source/syntax.result
@@ -0,0 +1,89 @@
+include/master-slave.inc
+[connection master]
+show slave status;
+Slave_IO_State Master_Host Master_User Master_Port Connect_Retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_Do_DB Replicate_Ignore_DB Replicate_Do_Table Replicate_Ignore_Table Replicate_Wild_Do_Table Replicate_Wild_Ignore_Table Last_Errno Last_Error Skip_Counter Exec_Master_Log_Pos Relay_Log_Space Until_Condition Until_Log_File Until_Log_Pos Master_SSL_Allowed Master_SSL_CA_File Master_SSL_CA_Path Master_SSL_Cert Master_SSL_Cipher Master_SSL_Key Seconds_Behind_Master Master_SSL_Verify_Server_Cert Last_IO_Errno Last_IO_Error Last_SQL_Errno Last_SQL_Error Replicate_Ignore_Server_Ids Master_Server_Id
+show slave '' status;
+Slave_IO_State Master_Host Master_User Master_Port Connect_Retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_Do_DB Replicate_Ignore_DB Replicate_Do_Table Replicate_Ignore_Table Replicate_Wild_Do_Table Replicate_Wild_Ignore_Table Last_Errno Last_Error Skip_Counter Exec_Master_Log_Pos Relay_Log_Space Until_Condition Until_Log_File Until_Log_Pos Master_SSL_Allowed Master_SSL_CA_File Master_SSL_CA_Path Master_SSL_Cert Master_SSL_Cipher Master_SSL_Key Seconds_Behind_Master Master_SSL_Verify_Server_Cert Last_IO_Errno Last_IO_Error Last_SQL_Errno Last_SQL_Error Replicate_Ignore_Server_Ids Master_Server_Id
+show full slave status;
+Connection_name Slave_IO_State Master_Host Master_User Master_Port Connect_Retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_Do_DB Replicate_Ignore_DB Replicate_Do_Table Replicate_Ignore_Table Replicate_Wild_Do_Table Replicate_Wild_Ignore_Table Last_Errno Last_Error Skip_Counter Exec_Master_Log_Pos Relay_Log_Space Until_Condition Until_Log_File Until_Log_Pos Master_SSL_Allowed Master_SSL_CA_File Master_SSL_CA_Path Master_SSL_Cert Master_SSL_Cipher Master_SSL_Key Seconds_Behind_Master Master_SSL_Verify_Server_Cert Last_IO_Errno Last_IO_Error Last_SQL_Errno Last_SQL_Error Replicate_Ignore_Server_Ids Master_Server_Id
+#
+# Check error handling
+#
+show slave 'qqq' status;
+ERROR HY000: There is no master connection 'qqq'
+show slave 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc' status;
+ERROR HY000: There is no master connection 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc'
+show slave 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc' status;
+ERROR HY000: Incorrect arguments to MASTER_CONNECTION_NAME
+change master 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc' to master_host='dummy';
+ERROR HY000: Incorrect arguments to MASTER_CONNECTION_NAME
+start slave 'qqq';
+ERROR HY000: There is no master connection 'qqq'
+stop slave 'qqq';
+ERROR HY000: There is no master connection 'qqq'
+slave 'qqq' start;
+ERROR HY000: There is no master connection 'qqq'
+slave 'qqq' stop;
+ERROR HY000: There is no master connection 'qqq'
+flush slave 'qqq';
+ERROR HY000: There is no master connection 'qqq'
+reset slave 'qqq';
+ERROR HY000: There is no master connection 'qqq'
+select master_pos_wait('master-bin.999999',0,2,'qqq');
+master_pos_wait('master-bin.999999',0,2,'qqq')
+NULL
+Warnings:
+Warning 1617 There is no master connection 'qqq'
+select master_pos_wait('master-bin.999999',0,2,'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc');
+master_pos_wait('master-bin.999999',0,2,'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc')
+NULL
+Warnings:
+Warning 1210 Incorrect arguments to MASTER_CONNECTION_NAME
+#
+# checking usage of default_master_connection;
+#
+select @@default_master_connection;
+@@default_master_connection
+
+select @@global.default_master_connection;
+ERROR HY000: Variable 'default_master_connection' is a SESSION variable
+set @@global.default_master_connection='qqq';
+ERROR HY000: Variable 'default_master_connection' is a SESSION variable and can't be used with SET GLOBAL
+set @@default_master_connection='aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc';
+ERROR 42000: Variable 'default_master_connection' can't be set to the value of 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc'
+select @@default_master_connection;
+@@default_master_connection
+
+set @@default_master_connection='qqq';
+select @@default_master_connection;
+@@default_master_connection
+qqq
+show variables like "default_master_connection";
+Variable_name Value
+default_master_connection qqq
+show slave status;
+ERROR HY000: There is no master connection 'qqq'
+select master_pos_wait('master-bin.999999',0,2);
+master_pos_wait('master-bin.999999',0,2)
+NULL
+Warnings:
+Warning 1617 There is no master connection 'qqq'
+set @@default_master_connection='';
+select master_pos_wait('master-bin.999999',0,2);
+master_pos_wait('master-bin.999999',0,2)
+-1
+set @@default_master_connection='';
+#
+# checking variables
+#
+show status like "Slave_running";
+Variable_name Value
+Slave_running ON
+set @@default_master_connection='qqq';
+show status like "Slave_running";
+Variable_name Value
+Slave_running OFF
+Warnings:
+Warning 1617 There is no master connection 'qqq'
+set @@default_master_connection='';
+include/rpl_end.inc
diff --git a/mysql-test/suite/multi_source/syntax.test b/mysql-test/suite/multi_source/syntax.test
new file mode 100644
index 00000000000..af793fe6100
--- /dev/null
+++ b/mysql-test/suite/multi_source/syntax.test
@@ -0,0 +1,77 @@
+# Test multi master syntax
+source include/master-slave.inc;
+
+# Check syntax of multi source replication
+
+show slave status;
+show slave '' status;
+show full slave status;
+
+--echo #
+--echo # Check error handling
+--echo #
+
+--error WARN_NO_MASTER_INFO
+show slave 'qqq' status;
+--error WARN_NO_MASTER_INFO
+show slave 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc' status;
+--error ER_WRONG_ARGUMENTS
+show slave 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc' status;
+--error ER_WRONG_ARGUMENTS
+change master 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc' to master_host='dummy';
+
+--error WARN_NO_MASTER_INFO
+start slave 'qqq';
+--error WARN_NO_MASTER_INFO
+stop slave 'qqq';
+--error WARN_NO_MASTER_INFO
+slave 'qqq' start;
+--error WARN_NO_MASTER_INFO
+slave 'qqq' stop;
+--error WARN_NO_MASTER_INFO
+flush slave 'qqq';
+--error WARN_NO_MASTER_INFO
+reset slave 'qqq';
+
+select master_pos_wait('master-bin.999999',0,2,'qqq');
+select master_pos_wait('master-bin.999999',0,2,'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc');
+
+save_master_pos;
+connection slave;
+sync_with_master 0,'';
+sync_with_master 0 ,'';
+sync_with_master 0, '';
+
+--echo #
+--echo # checking usage of default_master_connection;
+--echo #
+select @@default_master_connection;
+
+--error 1238
+select @@global.default_master_connection;
+--error 1228
+set @@global.default_master_connection='qqq';
+--error 1231
+set @@default_master_connection='aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc';
+select @@default_master_connection;
+set @@default_master_connection='qqq';
+select @@default_master_connection;
+show variables like "default_master_connection";
+
+--error WARN_NO_MASTER_INFO
+show slave status;
+select master_pos_wait('master-bin.999999',0,2);
+set @@default_master_connection='';
+select master_pos_wait('master-bin.999999',0,2);
+
+set @@default_master_connection='';
+
+--echo #
+--echo # checking variables
+--echo #
+show status like "Slave_running";
+set @@default_master_connection='qqq';
+show status like "Slave_running";
+set @@default_master_connection='';
+
+--source include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/r/rpl_rotate_logs.result b/mysql-test/suite/rpl/r/rpl_rotate_logs.result
index f10e30c698d..783c02b961c 100644
--- a/mysql-test/suite/rpl/r/rpl_rotate_logs.result
+++ b/mysql-test/suite/rpl/r/rpl_rotate_logs.result
@@ -2,9 +2,9 @@ CALL mtr.add_suppression("Unsafe statement written to the binary log using state
start slave;
Got one of the listed errors
start slave;
-ERROR HY000: Could not initialize master info structure; more error messages can be found in the MariaDB error log
+ERROR HY000: Could not initialize master info structure for ''; more error messages can be found in the MariaDB error log
change master to master_host='127.0.0.1',master_port=MASTER_PORT, master_user='root';
-ERROR HY000: Could not initialize master info structure; more error messages can be found in the MariaDB error log
+ERROR HY000: Could not initialize master info structure for ''; more error messages can be found in the MariaDB error log
reset slave;
change master to master_host='127.0.0.1',master_port=MASTER_PORT, master_user='root';
reset master;
diff --git a/mysql-test/t/parser.test b/mysql-test/t/parser.test
index d477843b22b..2c8cfafb90a 100644
--- a/mysql-test/t/parser.test
+++ b/mysql-test/t/parser.test
@@ -553,7 +553,7 @@ select master_pos_wait();
-- error ER_WRONG_PARAMCOUNT_TO_NATIVE_FCT
select master_pos_wait(1);
-- error ER_WRONG_PARAMCOUNT_TO_NATIVE_FCT
-select master_pos_wait(1, 2, 3, 4);
+select master_pos_wait(1, 2, 3, 4, 5);
-- error ER_WRONG_PARAMCOUNT_TO_NATIVE_FCT
select rand(1, 2, 3);
diff --git a/sql/event_scheduler.cc b/sql/event_scheduler.cc
index b41c9e2cda0..f107914f738 100644
--- a/sql/event_scheduler.cc
+++ b/sql/event_scheduler.cc
@@ -190,7 +190,6 @@ pre_init_event_thread(THD* thd)
my_net_init(&thd->net, NULL);
thd->security_ctx->set_user((char*)"event_scheduler");
thd->net.read_timeout= slave_net_timeout;
- thd->slave_thread= 0;
thd->variables.option_bits|= OPTION_AUTO_IS_NULL;
thd->client_capabilities|= CLIENT_MULTI_RESULTS;
mysql_mutex_lock(&LOCK_thread_count);
diff --git a/sql/item_create.cc b/sql/item_create.cc
index 96837a8f262..87e90ed4f8b 100644
--- a/sql/item_create.cc
+++ b/sql/item_create.cc
@@ -4393,27 +4393,36 @@ Create_func_master_pos_wait::create_native(THD *thd, LEX_STRING name,
if (item_list != NULL)
arg_count= item_list->elements;
+ if (arg_count < 2 || arg_count >4)
+ {
+ my_error(ER_WRONG_PARAMCOUNT_TO_NATIVE_FCT, MYF(0), name.str);
+ return func;
+ }
+
+ thd->lex->safe_to_cache_query= 0;
+
+ Item *param_1= item_list->pop();
+ Item *param_2= item_list->pop();
switch (arg_count) {
case 2:
{
- Item *param_1= item_list->pop();
- Item *param_2= item_list->pop();
func= new (thd->mem_root) Item_master_pos_wait(param_1, param_2);
- thd->lex->safe_to_cache_query= 0;
break;
}
case 3:
{
- Item *param_1= item_list->pop();
- Item *param_2= item_list->pop();
Item *param_3= item_list->pop();
func= new (thd->mem_root) Item_master_pos_wait(param_1, param_2, param_3);
thd->lex->safe_to_cache_query= 0;
break;
}
- default:
+ case 4:
{
- my_error(ER_WRONG_PARAMCOUNT_TO_NATIVE_FCT, MYF(0), name.str);
+ Item *param_3= item_list->pop();
+ Item *param_4= item_list->pop();
+ func= new (thd->mem_root) Item_master_pos_wait(param_1, param_2, param_3,
+ param_4);
+ thd->lex->safe_to_cache_query= 0;
break;
}
}
diff --git a/sql/item_func.cc b/sql/item_func.cc
index 441eb37d701..5db91cd4231 100644
--- a/sql/item_func.cc
+++ b/sql/item_func.cc
@@ -3866,13 +3866,45 @@ longlong Item_master_pos_wait::val_int()
#ifdef HAVE_REPLICATION
longlong pos = (ulong)args[1]->val_int();
longlong timeout = (arg_count==3) ? args[2]->val_int() : 0 ;
- if ((event_count = active_mi->rli.wait_for_pos(thd, log_name, pos, timeout)) == -2)
+ String connection_name_buff;
+ LEX_STRING connection_name;
+ Master_info *mi;
+ if (arg_count == 4)
+ {
+ String *con;
+ if (!(con= args[3]->val_str(&connection_name_buff)))
+ goto err;
+
+ connection_name.str= (char*) con->ptr();
+ connection_name.length= con->length();
+ if (check_master_connection_name(&connection_name))
+ {
+ my_error(ER_WRONG_ARGUMENTS, MYF(ME_JUST_WARNING),
+ "MASTER_CONNECTION_NAME");
+ goto err;
+ }
+ }
+ else
+ connection_name= thd->variables.default_master_connection;
+
+ if (!(mi= master_info_index->get_master_info(&connection_name,
+ MYSQL_ERROR::WARN_LEVEL_WARN)))
+ goto err;
+ if ((event_count = mi->rli.wait_for_pos(thd, log_name, pos, timeout)) == -2)
{
null_value = 1;
event_count=0;
}
#endif
return event_count;
+
+#ifdef HAVE_REPLICATION
+err:
+ {
+ null_value = 1;
+ return 0;
+ }
+#endif
}
diff --git a/sql/item_func.h b/sql/item_func.h
index 586444e0e4e..07b246b3ccd 100644
--- a/sql/item_func.h
+++ b/sql/item_func.h
@@ -496,6 +496,8 @@ public:
{ collation.set_numeric(); fix_char_length(21); }
Item_int_func(Item *a,Item *b,Item *c) :Item_func(a,b,c)
{ collation.set_numeric(); fix_char_length(21); }
+ Item_int_func(Item *a,Item *b,Item *c, Item *d) :Item_func(a,b,c,d)
+ { collation.set_numeric(); fix_char_length(21); }
Item_int_func(List<Item> &list) :Item_func(list)
{ collation.set_numeric(); fix_char_length(21); }
Item_int_func(THD *thd, Item_int_func *item) :Item_func(thd, item)
@@ -1522,6 +1524,7 @@ class Item_master_pos_wait :public Item_int_func
public:
Item_master_pos_wait(Item *a,Item *b) :Item_int_func(a,b) {}
Item_master_pos_wait(Item *a,Item *b,Item *c) :Item_int_func(a,b,c) {}
+ Item_master_pos_wait(Item *a,Item *b, Item *c, Item *d) :Item_int_func(a,b,c,d) {}
longlong val_int();
const char *func_name() const { return "master_pos_wait"; }
void fix_length_and_dec() { max_length=21; maybe_null=1;}
diff --git a/sql/log.cc b/sql/log.cc
index e4db7b2eb0f..54340f94679 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -40,6 +40,7 @@
#include "rpl_rli.h"
#include "sql_audit.h"
#include "log_slow.h"
+#include "mysqld.h"
#include <my_dir.h>
#include <stdarg.h>
@@ -6980,16 +6981,33 @@ static void print_buffer_to_file(enum loglevel level, const char *buffer,
time_t skr;
struct tm tm_tmp;
struct tm *start;
+ THD *thd;
+ int tag_length= 0;
+ char tag[NAME_LEN];
DBUG_ENTER("print_buffer_to_file");
DBUG_PRINT("enter",("buffer: %s", buffer));
+ if (mysqld_server_initialized && (thd= current_thd))
+ {
+ if (thd->connection_name.length)
+ {
+ /*
+ Add tag for slaves so that the user can see from which connection
+ the error originates.
+ */
+ tag_length= my_snprintf(tag, sizeof(tag), ER(ER_MASTER_LOG_PREFIX),
+ (int) thd->connection_name.length,
+ thd->connection_name.str);
+ }
+ }
+
mysql_mutex_lock(&LOCK_error_log);
skr= my_time(0);
localtime_r(&skr, &tm_tmp);
start=&tm_tmp;
- fprintf(stderr, "%02d%02d%02d %2d:%02d:%02d [%s] %.*s\n",
+ fprintf(stderr, "%02d%02d%02d %2d:%02d:%02d [%s] %.*s%.*s\n",
start->tm_year % 100,
start->tm_mon+1,
start->tm_mday,
@@ -6998,6 +7016,7 @@ static void print_buffer_to_file(enum loglevel level, const char *buffer,
start->tm_sec,
(level == ERROR_LEVEL ? "ERROR" : level == WARNING_LEVEL ?
"Warning" : "Note"),
+ tag_length, tag,
(int) length, buffer);
fflush(stderr);
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 96ea789edd7..defd3a71b93 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -673,8 +673,7 @@ pthread_attr_t connection_attrib;
mysql_mutex_t LOCK_server_started;
mysql_cond_t COND_server_started;
-int mysqld_server_started= 0;
-
+int mysqld_server_started=0, mysqld_server_initialized= 0;
File_parser_dummy_hook file_parser_dummy_hook;
/* replication parameters, if master_host is not NULL, we are a slave */
@@ -1761,7 +1760,12 @@ void clean_up(bool print_message)
if (cleanup_done++)
return; /* purecov: inspected */
- close_active_mi();
+#ifdef HAVE_REPLICATION
+ // We must call end_slave() as clean_up may have been called during startup
+ end_slave();
+ if (use_slave_mask)
+ bitmap_free(&slave_error_mask);
+#endif
stop_handle_manager();
release_ddl_log();
@@ -1776,10 +1780,6 @@ void clean_up(bool print_message)
injector::free_instance();
mysql_bin_log.cleanup();
-#ifdef HAVE_REPLICATION
- if (use_slave_mask)
- bitmap_free(&slave_error_mask);
-#endif
my_tz_free();
my_dboptions_cache_free();
#ifndef NO_EMBEDDED_ACCESS_CHECKS
@@ -4626,6 +4626,7 @@ int mysqld_main(int argc, char **argv)
}
#endif
+ mysqld_server_started= mysqld_server_initialized= 0;
orig_argc= argc;
orig_argv= argv;
my_getopt_use_args_separator= TRUE;
@@ -4890,16 +4891,6 @@ int mysqld_main(int argc, char **argv)
opt_skip_slave_start= 1;
binlog_unsafe_map_init();
- /*
- init_slave() must be called after the thread keys are created.
- Some parts of the code (e.g. SHOW STATUS LIKE 'slave_running' and other
- places) assume that active_mi != 0, so let's fail if it's 0 (out of
- memory); a message has already been printed.
- */
- if (init_slave() && !active_mi)
- {
- unireg_abort(1);
- }
#ifdef WITH_PERFSCHEMA_STORAGE_ENGINE
initialize_performance_schema_acl(opt_bootstrap);
@@ -4917,9 +4908,17 @@ int mysqld_main(int argc, char **argv)
execute_ddl_log_recovery();
+ /*
+ We must have LOCK_open before LOCK_global_system_variables because
+ LOCK_open is hold while sql_plugin.c::intern_sys_var_ptr() is called.
+ */
+ mysql_mutex_record_order(&LOCK_open, &LOCK_global_system_variables);
+
if (Events::init(opt_noacl || opt_bootstrap))
unireg_abort(1);
+ mysqld_server_initialized= 1;
+
if (opt_bootstrap)
{
select_thread_in_use= 0; // Allow 'kill' to work
@@ -4932,21 +4931,27 @@ int mysqld_main(int argc, char **argv)
exit(0);
}
}
+
+ create_shutdown_thread();
+ start_handle_manager();
+
+ /*
+ init_slave() must be called after the thread keys are created.
+ Some parts of the code (e.g. SHOW STATUS LIKE 'slave_running' and other
+ places) assume that active_mi != 0, so let's fail if it's 0 (out of
+ memory); a message has already been printed.
+ */
+ if (init_slave() && !active_mi)
+ {
+ unireg_abort(1);
+ }
+
if (opt_init_file && *opt_init_file)
{
if (read_init_file(opt_init_file))
unireg_abort(1);
}
- /*
- We must have LOCK_open before LOCK_global_system_variables because
- LOCK_open is hold while sql_plugin.c::intern_sys_var_ptr() is called.
- */
- mysql_mutex_record_order(&LOCK_open, &LOCK_global_system_variables);
-
- create_shutdown_thread();
- start_handle_manager();
-
sql_print_information(ER_DEFAULT(ER_STARTUP),my_progname,server_version,
((unix_sock == INVALID_SOCKET) ? (char*) ""
: mysqld_unix_port),
@@ -6524,12 +6529,17 @@ static int show_rpl_status(THD *thd, SHOW_VAR *var, char *buff)
static int show_slave_running(THD *thd, SHOW_VAR *var, char *buff)
{
+ Master_info *mi;
var->type= SHOW_MY_BOOL;
mysql_mutex_lock(&LOCK_active_mi);
var->value= buff;
- *((my_bool *)buff)= (my_bool) (active_mi &&
- active_mi->slave_running == MYSQL_SLAVE_RUN_CONNECT &&
- active_mi->rli.slave_running);
+ mi= master_info_index->
+ get_master_info(&thd->variables.default_master_connection,
+ MYSQL_ERROR::WARN_LEVEL_WARN);
+ *((my_bool *)buff)= (my_bool) (mi &&
+ mi->slave_running ==
+ MYSQL_SLAVE_RUN_CONNECT &&
+ mi->rli.slave_running);
mysql_mutex_unlock(&LOCK_active_mi);
return 0;
}
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 7a24b56dcb7..fd33ca0cf85 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -198,7 +198,7 @@ extern handlerton *myisam_hton;
extern handlerton *heap_hton;
extern const char *load_default_groups[];
extern struct my_option my_long_options[];
-extern int mysqld_server_started;
+extern int mysqld_server_started, mysqld_server_initialized;
extern "C" MYSQL_PLUGIN_IMPORT int orig_argc;
extern "C" MYSQL_PLUGIN_IMPORT char **orig_argv;
extern pthread_attr_t connection_attrib;
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index 3c5a99121fa..3dc3e38f419 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -20,6 +20,7 @@
#include "unireg.h" // REQUIRED by other includes
#include "rpl_mi.h"
#include "slave.h" // SLAVE_MAX_HEARTBEAT_PERIOD
+#include "strfunc.h"
#ifdef HAVE_REPLICATION
@@ -27,7 +28,8 @@
static void init_master_log_pos(Master_info* mi);
-Master_info::Master_info(bool is_slave_recovery)
+Master_info::Master_info(LEX_STRING *connection_name_arg,
+ bool is_slave_recovery)
:Slave_reporting_capability("I/O"),
ssl(0), ssl_verify_server_cert(1), fd(-1), io_thd(0),
rli(is_slave_recovery), port(MYSQL_PORT),
@@ -40,6 +42,21 @@ Master_info::Master_info(bool is_slave_recovery)
ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0;
ssl_cipher[0]= 0; ssl_key[0]= 0;
+ /* Store connection name and lower case connection name */
+ connection_name.length= cmp_connection_name.length=
+ connection_name_arg->length;
+ if ((connection_name.str= (char*) my_malloc(connection_name_arg->length*2+2,
+ MYF(MY_WME))))
+ {
+ cmp_connection_name.str= (connection_name.str +
+ connection_name_arg->length+1);
+ strmake(connection_name.str, connection_name_arg->str,
+ connection_name.length);
+ memcpy(cmp_connection_name.str, connection_name_arg->str,
+ connection_name.length+1);
+ my_casedn_str(system_charset_info, cmp_connection_name.str);
+ }
+
my_init_dynamic_array(&ignore_server_ids, sizeof(::server_id), 16, 16);
bzero((char*) &file, sizeof(file));
mysql_mutex_init(key_master_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST);
@@ -55,6 +72,7 @@ Master_info::Master_info(bool is_slave_recovery)
Master_info::~Master_info()
{
+ my_free(connection_name.str);
delete_dynamic(&ignore_server_ids);
mysql_mutex_destroy(&run_lock);
mysql_mutex_destroy(&data_lock);
@@ -407,7 +425,7 @@ file '%s')", fname);
mi->master_log_name,
(ulong) mi->master_log_pos));
- mi->rli.mi = mi;
+ mi->rli.mi= mi;
if (init_relay_log_info(&mi->rli, slave_info_fname))
goto err;
@@ -560,5 +578,455 @@ void end_master_info(Master_info* mi)
DBUG_VOID_RETURN;
}
+/* Multi-Master By P.Linux */
+uchar *get_key_master_info(Master_info *mi, size_t *length,
+ my_bool not_used __attribute__((unused)))
+{
+ /* Return lower case name */
+ *length= mi->cmp_connection_name.length;
+ return (uchar*) mi->cmp_connection_name.str;
+}
+
+void free_key_master_info(Master_info *mi)
+{
+ DBUG_ENTER("free_key_master_info");
+ terminate_slave_threads(mi,SLAVE_FORCE_ALL);
+ end_master_info(mi);
+ delete mi;
+ DBUG_VOID_RETURN;
+}
+
+/**
+ Check if connection name for master_info is valid.
+
+ It's valid if it's a valid system name, is less than
+ MAX_CONNECTION_NAME.
+
+ @return
+ 0 ok
+ 1 error
+*/
+
+bool check_master_connection_name(LEX_STRING *name)
+{
+ if (name->length >= MAX_CONNECTION_NAME)
+ return 1;
+ return 0;
+}
+
+
+/**
+ Create a log file with a signed suffix.
+
+ @param
+ res_file_name Store result here
+ length Length of res_file_name buffer
+ info_file Original file name (prefix)
+ separator Separator character
+ suffix Suffix
+
+ @note
+ If suffix is an empty string, then we don't add any suffix.
+ This is to allow one to use this function also to generate old
+ file names without a prefix.
+*/
+
+void create_signed_file_name(char *res_file_name, uint length,
+ const char *info_file,
+ char separator, LEX_STRING *suffix)
+{
+ char buff[MAX_CONNECTION_NAME+1], res[MAX_CONNECTION_NAME+1], *p;
+ p= strmake(res_file_name, info_file, length);
+ if (suffix->length != 0 && p != info_file + length)
+ {
+ uint errors;
+ size_t res_length;
+
+ *p++= separator;
+ /* Create null terminated string */
+ strmake(buff, suffix->str, suffix->length);
+ /* Convert to lower case */
+ my_casedn_str(system_charset_info, buff);
+ /* Convert to characters usable in a file name */
+ res_length= strconvert(system_charset_info, buff,
+ &my_charset_filename, res, sizeof(res), &errors);
+ strmake(p, res, min(length - (p - res_file_name), res_length));
+ }
+}
+
+
+Master_info_index::Master_info_index()
+{
+ index_file_name[0] = 0;
+ bzero((char*) &index_file, sizeof(index_file));
+}
+
+Master_info_index::~Master_info_index()
+{
+ /* This will close connection for all objects in the cache */
+ my_hash_free(&master_info_hash);
+ end_io_cache(&index_file);
+ if (index_file.file > 0)
+ my_close(index_file.file, MYF(MY_WME));
+}
+
+
+/* Load All Master_info from master.info.index File
+ * RETURN:
+ * 0 - All Success
+ * 1 - All Fail
+ * 2 - Some Success, Some Fail
+ */
+
+bool Master_info_index::init_all_master_info()
+{
+ int thread_mask;
+ int err_num= 0, succ_num= 0; // The number of success read Master_info
+ char sign[MAX_CONNECTION_NAME];
+ File index_file_nr;
+ size_t filename_length, dir_length;
+ DBUG_ENTER("init_all_master_info");
+
+ /*
+ Create the Master_info index file by prepending 'multi-' before
+ the master_info_file file name.
+ */
+ fn_format(index_file_name, master_info_file, mysql_data_home,
+ "", MY_UNPACK_FILENAME);
+ filename_length= strlen(index_file_name) + 1; /* Count 0 byte */
+ dir_length= dirname_length(index_file_name);
+ bmove_upp((uchar*) index_file_name + filename_length + 6,
+ (uchar*) index_file_name + filename_length,
+ filename_length - dir_length);
+ memcpy(index_file_name + dir_length, "multi-", 6);
+
+ if ((index_file_nr= my_open(index_file_name,
+ O_RDWR | O_CREAT | O_BINARY ,
+ MYF(MY_WME | ME_NOREFRESH))) < 0 ||
+ my_sync(index_file_nr, MYF(MY_WME)) ||
+ init_io_cache(&index_file, index_file_nr,
+ IO_SIZE, READ_CACHE,
+ my_seek(index_file_nr,0L,MY_SEEK_END,MYF(0)),
+ 0, MYF(MY_WME | MY_WAIT_IF_FULL)))
+ {
+ if (index_file_nr >= 0)
+ my_close(index_file_nr,MYF(0));
+
+ sql_print_error("Creation of Master_info index file '%s' failed",
+ index_file_name);
+ DBUG_RETURN(1);
+ }
+
+ /* Initialize Master_info Hash Table */
+ if (my_hash_init(&master_info_hash, system_charset_info,
+ MAX_REPLICATION_THREAD, 0, 0,
+ (my_hash_get_key) get_key_master_info,
+ (my_hash_free_key)free_key_master_info, HASH_UNIQUE))
+ {
+ sql_print_error("Initializing Master_info hash table failed");
+ DBUG_RETURN(1);
+ }
+
+ reinit_io_cache(&index_file, READ_CACHE, 0L,0,0);
+ while (!init_strvar_from_file(sign, sizeof(sign),
+ &index_file, NULL))
+ {
+ LEX_STRING connection_name;
+ Master_info *mi;
+ char buf_master_info_file[FN_REFLEN];
+ char buf_relay_log_info_file[FN_REFLEN];
+
+ connection_name.str= sign;
+ connection_name.length= strlen(sign);
+ if (!(mi= new Master_info(&connection_name, relay_log_recovery)) ||
+ mi->error())
+ {
+ delete mi;
+ DBUG_RETURN(1);
+ }
+
+ lock_slave_threads(mi);
+ init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
+
+ create_signed_file_name(buf_master_info_file, sizeof(buf_master_info_file),
+ master_info_file, '.', &connection_name);
+ create_signed_file_name(buf_relay_log_info_file,
+ sizeof(buf_relay_log_info_file),
+ relay_log_info_file, '.', &connection_name);
+ if (global_system_variables.log_warnings > 1)
+ sql_print_information("Reading Master_info: '%s' Relay_info:'%s'",
+ buf_master_info_file, buf_relay_log_info_file);
+
+ if (init_master_info(mi, buf_master_info_file, buf_relay_log_info_file,
+ 0, thread_mask))
+ {
+ err_num++;
+ sql_print_error("Initialized Master_info from '%s' failed",
+ buf_master_info_file);
+ if (!master_info_index->get_master_info(&connection_name,
+ MYSQL_ERROR::WARN_LEVEL_NOTE))
+ {
+ /* Master_info is not in HASH; Add it */
+ if (master_info_index->add_master_info(mi, FALSE))
+ return 1;
+ succ_num++;
+ unlock_slave_threads(mi);
+ }
+ else
+ {
+ /* Master_info already in HASH */
+ sql_print_error(ER(ER_CONNECTION_ALREADY_EXISTS),
+ (int) connection_name.length, connection_name.str);
+ unlock_slave_threads(mi);
+ delete mi;
+ }
+ continue;
+ }
+ else
+ {
+ /* Initialization of Master_info succeded. Add it to HASH */
+ if (global_system_variables.log_warnings > 1)
+ sql_print_information("Initialized Master_info from '%s'",
+ buf_master_info_file);
+ if (master_info_index->get_master_info(&connection_name,
+ MYSQL_ERROR::WARN_LEVEL_NOTE))
+ {
+ /* Master_info was already registered */
+ sql_print_error(ER(ER_CONNECTION_ALREADY_EXISTS),
+ (int) connection_name.length, connection_name.str);
+ unlock_slave_threads(mi);
+ delete mi;
+ continue;
+ }
+
+ /* Master_info was not registered; add it */
+ if (master_info_index->add_master_info(mi, FALSE))
+ return 1;
+ succ_num++;
+ unlock_slave_threads(mi);
+
+ if (!opt_skip_slave_start)
+ {
+ if (start_slave_threads(1 /* need mutex */,
+ 0 /* no wait for start*/,
+ mi,
+ buf_master_info_file,
+ buf_relay_log_info_file,
+ SLAVE_IO | SLAVE_SQL))
+ {
+ sql_print_error("Failed to create slave threads for connection %.*s",
+ (int) connection_name.length,
+ connection_name.str);
+ continue;
+ }
+ if (global_system_variables.log_warnings)
+ sql_print_information("Started replication for '%.*s'",
+ (int) connection_name.length,
+ connection_name.str);
+ }
+ }
+ }
+
+ if (!err_num) // No Error on read Master_info
+ {
+ if (global_system_variables.log_warnings > 1)
+ sql_print_information("Reading of all Master_info entries succeded");
+ DBUG_RETURN(0);
+ }
+ else if (succ_num) // Have some Error and some Success
+ {
+ sql_print_warning("Reading of some Master_info entries failed");
+ DBUG_RETURN(2);
+ }
+ else // All failed
+ {
+ sql_print_error("Reading of all Master_info entries failed!");
+ DBUG_RETURN(1);
+ }
+}
+
+
+/* Write new master.info to master.info.index File */
+bool Master_info_index::write_master_name_to_index_file(LEX_STRING *name,
+ bool do_sync)
+{
+ DBUG_ASSERT(my_b_inited(&index_file) != 0);
+ DBUG_ENTER("write_master_name_to_index_file");
+
+ /* Don't write default slave to master_info.index */
+ if (name->length == 0)
+ DBUG_RETURN(0);
+
+ reinit_io_cache(&index_file, WRITE_CACHE,
+ my_b_filelength(&index_file), 0, 0);
+
+ if (my_b_write(&index_file, (uchar*) name->str, name->length) ||
+ my_b_write(&index_file, (uchar*) "\n", 1) ||
+ flush_io_cache(&index_file) ||
+ (do_sync && my_sync(index_file.file, MYF(MY_WME))))
+ {
+ sql_print_error("Write of new Master_info for '%.*s' to index file failed",
+ (int) name->length, name->str);
+ DBUG_RETURN(1);
+ }
+
+ DBUG_RETURN(0);
+}
+
+
+/**
+ Get Master_info for a connection
+
+ @param
+ connection_name Connection name
+ warning WARN_LEVEL_NOTE -> Don't print anything
+ WARN_LEVEL_WARN -> Issue warning if not exists
+ WARN_LEVEL_ERROR-> Issue error if not exists
+*/
+
+Master_info *
+Master_info_index::get_master_info(LEX_STRING *connection_name,
+ MYSQL_ERROR::enum_warning_level warning)
+{
+ Master_info *mi;
+ char buff[MAX_CONNECTION_NAME+1], *res;
+ uint buff_length;
+
+ /* Make name lower case for comparison */
+ res= strmake(buff, connection_name->str, connection_name->length);
+ my_casedn_str(system_charset_info, buff);
+ buff_length= (size_t) (res-buff);
+
+ mi= (Master_info*) my_hash_search(&master_info_hash,
+ (uchar*) buff, buff_length);
+ if (!mi && warning != MYSQL_ERROR::WARN_LEVEL_NOTE)
+ {
+ my_error(WARN_NO_MASTER_INFO,
+ MYF(warning == MYSQL_ERROR::WARN_LEVEL_WARN ? ME_JUST_WARNING :
+ 0),
+ (int) connection_name->length,
+ connection_name->str);
+ }
+ return mi;
+}
+
+
+/* Check Master_host & Master_port is duplicated or not */
+bool Master_info_index::check_duplicate_master_info(LEX_STRING *name_arg,
+ const char *host,
+ uint port)
+{
+ Master_info *mi;
+
+ /* Get full host and port name */
+ if ((mi= master_info_index->get_master_info(name_arg,
+ MYSQL_ERROR::WARN_LEVEL_NOTE)))
+ {
+ if (!host)
+ host= mi->host;
+ if (!port)
+ port= mi->port;
+ }
+ if (!host || !port)
+ return FALSE; // Not comparable yet
+
+ for (uint i= 0; i < master_info_hash.records; ++i)
+ {
+ Master_info *tmp_mi;
+ tmp_mi= (Master_info *) my_hash_element(&master_info_hash, i);
+ if (tmp_mi == mi)
+ continue; // Current connection
+ if (!strcasecmp(host, tmp_mi->host) && port == tmp_mi->port)
+ {
+ sql_print_error(ER(ER_CONNECTION_ALREADY_EXISTS),
+ (int) tmp_mi->connection_name.length,
+ tmp_mi->connection_name.str);
+ return TRUE;
+ }
+ }
+ return FALSE;
+}
+
+
+/* Add a Master_info class to Hash Table */
+bool Master_info_index::add_master_info(Master_info *mi, bool write_to_file)
+{
+ if (!my_hash_insert(&master_info_hash, (uchar*) mi))
+ {
+ if (global_system_variables.log_warnings > 1)
+ sql_print_information("Added new Master_info '%.*s' to hash table",
+ (int) mi->connection_name.length,
+ mi->connection_name.str);
+ if (write_to_file)
+ return write_master_name_to_index_file(&mi->connection_name, 1);
+ return FALSE;
+ }
+
+ /* Impossible error (EOM) ? */
+ sql_print_error("Adding new entry '%.*s' to master_info failed",
+ (int) mi->connection_name.length,
+ mi->connection_name.str);
+ return TRUE;
+}
+
+
+/**
+ Remove a Master_info class From Hash Table
+
+ TODO: Change this to use my_rename() to make the file name creation
+ atomic
+*/
+
+bool Master_info_index::remove_master_info(LEX_STRING *name)
+{
+ Master_info* mi;
+ DBUG_ENTER("remove_master_info");
+
+ if ((mi= get_master_info(name, MYSQL_ERROR::WARN_LEVEL_WARN)))
+ {
+ // Delete Master_info and rewrite others to file
+ if (!my_hash_delete(&master_info_hash, (uchar*) mi))
+ {
+ File index_file_nr;
+
+ // Close IO_CACHE and FILE handler fisrt
+ end_io_cache(&index_file);
+ my_close(index_file.file, MYF(MY_WME));
+
+ // Reopen File and truncate it
+ fn_format(index_file_name, master_info_file, mysql_data_home,
+ ".index", MY_UNPACK_FILENAME | MY_APPEND_EXT);
+
+ if ((index_file_nr= my_open(index_file_name,
+ O_RDWR | O_CREAT | O_TRUNC | O_BINARY ,
+ MYF(MY_WME))) < 0 ||
+ init_io_cache(&index_file, index_file_nr,
+ IO_SIZE, WRITE_CACHE,
+ my_seek(index_file_nr,0L,MY_SEEK_END,MYF(0)),
+ 0, MYF(MY_WME | MY_WAIT_IF_FULL)))
+ {
+ int error= my_errno;
+ if (index_file_nr >= 0)
+ my_close(index_file_nr,MYF(0));
+
+ sql_print_error("Create of Master Info Index file '%s' failed with "
+ "error: %M",
+ index_file_name, error);
+ DBUG_RETURN(TRUE);
+ }
+
+ // Rewrite Master_info.index
+ uint i;
+ for (i= 0; i< master_info_hash.records; ++i)
+ {
+ Master_info *tmp_mi;
+ tmp_mi= (Master_info *) my_hash_element(&master_info_hash, i);
+ write_master_name_to_index_file(&tmp_mi->connection_name, 0);
+ }
+ my_sync(index_file_nr, MYF(MY_WME));
+ }
+ }
+ DBUG_RETURN(FALSE);
+}
#endif /* HAVE_REPLICATION */
diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h
index a885576ef1c..6892ecdb9d1 100644
--- a/sql/rpl_mi.h
+++ b/sql/rpl_mi.h
@@ -59,16 +59,23 @@ typedef struct st_mysql MYSQL;
class Master_info : public Slave_reporting_capability
{
public:
- Master_info(bool is_slave_recovery);
+ Master_info(LEX_STRING *connection_name, bool is_slave_recovery);
~Master_info();
bool shall_ignore_server_id(ulong s_id);
void clear_in_memory_info(bool all);
+ bool error()
+ {
+ /* If malloc() in initialization failed */
+ return connection_name.str == 0;
+ }
/* the variables below are needed because we can change masters on the fly */
- char master_log_name[FN_REFLEN];
+ char master_log_name[FN_REFLEN+6]; /* Place for multi-*/
char host[HOSTNAME_LENGTH+1];
char user[USERNAME_LENGTH+1];
char password[MAX_PASSWORD_LENGTH+1];
+ LEX_STRING connection_name; /* User supplied connection name */
+ LEX_STRING cmp_connection_name; /* Connection name in lower case */
bool ssl; // enables use of SSL connection if true
char ssl_ca[FN_REFLEN], ssl_capath[FN_REFLEN], ssl_cert[FN_REFLEN];
char ssl_cipher[FN_REFLEN], ssl_key[FN_REFLEN];
@@ -130,5 +137,45 @@ int flush_master_info(Master_info* mi,
bool need_lock_relay_log);
int change_master_server_id_cmp(ulong *id1, ulong *id2);
+/*
+ Multi master are handled trough this struct.
+ Changes to this needs to be protected by LOCK_active_mi;
+*/
+
+class Master_info_index
+{
+private:
+ IO_CACHE index_file;
+ char index_file_name[FN_REFLEN];
+
+public:
+ Master_info_index();
+ ~Master_info_index();
+
+ HASH master_info_hash;
+
+ bool init_all_master_info();
+ bool write_master_name_to_index_file(LEX_STRING *connection_name,
+ bool do_sync);
+
+ bool check_duplicate_master_info(LEX_STRING *connection_name,
+ const char *host, uint port);
+ bool add_master_info(Master_info *mi, bool write_to_file);
+ bool remove_master_info(LEX_STRING *connection_name);
+ Master_info *get_master_info(LEX_STRING *connection_name,
+ MYSQL_ERROR::enum_warning_level warning);
+};
+
+bool check_master_connection_name(LEX_STRING *name);
+void create_signed_file_name(char *res_file_name, uint length,
+ const char *info_file,
+ char separator,
+ LEX_STRING *suffix);
+
+uchar *get_key_master_info(Master_info *mi, size_t *length,
+ my_bool not_used __attribute__((unused)));
+void free_key_master_info(Master_info *mi);
+
+
#endif /* HAVE_REPLICATION */
#endif /* RPL_MI_H */
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index a700aff0da9..442a9895f6b 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -206,19 +206,37 @@ a file name for --relay-log-index option", opt_relaylog_index_name);
name_warning_sent= 1;
}
+ /* For multimaster, add connection name to relay log filenames */
+ Master_info* mi= rli->mi;
+ char buf_relay_logname[FN_REFLEN], buf_relaylog_index_name_buff[FN_REFLEN];
+ char *buf_relaylog_index_name= opt_relaylog_index_name;
+
+ create_signed_file_name(buf_relay_logname, sizeof(buf_relay_logname),
+ ln, '-', &mi->connection_name);
+ ln= buf_relay_logname;
+
+ if (opt_relaylog_index_name)
+ {
+ buf_relaylog_index_name= buf_relaylog_index_name_buff;
+ create_signed_file_name(buf_relaylog_index_name_buff,
+ sizeof(buf_relaylog_index_name_buff),
+ opt_relaylog_index_name, '-',
+ &mi->connection_name);
+ }
+
rli->relay_log.is_relay_log= TRUE;
/*
note, that if open() fails, we'll still have index file open
but a destructor will take care of that
*/
- if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln, TRUE) ||
+ if (rli->relay_log.open_index_file(buf_relaylog_index_name, ln, TRUE) ||
rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND,
(max_relay_log_size ? max_relay_log_size :
max_binlog_size), 1, TRUE))
{
mysql_mutex_unlock(&rli->data_lock);
- sql_print_error("Failed in open_log() called from init_relay_log_info()");
+ sql_print_error("Failed when trying to open logs for '%s' in init_relay_log_info(). Error: %M", ln, my_errno);
DBUG_RETURN(1);
}
}
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index b989283deb4..eb808351a34 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -25,7 +25,6 @@
struct RPL_TABLE_LIST;
class Master_info;
-extern uint sql_slave_skip_counter;
/****************************************************************************
@@ -230,7 +229,7 @@ public:
skipping one or more events in the master log that have caused
errors, and have been manually applied by DBA already.
*/
- volatile uint32 slave_skip_counter;
+ volatile uint slave_skip_counter; /* Must be uint */
volatile ulong abort_pos_wait; /* Incremented on change master */
volatile ulong slave_run_id; /* Incremented on slave start */
mysql_mutex_t log_space_lock;
diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt
index 2e8b2231948..bb5506756e1 100644
--- a/sql/share/errmsg-utf8.txt
+++ b/sql/share/errmsg-utf8.txt
@@ -4310,11 +4310,11 @@ ER_BAD_SLAVE
swe "Servern är inte konfigurerade som en replikationsslav. Ändra konfigurationsfilen eller gör CHANGE MASTER TO"
ukr "Сервер не зконфігуровано як підлеглий, виправте це у файлі конфігурації або з CHANGE MASTER TO"
ER_MASTER_INFO
- eng "Could not initialize master info structure; more error messages can be found in the MariaDB error log"
- fre "Impossible d'initialiser les structures d'information de maître, vous trouverez des messages d'erreur supplémentaires dans le journal des erreurs de MariaDB"
- ger "Konnte Master-Info-Struktur nicht initialisieren. Weitere Fehlermeldungen können im MariaDB-Error-Log eingesehen werden"
- serbian "Nisam mogao da inicijalizujem informacionu strukturu glavnog servera, proverite da li imam privilegije potrebne za pristup file-u 'master.info'"
- swe "Kunde inte initialisera replikationsstrukturerna. See MariaDB fel fil för mera information"
+ eng "Could not initialize master info structure for '%.*s'; more error messages can be found in the MariaDB error log"
+ fre "Impossible d'initialiser les structures d'information de maître '%.*s', vous trouverez des messages d'erreur supplémentaires dans le journal des erreurs de MariaDB"
+ ger "Konnte Master-Info-Struktur '%.*s' nicht initialisieren. Weitere Fehlermeldungen können im MariaDB-Error-Log eingesehen werden"
+ serbian "Nisam mogao da inicijalizujem informacionu strukturu glavnog servera, proverite da li imam privilegije potrebne za pristup file-u 'master.info' '%.*s'"
+ swe "Kunde inte initialisera replikationsstrukturerna för '%.*s'. See MariaDB fel fil för mera information"
ER_SLAVE_THREAD
dan "Kunne ikke danne en slave-tråd; check systemressourcerne"
nla "Kon slave thread niet aanmaken, controleer systeem resources"
@@ -6156,8 +6156,8 @@ ER_DELAYED_NOT_SUPPORTED
eng "DELAYED option not supported for table '%-.192s'"
ger "Die DELAYED-Option wird für Tabelle '%-.192s' nicht unterstützt"
WARN_NO_MASTER_INFO
- eng "The master info structure does not exist"
- ger "Die Master-Info-Struktur existiert nicht"
+ eng "There is no master connection '%.*s'"
+ ger "Die Master-Info-Struktur existiert nicht '%.*s'"
WARN_OPTION_IGNORED
eng "<%-.64s> option ignored"
ger "Option <%-.64s> ignoriert"
@@ -6588,3 +6588,8 @@ ER_QUERY_EXCEEDED_ROWS_EXAMINED_LIMIT
ER_NO_SUCH_TABLE_IN_ENGINE 42S02
eng "Table '%-.192s.%-.192s' doesn't exist in engine"
swe "Det finns ingen tabell som heter '%-.192s.%-.192s' i handlern"
+ER_CONNECTION_ALREADY_EXISTS
+ eng "Connection '%.*s' already exists"
+ER_MASTER_LOG_PREFIX
+ eng "Master '%.*s': "
+
diff --git a/sql/slave.cc b/sql/slave.cc
index 7b7bddecd17..8254e90b369 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -71,8 +71,10 @@ char slave_skip_error_names[SHOW_VAR_FUNC_BUFF_SIZE];
char* slave_load_tmpdir = 0;
Master_info *active_mi= 0;
+Master_info_index *master_info_index;
my_bool replicate_same_server_id;
ulonglong relay_log_space_limit = 0;
+LEX_STRING default_master_connection_name= { (char*) "", 0 };
/*
When slave thread exits, we need to remember the temporary tables so we
@@ -144,7 +146,8 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev);
static bool wait_for_relay_log_space(Relay_log_info* rli);
static inline bool io_slave_killed(THD* thd,Master_info* mi);
static inline bool sql_slave_killed(THD* thd,Relay_log_info* rli);
-static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
+static int init_slave_thread(THD* thd, Master_info *mi,
+ SLAVE_THD_TYPE thd_type);
static void print_slave_skip_errors(void);
static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi);
static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
@@ -159,6 +162,8 @@ static int terminate_slave_thread(THD *thd,
volatile uint *slave_running,
bool skip_lock);
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
+static bool send_show_master_info_header(THD *thd, Master_info *mi, bool full);
+static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full);
/*
Find out which replications threads are running
@@ -263,15 +268,33 @@ int init_slave()
So it's safer to take the lock.
*/
mysql_mutex_lock(&LOCK_active_mi);
- /*
- TODO: re-write this to interate through the list of files
- for multi-master
- */
- active_mi= new Master_info(relay_log_recovery);
if (pthread_key_create(&RPL_MASTER_INFO, NULL))
goto err;
+ master_info_index= new Master_info_index;
+ if (!master_info_index || master_info_index->init_all_master_info())
+ {
+ sql_print_error("Failed to initialize multi master structures");
+ mysql_mutex_unlock(&LOCK_active_mi);
+ DBUG_RETURN(1);
+ }
+ if (!(active_mi= new Master_info(&default_master_connection_name,
+ relay_log_recovery)) ||
+ active_mi->error())
+ {
+ delete active_mi;
+ active_mi= 0;
+ goto err;
+ }
+
+ if (master_info_index->add_master_info(active_mi, FALSE))
+ {
+ delete active_mi;
+ active_mi= 0;
+ goto err;
+ }
+
/*
If --slave-skip-errors=... was not used, the string value for the
system variable has not been set up yet. Do it now.
@@ -286,18 +309,11 @@ int init_slave()
If master_host is specified, create the master_info file if it doesn't
exists.
*/
- if (!active_mi)
- {
- sql_print_error("Failed to allocate memory for the master info structure");
- error= 1;
- goto err;
- }
if (init_master_info(active_mi,master_info_file,relay_log_info_file,
1, (SLAVE_IO | SLAVE_SQL)))
{
sql_print_error("Failed to initialize the master info structure");
- error= 1;
goto err;
}
@@ -313,14 +329,18 @@ int init_slave()
SLAVE_IO | SLAVE_SQL))
{
sql_print_error("Failed to create slave threads");
- error= 1;
goto err;
}
}
-err:
+end:
mysql_mutex_unlock(&LOCK_active_mi);
DBUG_RETURN(error);
+
+err:
+ sql_print_error("Failed to allocate memory for the Master Info structure");
+ error= 1;
+ goto end;
}
/*
@@ -820,42 +840,19 @@ void end_slave()
running presently. If a START SLAVE was in progress, the mutex lock below
will make us wait until slave threads have started, and START SLAVE
returns, then we terminate them here.
+
+ We can also be called by cleanup(), which only happens if some
+ startup parameter to the server was wrong.
*/
mysql_mutex_lock(&LOCK_active_mi);
- if (active_mi)
- {
- /*
- TODO: replace the line below with
- list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
- once multi-master code is ready.
- */
- terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
- }
+ /* This will call terminate_slave_threads() on all connections */
+ delete master_info_index;
+ master_info_index= 0;
+ active_mi= 0;
mysql_mutex_unlock(&LOCK_active_mi);
DBUG_VOID_RETURN;
}
-/**
- Free all resources used by slave threads at time of executing shutdown.
- The routine must be called after all possible users of @c active_mi
- have left.
-
- SYNOPSIS
- close_active_mi()
-
-*/
-void close_active_mi()
-{
- mysql_mutex_lock(&LOCK_active_mi);
- if (active_mi)
- {
- end_master_info(active_mi);
- delete active_mi;
- active_mi= 0;
- }
- mysql_mutex_unlock(&LOCK_active_mi);
-}
-
static bool io_slave_killed(THD* thd, Master_info* mi)
{
DBUG_ENTER("io_slave_killed");
@@ -2022,12 +2019,28 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi,
@retval FALSE success
@retval TRUE failure
*/
-bool show_master_info(THD* thd, Master_info* mi)
+
+bool show_master_info(THD *thd, Master_info *mi, bool full)
+{
+ DBUG_ENTER("show_master_info");
+
+ if (send_show_master_info_header(thd, mi, full))
+ DBUG_RETURN(TRUE);
+ if (send_show_master_info_data(thd, mi, full))
+ DBUG_RETURN(TRUE);
+ my_eof(thd);
+ DBUG_RETURN(FALSE);
+}
+
+static bool send_show_master_info_header(THD *thd, Master_info *mi, bool full)
{
- // TODO: fix this for multi-master
List<Item> field_list;
Protocol *protocol= thd->protocol;
- DBUG_ENTER("show_master_info");
+ DBUG_ENTER("show_master_info_header");
+
+ if (full)
+ field_list.push_back(new Item_empty_string("Connection_name",
+ MAX_CONNECTION_NAME));
field_list.push_back(new Item_empty_string("Slave_IO_State",
14));
@@ -2097,17 +2110,29 @@ bool show_master_info(THD* thd, Master_info* mi)
if (protocol->send_result_set_metadata(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(TRUE);
+ DBUG_RETURN(FALSE);
+}
+
+
+static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full)
+{
+ DBUG_ENTER("send_show_master_info_data");
if (mi->host[0])
{
DBUG_PRINT("info",("host is set: '%s'", mi->host));
String *packet= &thd->packet;
+ Protocol *protocol= thd->protocol;
+
protocol->prepare_for_resend();
/*
slave_running can be accessed without run_lock but not other
non-volotile members like mi->io_thd, which is guarded by the mutex.
*/
+ if (full)
+ protocol->store(mi->connection_name.str, mi->connection_name.length,
+ &my_charset_bin);
mysql_mutex_lock(&mi->run_lock);
protocol->store(mi->io_thd ? mi->io_thd->proc_info : "", &my_charset_bin);
mysql_mutex_unlock(&mi->run_lock);
@@ -2250,6 +2275,65 @@ bool show_master_info(THD* thd, Master_info* mi)
if (my_net_write(&thd->net, (uchar*) thd->packet.ptr(), packet->length()))
DBUG_RETURN(TRUE);
}
+ DBUG_RETURN(FALSE);
+}
+
+
+/* Used to sort connections by name */
+
+static int cmp_mi_by_name(const Master_info **arg1,
+ const Master_info **arg2)
+{
+ return my_strcasecmp(system_charset_info, (*arg1)->connection_name.str,
+ (*arg2)->connection_name.str);
+}
+
+
+/**
+ Execute a SHOW FULL SLAVE STATUS statement.
+
+ @param thd Pointer to THD object for the client thread executing the
+ statement.
+
+ Elements are sorted according to the original connection_name.
+
+ @retval FALSE success
+ @retval TRUE failure
+*/
+
+bool show_all_master_info(THD* thd)
+{
+ uint i, elements;
+ Master_info **tmp;
+ DBUG_ENTER("show_master_info");
+
+ if (send_show_master_info_header(thd, active_mi, 1))
+ DBUG_RETURN(TRUE);
+
+ if (!(elements= master_info_index->master_info_hash.records))
+ goto end;
+
+ /*
+ Sort lines to get them into a predicted order
+ (needed for test cases and to not confuse users)
+ */
+ if (!(tmp= (Master_info**) thd->alloc(sizeof(Master_info*) * elements)))
+ DBUG_RETURN(TRUE);
+
+ for (i= 0; i < elements; i++)
+ {
+ tmp[i]= (Master_info *) my_hash_element(&master_info_index->
+ master_info_hash, i);
+ }
+ my_qsort(tmp, elements, sizeof(Master_info*), (qsort_cmp) cmp_mi_by_name);
+
+ for (i= 0; i < elements; i++)
+ {
+ if (send_show_master_info_data(thd, tmp[i], 1))
+ DBUG_RETURN(TRUE);
+ }
+
+end:
my_eof(thd);
DBUG_RETURN(FALSE);
}
@@ -2303,7 +2387,8 @@ void set_slave_thread_default_charset(THD* thd, Relay_log_info const *rli)
init_slave_thread()
*/
-static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
+static int init_slave_thread(THD* thd, Master_info *mi,
+ SLAVE_THD_TYPE thd_type)
{
DBUG_ENTER("init_slave_thread");
#if !defined(DBUG_OFF)
@@ -2319,7 +2404,8 @@ static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
than the corresponding packet (query) sent from client to master.
*/
thd->variables.max_allowed_packet= slave_max_allowed_packet;
- thd->slave_thread = 1;
+ thd->slave_thread= 1;
+ thd->connection_name= mi->connection_name;
thd->enable_slow_log= opt_log_slow_slave_statements;
thd->variables.log_slow_filter= global_system_variables.log_slow_filter;
set_slave_thread_options(thd);
@@ -2635,7 +2721,10 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli)
int reason= ev->shall_skip(rli);
if (reason == Log_event::EVENT_SKIP_COUNT)
- sql_slave_skip_counter= --rli->slave_skip_counter;
+ {
+ DBUG_ASSERT(rli->slave_skip_counter > 0);
+ rli->slave_skip_counter--;
+ }
mysql_mutex_unlock(&rli->data_lock);
if (reason == Log_event::EVENT_SKIP_NOT)
exec_res= ev->apply_event(rli);
@@ -3058,7 +3147,7 @@ pthread_handler_t handle_slave_io(void *arg)
pthread_detach_this_thread();
thd->thread_stack= (char*) &thd; // remember where our stack is
mi->clear_error();
- if (init_slave_thread(thd, SLAVE_THD_IO))
+ if (init_slave_thread(thd, mi, SLAVE_THD_IO))
{
mysql_cond_broadcast(&mi->start_cond);
sql_print_error("Failed during slave I/O thread initialization");
@@ -3457,7 +3546,8 @@ pthread_handler_t handle_slave_sql(void *arg)
my_off_t UNINIT_VAR(saved_log_pos);
my_off_t UNINIT_VAR(saved_master_log_pos);
my_off_t saved_skip= 0;
- Relay_log_info* rli = &((Master_info*)arg)->rli;
+ Master_info *mi= ((Master_info*)arg);
+ Relay_log_info* rli = &mi->rli;
const char *errmsg;
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
@@ -3471,6 +3561,7 @@ pthread_handler_t handle_slave_sql(void *arg)
thd->thread_stack = (char*)&thd; // remember where our stack is
DBUG_ASSERT(rli->inited);
+ DBUG_ASSERT(rli->mi == mi);
mysql_mutex_lock(&rli->run_lock);
DBUG_ASSERT(!rli->slave_running);
errmsg= 0;
@@ -3485,7 +3576,7 @@ pthread_handler_t handle_slave_sql(void *arg)
rli->slave_running = 1;
pthread_detach_this_thread();
- if (init_slave_thread(thd, SLAVE_THD_SQL))
+ if (init_slave_thread(thd, mi, SLAVE_THD_SQL))
{
/*
TODO: this is currently broken - slave start and change master
diff --git a/sql/slave.h b/sql/slave.h
index 6b4bcffe109..565f40b7236 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -45,9 +45,12 @@
#define MAX_SLAVE_ERROR 2000
+#define MAX_REPLICATION_THREAD 64
+
// Forward declarations
class Relay_log_info;
class Master_info;
+class Master_info_index;
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
@@ -197,7 +200,8 @@ int mysql_table_dump(THD* thd, const char* db,
int fetch_master_table(THD* thd, const char* db_name, const char* table_name,
Master_info* mi, MYSQL* mysql, bool overwrite);
-bool show_master_info(THD* thd, Master_info* mi);
+bool show_master_info(THD* thd, Master_info* mi, bool full);
+bool show_all_master_info(THD* thd);
bool show_binlog_info(THD* thd);
bool rpl_master_has_bug(const Relay_log_info *rli, uint bug_id, bool report,
bool (*pred)(const void *), const void *param);
@@ -231,6 +235,9 @@ bool net_request_file(NET* net, const char* fname);
extern bool volatile abort_loop;
extern Master_info main_mi, *active_mi; /* active_mi for multi-master */
+extern Master_info *default_master_info; /* To replace active_mi */
+extern Master_info_index *master_info_index;
+extern LEX_STRING default_master_connection_name;
extern LIST master_list;
extern my_bool replicate_same_server_id;
diff --git a/sql/sql_base.cc b/sql/sql_base.cc
index 85e9227c154..c098ba5be7c 100644
--- a/sql/sql_base.cc
+++ b/sql/sql_base.cc
@@ -396,6 +396,7 @@ bool table_def_init(void)
void table_def_start_shutdown(void)
{
+ DBUG_ENTER("table_def_start_shutdown");
if (table_def_inited)
{
mysql_mutex_lock(&LOCK_open);
@@ -410,6 +411,7 @@ void table_def_start_shutdown(void)
/* Free all cached but unused TABLEs and TABLE_SHAREs. */
close_cached_tables(NULL, NULL, FALSE, LONG_TIMEOUT);
}
+ DBUG_VOID_RETURN;
}
@@ -1086,6 +1088,9 @@ bool close_cached_tables(THD *thd, TABLE_LIST *tables,
mysql_mutex_unlock(&LOCK_open);
+ DBUG_PRINT("info", ("open table definitions: %d",
+ (int) table_def_cache.records));
+
if (!wait_for_refresh)
DBUG_RETURN(result);
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index f77ef114d2d..b0d262f7b6a 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -799,6 +799,9 @@ THD::THD()
progress.max_counter= 0;
current_linfo = 0;
slave_thread = 0;
+ connection_name.str= 0;
+ connection_name.length= 0;
+
bzero(&variables, sizeof(variables));
thread_id= 0;
one_shot_set= 0;
@@ -1166,7 +1169,14 @@ void THD::init(void)
avoid temporary tables replication failure.
*/
variables.pseudo_thread_id= thread_id;
+
+ variables.default_master_connection.str= default_master_connection_buff;
+ ::strmake(variables.default_master_connection.str,
+ global_system_variables.default_master_connection.str,
+ variables.default_master_connection.length);
+
mysql_mutex_unlock(&LOCK_global_system_variables);
+
server_status= SERVER_STATUS_AUTOCOMMIT;
if (variables.sql_mode & MODE_NO_BACKSLASH_ESCAPES)
server_status|= SERVER_STATUS_NO_BACKSLASH_ESCAPES;
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 3aa0c5ae9c7..7f44a4b468a 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -533,6 +533,11 @@ typedef struct system_variables
thread the query is being run to replicate temp tables properly
*/
my_thread_id pseudo_thread_id;
+ /**
+ Place holder to store sql_slave_skip_counter in sys_var.cc during
+ update and show of variables.
+ */
+ uint slave_skip_counter;
my_bool low_priority_updates;
my_bool query_cache_wlock_invalidate;
@@ -557,6 +562,9 @@ typedef struct system_variables
CHARSET_INFO *collation_database;
CHARSET_INFO *collation_connection;
+ /* Names. These will be allocated in buffers in thd */
+ LEX_STRING default_master_connection;
+
/* Error messages */
MY_LOCALE *lc_messages;
/* Locale Support */
@@ -2192,6 +2200,8 @@ public:
/* scramble - random string sent to client on handshake */
char scramble[SCRAMBLE_LENGTH+1];
+ LEX_STRING connection_name; /* If slave */
+ char default_master_connection_buff[MAX_CONNECTION_NAME+1];
bool slave_thread, one_shot_set;
bool extra_port; /* If extra connection */
diff --git a/sql/sql_const.h b/sql/sql_const.h
index cfbf077bd91..255a0162e93 100644
--- a/sql/sql_const.h
+++ b/sql/sql_const.h
@@ -38,6 +38,7 @@
#define MAX_REFLENGTH 4 /* Max length for record ref */
#endif
#define MAX_HOSTNAME 61 /* len+1 in mysql.user */
+#define MAX_CONNECTION_NAME NAME_LEN
#define MAX_MBWIDTH 3 /* Max multibyte sequence */
#define MAX_FIELD_CHARLENGTH 255
diff --git a/sql/sql_lex.cc b/sql/sql_lex.cc
index 240aeb223d4..0b8265e7028 100644
--- a/sql/sql_lex.cc
+++ b/sql/sql_lex.cc
@@ -503,6 +503,7 @@ void lex_start(THD *thd)
lex->expr_allows_subselect= TRUE;
lex->use_only_table_context= FALSE;
lex->parse_vcol_expr= FALSE;
+ lex->verbose= 0;
lex->name.str= 0;
lex->name.length= 0;
diff --git a/sql/sql_lex.h b/sql/sql_lex.h
index 2f3214646de..7e92fe12dc3 100644
--- a/sql/sql_lex.h
+++ b/sql/sql_lex.h
@@ -286,6 +286,7 @@ struct LEX_MASTER_INFO
char *host, *user, *password, *log_file_name;
char *ssl_key, *ssl_cert, *ssl_ca, *ssl_capath, *ssl_cipher;
char *relay_log_name;
+ LEX_STRING connection_name;
ulonglong pos;
ulong relay_log_pos;
ulong server_id;
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 175c2c1d672..114035bc16d 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -82,6 +82,7 @@
#include <myisam.h>
#include <my_dir.h>
#include "rpl_handler.h"
+#include "rpl_mi.h"
#include "sp_head.h"
#include "sp.h"
@@ -2343,10 +2344,41 @@ case SQLCOM_PREPARE:
#ifdef HAVE_REPLICATION
case SQLCOM_CHANGE_MASTER:
{
+ LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
+ Master_info *mi;
+ bool new_master= 0;
+
if (check_global_access(thd, SUPER_ACL))
goto error;
mysql_mutex_lock(&LOCK_active_mi);
- res = change_master(thd,active_mi);
+
+ mi= master_info_index->get_master_info(&lex_mi->connection_name,
+ MYSQL_ERROR::WARN_LEVEL_NOTE);
+
+ if (mi == NULL)
+ {
+ /* New replication created */
+ mi= new Master_info(&lex_mi->connection_name, relay_log_recovery);
+ if (!mi || mi->error())
+ {
+ delete mi;
+ res= 1;
+ mysql_mutex_unlock(&LOCK_active_mi);
+ break;
+ }
+ new_master= 1;
+ }
+
+ res= change_master(thd, mi);
+ if (res && new_master)
+ {
+ /*
+ The new master was added by change_master(). Remove it as it didn't
+ work.
+ */
+ master_info_index->remove_master_info(&lex_mi->connection_name);
+ }
+
mysql_mutex_unlock(&LOCK_active_mi);
break;
}
@@ -2356,15 +2388,19 @@ case SQLCOM_PREPARE:
if (check_global_access(thd, SUPER_ACL | REPL_CLIENT_ACL))
goto error;
mysql_mutex_lock(&LOCK_active_mi);
- if (active_mi != NULL)
- {
- res = show_master_info(thd, active_mi);
- }
+
+ if (lex->verbose)
+ res= show_all_master_info(thd);
else
{
- push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
- WARN_NO_MASTER_INFO, ER(WARN_NO_MASTER_INFO));
- my_ok(thd);
+ LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
+ Master_info *mi;
+ mi= master_info_index->get_master_info(&lex_mi->connection_name,
+ MYSQL_ERROR::WARN_LEVEL_ERROR);
+ if (mi != NULL)
+ {
+ res= show_master_info(thd, mi, 0);
+ }
}
mysql_mutex_unlock(&LOCK_active_mi);
break;
@@ -2664,40 +2700,53 @@ end_with_restore_list:
#ifdef HAVE_REPLICATION
case SQLCOM_SLAVE_START:
{
+ LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
+ Master_info *mi;
mysql_mutex_lock(&LOCK_active_mi);
- start_slave(thd,active_mi,1 /* net report*/);
+
+ if ((mi= (master_info_index->
+ get_master_info(&lex_mi->connection_name,
+ MYSQL_ERROR::WARN_LEVEL_ERROR))))
+ start_slave(thd, mi, 1 /* net report*/);
mysql_mutex_unlock(&LOCK_active_mi);
break;
}
case SQLCOM_SLAVE_STOP:
- /*
- If the client thread has locked tables, a deadlock is possible.
- Assume that
- - the client thread does LOCK TABLE t READ.
- - then the master updates t.
- - then the SQL slave thread wants to update t,
- so it waits for the client thread because t is locked by it.
+ {
+ LEX_MASTER_INFO *lex_mi;
+ Master_info *mi;
+ /*
+ If the client thread has locked tables, a deadlock is possible.
+ Assume that
+ - the client thread does LOCK TABLE t READ.
+ - then the master updates t.
+ - then the SQL slave thread wants to update t,
+ so it waits for the client thread because t is locked by it.
- then the client thread does SLAVE STOP.
SLAVE STOP waits for the SQL slave thread to terminate its
update t, which waits for the client thread because t is locked by it.
- To prevent that, refuse SLAVE STOP if the
- client thread has locked tables
- */
- if (thd->locked_tables_mode ||
- thd->in_active_multi_stmt_transaction() || thd->global_read_lock.is_acquired())
- {
- my_message(ER_LOCK_OR_ACTIVE_TRANSACTION,
- ER(ER_LOCK_OR_ACTIVE_TRANSACTION), MYF(0));
- goto error;
- }
- {
+ To prevent that, refuse SLAVE STOP if the
+ client thread has locked tables
+ */
+ if (thd->locked_tables_mode ||
+ thd->in_active_multi_stmt_transaction() ||
+ thd->global_read_lock.is_acquired())
+ {
+ my_message(ER_LOCK_OR_ACTIVE_TRANSACTION,
+ ER(ER_LOCK_OR_ACTIVE_TRANSACTION), MYF(0));
+ goto error;
+ }
+
+ lex_mi= &thd->lex->mi;
mysql_mutex_lock(&LOCK_active_mi);
- stop_slave(thd,active_mi,1/* net report*/);
+ if ((mi= (master_info_index->
+ get_master_info(&lex_mi->connection_name,
+ MYSQL_ERROR::WARN_LEVEL_ERROR))))
+ stop_slave(thd, mi, 1/* net report*/);
mysql_mutex_unlock(&LOCK_active_mi);
break;
}
#endif /* HAVE_REPLICATION */
-
case SQLCOM_RENAME_TABLE:
{
if (execute_rename_table(thd, first_table, all_tables))
diff --git a/sql/sql_reload.cc b/sql/sql_reload.cc
index 914b9026014..fcb7d15f5ba 100644
--- a/sql/sql_reload.cc
+++ b/sql/sql_reload.cc
@@ -26,6 +26,7 @@
#include "sql_repl.h" // reset_master, reset_slave
#include "rpl_mi.h" // Master_info::data_lock
#include "debug_sync.h"
+#include "rpl_mi.h"
static void disable_checkpoints(THD *thd);
@@ -314,13 +315,27 @@ bool reload_acl_and_cache(THD *thd, unsigned long options,
#ifdef HAVE_REPLICATION
if (options & REFRESH_SLAVE)
{
+ LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
+ Master_info *mi;
tmp_write_to_binlog= 0;
mysql_mutex_lock(&LOCK_active_mi);
- if (reset_slave(thd, active_mi))
+
+ if (!(mi= (master_info_index->
+ get_master_info(&lex_mi->connection_name,
+ MYSQL_ERROR::WARN_LEVEL_ERROR))))
+ {
+ result= 1;
+ }
+ else if (reset_slave(thd, mi))
{
/* NOTE: my_error() has been already called by reset_slave(). */
result= 1;
}
+ else if (mi->connection_name.length && thd->lex->reset_slave_info.all)
+ {
+ /* If not default connection and 'all' is used */
+ master_info_index->remove_master_info(&mi->connection_name);
+ }
mysql_mutex_unlock(&LOCK_active_mi);
}
#endif
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 079aab27101..99395738c66 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -34,14 +34,6 @@ my_bool opt_sporadic_binlog_dump_fail = 0;
static int binlog_dump_count = 0;
#endif
-/**
- a copy of active_mi->rli->slave_skip_counter, for showing in SHOW VARIABLES,
- INFORMATION_SCHEMA.GLOBAL_VARIABLES and @@sql_slave_skip_counter without
- taking all the mutexes needed to access active_mi->rli->slave_skip_counter
- properly.
-*/
-uint sql_slave_skip_counter;
-
extern TYPELIB binlog_checksum_typelib;
/*
@@ -1310,8 +1302,17 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
{
int slave_errno= 0;
int thread_mask;
+ char master_info_file_tmp[FN_REFLEN];
+ char relay_log_info_file_tmp[FN_REFLEN];
DBUG_ENTER("start_slave");
+ create_signed_file_name(master_info_file_tmp,
+ sizeof(master_info_file_tmp),
+ master_info_file, '.', &mi->connection_name);
+ create_signed_file_name(relay_log_info_file_tmp,
+ sizeof(relay_log_info_file_tmp),
+ relay_log_info_file, '.', &mi->connection_name);
+
if (check_access(thd, SUPER_ACL, any_db, NULL, NULL, 0, 0))
DBUG_RETURN(1);
lock_slave_threads(mi); // this allows us to cleanly read slave_running
@@ -1327,7 +1328,7 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
thread_mask&= thd->lex->slave_thd_opt;
if (thread_mask) //some threads are stopped, start them
{
- if (init_master_info(mi,master_info_file,relay_log_info_file, 0,
+ if (init_master_info(mi,master_info_file_tmp,relay_log_info_file_tmp, 0,
thread_mask))
slave_errno=ER_MASTER_INFO;
else if (server_id_supplied && *mi->host)
@@ -1401,10 +1402,11 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
if (!slave_errno)
slave_errno = start_slave_threads(0 /*no mutex */,
- 1 /* wait for start */,
- mi,
- master_info_file,relay_log_info_file,
- thread_mask);
+ 1 /* wait for start */,
+ mi,
+ master_info_file_tmp,
+ relay_log_info_file_tmp,
+ thread_mask);
}
else
slave_errno = ER_BAD_SLAVE;
@@ -1421,7 +1423,9 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
if (slave_errno)
{
if (net_report)
- my_message(slave_errno, ER(slave_errno), MYF(0));
+ my_error(slave_errno, MYF(0),
+ (int) mi->connection_name.length,
+ mi->connection_name.str);
DBUG_RETURN(1);
}
else if (net_report)
@@ -1446,11 +1450,9 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
*/
int stop_slave(THD* thd, Master_info* mi, bool net_report )
{
- DBUG_ENTER("stop_slave");
-
int slave_errno;
- if (!thd)
- thd = current_thd;
+ DBUG_ENTER("stop_slave");
+ DBUG_PRINT("enter",("Connection: %s", mi->connection_name.str));
if (check_access(thd, SUPER_ACL, any_db, NULL, NULL, 0, 0))
DBUG_RETURN(1);
@@ -1514,6 +1516,8 @@ int reset_slave(THD *thd, Master_info* mi)
int thread_mask= 0, error= 0;
uint sql_errno=ER_UNKNOWN_ERROR;
const char* errmsg= "Unknown error occured while reseting slave";
+ char master_info_file_tmp[FN_REFLEN];
+ char relay_log_info_file_tmp[FN_REFLEN];
DBUG_ENTER("reset_slave");
lock_slave_threads(mi);
@@ -1549,22 +1553,35 @@ int reset_slave(THD *thd, Master_info* mi)
// close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
end_master_info(mi);
+
// and delete these two files
- fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
+ create_signed_file_name(master_info_file_tmp,
+ sizeof(master_info_file_tmp),
+ master_info_file, '.', &mi->connection_name);
+ create_signed_file_name(relay_log_info_file_tmp,
+ sizeof(relay_log_info_file_tmp),
+ relay_log_info_file, '.', &mi->connection_name);
+
+ fn_format(fname, master_info_file_tmp, mysql_data_home, "", 4+32);
if (mysql_file_stat(key_file_master_info, fname, &stat_area, MYF(0)) &&
mysql_file_delete(key_file_master_info, fname, MYF(MY_WME)))
{
error=1;
goto err;
}
+ else if (global_system_variables.log_warnings > 1)
+ sql_print_information("Deleted Master_info file '%s'.", fname);
+
// delete relay_log_info_file
- fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
+ fn_format(fname, relay_log_info_file_tmp, mysql_data_home, "", 4+32);
if (mysql_file_stat(key_file_relay_log_info, fname, &stat_area, MYF(0)) &&
mysql_file_delete(key_file_relay_log_info, fname, MYF(MY_WME)))
{
error=1;
goto err;
}
+ else if (global_system_variables.log_warnings > 1)
+ sql_print_information("Deleted Master_info file '%s'.", fname);
RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi));
err:
@@ -1644,12 +1661,36 @@ bool change_master(THD* thd, Master_info* mi)
char saved_host[HOSTNAME_LENGTH + 1];
uint saved_port;
char saved_log_name[FN_REFLEN];
+ char master_info_file_tmp[FN_REFLEN];
+ char relay_log_info_file_tmp[FN_REFLEN];
my_off_t saved_log_pos;
+ LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
DBUG_ENTER("change_master");
+ /*
+ We need to check if there is an empty master_host. Otherwise
+ change master succeeds, a master.info file is created containing
+ empty master_host string and when issuing: start slave; an error
+ is thrown stating that the server is not configured as slave.
+ (See BUG#28796).
+ */
+ if (lex_mi->host && !*lex_mi->host)
+ {
+ my_error(ER_WRONG_ARGUMENTS, MYF(0), "MASTER_HOST");
+ DBUG_RETURN(TRUE);
+ }
+ if (master_info_index->check_duplicate_master_info(&lex_mi->connection_name,
+ lex_mi->host,
+ lex_mi->port))
+ {
+ my_error(ER_MASTER_INFO, MYF(0),
+ (int) lex_mi->connection_name.length,
+ lex_mi->connection_name.str);
+ DBUG_RETURN(TRUE);
+ }
+
lock_slave_threads(mi);
init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
- LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
if (thread_mask) // We refuse if any slave thread is running
{
my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
@@ -1658,24 +1699,40 @@ bool change_master(THD* thd, Master_info* mi)
}
thd_proc_info(thd, "Changing master");
- /*
- We need to check if there is an empty master_host. Otherwise
- change master succeeds, a master.info file is created containing
- empty master_host string and when issuing: start slave; an error
- is thrown stating that the server is not configured as slave.
- (See BUG#28796).
- */
- if(lex_mi->host && !*lex_mi->host)
+
+ create_signed_file_name(master_info_file_tmp,
+ sizeof(master_info_file_tmp),
+ master_info_file, '.', &mi->connection_name);
+ create_signed_file_name(relay_log_info_file_tmp,
+ sizeof(relay_log_info_file_tmp),
+ relay_log_info_file, '.', &mi->connection_name);
+
+ /* if new Master_info doesn't exists, add it */
+ if (!master_info_index->get_master_info(&mi->connection_name,
+ MYSQL_ERROR::WARN_LEVEL_NOTE))
{
- my_error(ER_WRONG_ARGUMENTS, MYF(0), "MASTER_HOST");
- unlock_slave_threads(mi);
- DBUG_RETURN(TRUE);
+ if (master_info_index->add_master_info(mi, TRUE))
+ {
+ my_error(ER_MASTER_INFO, MYF(0),
+ (int) lex_mi->connection_name.length,
+ lex_mi->connection_name.str);
+ ret= TRUE;
+ goto err;
+ }
}
- // TODO: see if needs re-write
- if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
+ if (global_system_variables.log_warnings > 1)
+ sql_print_information("Master: '%.*s' Master_info_file: '%s' "
+ "Relay_info_file: '%s'",
+ (int) mi->connection_name.length,
+ mi->connection_name.str,
+ master_info_file_tmp, relay_log_info_file_tmp);
+
+ if (init_master_info(mi, master_info_file_tmp, relay_log_info_file_tmp, 0,
thread_mask))
{
- my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
+ my_error(ER_MASTER_INFO, MYF(0),
+ (int) lex_mi->connection_name.length,
+ lex_mi->connection_name.str);
ret= TRUE;
goto err;
}
diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy
index 555efaf366d..e1bcb48e541 100644
--- a/sql/sql_yacc.yy
+++ b/sql/sql_yacc.yy
@@ -65,6 +65,7 @@
#include <myisammrg.h>
#include "keycaches.h"
#include "set_var.h"
+#include "rpl_mi.h"
/* this is to get the bison compilation windows warnings out */
#ifdef _MSC_VER
@@ -1904,7 +1905,7 @@ help:
/* change master */
change:
- CHANGE MASTER_SYM TO_SYM
+ CHANGE MASTER_SYM optional_connection_name TO_SYM
{
Lex->sql_command = SQLCOM_CHANGE_MASTER;
}
@@ -2053,6 +2054,29 @@ master_file_def:
}
;
+optional_connection_name:
+ /* empty */
+ {
+ THD *thd= YYTHD;
+ LEX *lex= thd->lex;
+ lex->mi.connection_name= thd->variables.default_master_connection;
+ }
+ | connection_name;
+ ;
+
+connection_name:
+ TEXT_STRING_sys
+ {
+ Lex->mi.connection_name= $1;
+#ifdef HAVE_REPLICATION
+ if (check_master_connection_name(&$1))
+ {
+ my_error(ER_WRONG_ARGUMENTS, MYF(0), "MASTER_CONNECTION_NAME");
+ MYSQL_YYABORT;
+ }
+#endif
+ }
+
/* create a table */
create:
@@ -7074,7 +7098,7 @@ opt_to:
*/
slave:
- START_SYM SLAVE slave_thread_opts
+ START_SYM SLAVE optional_connection_name slave_thread_opts
{
LEX *lex=Lex;
lex->sql_command = SQLCOM_SLAVE_START;
@@ -7083,14 +7107,14 @@ slave:
}
slave_until
{}
- | STOP_SYM SLAVE slave_thread_opts
+ | STOP_SYM SLAVE optional_connection_name slave_thread_opts
{
LEX *lex=Lex;
lex->sql_command = SQLCOM_SLAVE_STOP;
lex->type = 0;
/* If you change this code don't forget to update SLAVE STOP too */
}
- | SLAVE START_SYM slave_thread_opts
+ | SLAVE optional_connection_name START_SYM slave_thread_opts
{
LEX *lex=Lex;
lex->sql_command = SQLCOM_SLAVE_START;
@@ -7098,7 +7122,7 @@ slave:
}
slave_until
{}
- | SLAVE STOP_SYM slave_thread_opts
+ | SLAVE optional_connection_name STOP_SYM slave_thread_opts
{
LEX *lex=Lex;
lex->sql_command = SQLCOM_SLAVE_STOP;
@@ -11558,9 +11582,23 @@ show_param:
{
Lex->sql_command = SQLCOM_SHOW_MASTER_STAT;
}
+ | FULL SLAVE STATUS_SYM
+ {
+ Lex->sql_command = SQLCOM_SHOW_SLAVE_STAT;
+ Lex->verbose= 1;
+ }
| SLAVE STATUS_SYM
{
+ THD *thd= YYTHD;
+ LEX *lex= thd->lex;
+ lex->mi.connection_name= thd->variables.default_master_connection;
+ lex->sql_command = SQLCOM_SHOW_SLAVE_STAT;
+ lex->verbose= 0;
+ }
+ | SLAVE connection_name STATUS_SYM
+ {
Lex->sql_command = SQLCOM_SHOW_SLAVE_STAT;
+ Lex->verbose= 0;
}
| CLIENT_STATS_SYM
{
@@ -11824,7 +11862,7 @@ flush_option:
{ Lex->type|= REFRESH_LOG; }
| STATUS_SYM
{ Lex->type|= REFRESH_STATUS; }
- | SLAVE
+ | SLAVE optional_connection_name
{
Lex->type|= REFRESH_SLAVE;
Lex->reset_slave_info.all= false;
@@ -11871,6 +11909,7 @@ reset_options:
reset_option:
SLAVE { Lex->type|= REFRESH_SLAVE; }
+ optional_connection_name
slave_reset_options { }
| MASTER_SYM { Lex->type|= REFRESH_MASTER; }
| QUERY_SYM CACHE_SYM { Lex->type|= REFRESH_QUERY_CACHE;}
diff --git a/sql/strfunc.cc b/sql/strfunc.cc
index 0c0742b3805..9603ca30cfa 100644
--- a/sql/strfunc.cc
+++ b/sql/strfunc.cc
@@ -15,6 +15,7 @@
/* Some useful string utility functions used by the MySQL server */
+#include <my_global.h>
#include "sql_priv.h"
#include "unireg.h"
#include "strfunc.h"
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 4a003a89a7e..30cfe1a5680 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -796,6 +796,30 @@ static Sys_var_lexstring Sys_init_connect(
DEFAULT(""), &PLock_sys_init_connect, NOT_IN_BINLOG,
ON_CHECK(check_init_string));
+#ifdef HAVE_REPLICATION
+static bool check_master_connection(sys_var *self, THD *thd, set_var *var)
+{
+ LEX_STRING tmp;
+ tmp.str= var->save_result.string_value.str;
+ tmp.length= var->save_result.string_value.length;
+ if (check_master_connection_name(&tmp))
+ {
+ my_error(ER_WRONG_ARGUMENTS, MYF(ME_JUST_WARNING),
+ "DEFAULT_MASTER_CONNECTION");
+ return true;
+ }
+ return false;
+}
+
+static Sys_var_session_lexstring Sys_default_master_connection(
+ "default_master_connection",
+ "Master connection to use for all slave variables and slave commands",
+ SESSION_ONLY(default_master_connection),
+ NO_CMD_LINE, IN_SYSTEM_CHARSET,
+ DEFAULT(""), MAX_CONNECTION_NAME, ON_CHECK(check_master_connection),
+ ON_UPDATE(0));
+#endif
+
static Sys_var_charptr Sys_init_file(
"init_file", "Read SQL commands from this file at startup",
READ_ONLY GLOBAL_VAR(opt_init_file),
@@ -3454,47 +3478,67 @@ static Sys_var_uint Sys_slave_net_timeout(
NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
ON_UPDATE(fix_slave_net_timeout));
-static bool check_slave_skip_counter(sys_var *self, THD *thd, set_var *var)
+
+/*
+ Access a multi_source variable
+ Return 0 + warning if it doesn't exist
+*/
+
+uint Sys_var_multi_source_uint::
+get_master_info_uint_value(THD *thd, ptrdiff_t offset)
{
- bool result= false;
+ Master_info *mi;
+ uint res= 0; // Default value
mysql_mutex_lock(&LOCK_active_mi);
- mysql_mutex_lock(&active_mi->rli.run_lock);
- if (active_mi->rli.slave_running)
+ mi= master_info_index->
+ get_master_info(&thd->variables.default_master_connection,
+ MYSQL_ERROR::WARN_LEVEL_WARN);
+ if (mi)
{
- my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
- result= true;
+ mysql_mutex_lock(&mi->rli.data_lock);
+ res= *((uint*) (((uchar*) mi) + master_info_offset));
+ mysql_mutex_unlock(&mi->rli.data_lock);
}
- mysql_mutex_unlock(&active_mi->rli.run_lock);
- mysql_mutex_unlock(&LOCK_active_mi);
- return result;
+ mysql_mutex_unlock(&LOCK_active_mi);
+ return res;
}
-static bool fix_slave_skip_counter(sys_var *self, THD *thd, enum_var_type type)
+
+
+static bool update_slave_skip_counter(sys_var *self, THD *thd,
+ enum_var_type type)
{
- mysql_mutex_unlock(&LOCK_global_system_variables);
+ bool result= true;
+ Master_info *mi;
mysql_mutex_lock(&LOCK_active_mi);
- mysql_mutex_lock(&active_mi->rli.run_lock);
- /*
- The following test should normally never be true as we test this
- in the check function; To be safe against multiple
- SQL_SLAVE_SKIP_COUNTER request, we do the check anyway
- */
- if (!active_mi->rli.slave_running)
+ mi= master_info_index->
+ get_master_info(&thd->variables.default_master_connection,
+ MYSQL_ERROR::WARN_LEVEL_ERROR);
+ if (mi)
{
- mysql_mutex_lock(&active_mi->rli.data_lock);
- active_mi->rli.slave_skip_counter= sql_slave_skip_counter;
- mysql_mutex_unlock(&active_mi->rli.data_lock);
+ mysql_mutex_lock(&mi->rli.run_lock);
+ if (mi->rli.slave_running)
+ my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
+ else
+ {
+ result= false; // ok
+ mysql_mutex_lock(&mi->rli.data_lock);
+ /* The value was stored temporarly in thd */
+ mi->rli.slave_skip_counter= thd->variables.slave_skip_counter;
+ mysql_mutex_unlock(&mi->rli.data_lock);
+ }
+ mysql_mutex_unlock(&mi->rli.run_lock);
}
- mysql_mutex_unlock(&active_mi->rli.run_lock);
mysql_mutex_unlock(&LOCK_active_mi);
- mysql_mutex_lock(&LOCK_global_system_variables);
- return 0;
+ return result;
}
-static Sys_var_uint Sys_slave_skip_counter(
- "sql_slave_skip_counter", "sql_slave_skip_counter",
- GLOBAL_VAR(sql_slave_skip_counter), NO_CMD_LINE,
- VALID_RANGE(0, UINT_MAX), DEFAULT(0), BLOCK_SIZE(1),
- NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(check_slave_skip_counter),
- ON_UPDATE(fix_slave_skip_counter));
+
+static Sys_var_multi_source_uint
+Sys_slave_skip_counter("sql_slave_skip_counter",
+ "Skip the next N events from the master log",
+ SESSION_VAR(slave_skip_counter),
+ offsetof(Master_info, rli.slave_skip_counter),
+ VALID_RANGE(0, UINT_MAX), DEFAULT(0), BLOCK_SIZE(1),
+ ON_UPDATE(update_slave_skip_counter));
static Sys_var_charptr Sys_slave_skip_errors(
"slave_skip_errors", "Tells the slave thread to continue "
diff --git a/sql/sys_vars.h b/sql/sys_vars.h
index 21bebcd762c..6509ba6a5f4 100644
--- a/sql/sys_vars.h
+++ b/sql/sys_vars.h
@@ -633,6 +633,88 @@ public:
}
};
+
+/*
+ A LEX_STRING stored only in thd->variables
+ Only to be used for small buffers
+*/
+
+class Sys_var_session_lexstring: public sys_var
+{
+ size_t max_length;
+public:
+ Sys_var_session_lexstring(const char *name_arg,
+ const char *comment, int flag_args,
+ ptrdiff_t off, size_t size, CMD_LINE getopt,
+ enum charset_enum is_os_charset_arg,
+ const char *def_val, size_t max_length_arg,
+ on_check_function on_check_func=0,
+ on_update_function on_update_func=0)
+ : sys_var(&all_sys_vars, name_arg, comment, flag_args, off, getopt.id,
+ getopt.arg_type, SHOW_CHAR, (intptr)def_val,
+ 0, VARIABLE_NOT_IN_BINLOG, on_check_func, on_update_func,
+ 0, 0),max_length(max_length_arg)
+ {
+ option.var_type= GET_NO_ARG;
+ SYSVAR_ASSERT(scope() == ONLY_SESSION)
+ *const_cast<SHOW_TYPE*>(&show_val_type)= SHOW_LEX_STRING;
+ }
+ bool do_check(THD *thd, set_var *var)
+ {
+ char buff[STRING_BUFFER_USUAL_SIZE];
+ String str(buff, sizeof(buff), system_charset_info), *res;
+
+ if (!(res=var->value->val_str(&str)))
+ var->save_result.string_value.str= const_cast<char*>("");
+ else
+ {
+ if (res->length() > max_length)
+ {
+ my_error(ER_WRONG_STRING_LENGTH, MYF(0),
+ res->ptr(), name.str, (int) max_length);
+ return true;
+ }
+ var->save_result.string_value.str= thd->strmake(res->ptr(),
+ res->length());
+ var->save_result.string_value.length= res->length();
+ }
+ return false;
+ }
+ bool session_update(THD *thd, set_var *var)
+ {
+ LEX_STRING *tmp= &session_var(thd, LEX_STRING);
+ tmp->length= var->save_result.string_value.length;
+ /* Store as \0 terminated string (just to be safe) */
+ strmake(tmp->str, var->save_result.string_value.str, tmp->length);
+ return false;
+ }
+ bool global_update(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(FALSE);
+ return false;
+ }
+ void session_save_default(THD *thd, set_var *var)
+ {
+ char *ptr= (char*)(intptr)option.def_value;
+ var->save_result.string_value.str= ptr;
+ }
+ void global_save_default(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(FALSE);
+ }
+ uchar *session_value_ptr(THD *thd, LEX_STRING *base)
+ {
+ return (uchar*) &session_var(thd, LEX_STRING);
+ }
+ uchar *global_value_ptr(THD *thd, LEX_STRING *base)
+ {
+ DBUG_ASSERT(FALSE);
+ }
+ bool check_update_type(Item_result type)
+ { return type != STRING_RESULT; }
+};
+
+
#ifndef DBUG_OFF
/**
@@session.dbug and @@global.dbug variables.
@@ -1389,6 +1471,7 @@ public:
};
#endif /* defined(ENABLED_DEBUG_SYNC) */
+
/**
The class for bit variables - a variant of boolean that stores the value
in a bit.
@@ -1862,6 +1945,65 @@ public:
bool global_update(THD *thd, set_var *var);
};
+/*
+ Class for handing multi-source replication variables
+ Variable values are store in Master_info, but to make it possible to
+ access variable without locks we also store it thd->variables.
+ These can be used as GLOBAL or SESSION, but both points to the same
+ variable. This is to make things compatible with MySQL 5.5 where variables
+ like sql_slave_skip_counter are GLOBAL.
+*/
+
+class Sys_var_multi_source_uint :public Sys_var_uint
+{
+ ptrdiff_t master_info_offset;
+public:
+ Sys_var_multi_source_uint(const char *name_arg,
+ const char *comment, int flag_args,
+ ptrdiff_t off, size_t size,
+ ptrdiff_t master_info_offset_arg,
+ uint min_val, uint max_val, uint def_val,
+ uint block_size,
+ on_update_function on_update_func)
+ :Sys_var_uint(name_arg, comment, flag_args, off, size,
+ NO_CMD_LINE, min_val, max_val, def_val, block_size,
+ 0, VARIABLE_NOT_IN_BINLOG, 0, on_update_func),
+ master_info_offset(master_info_offset_arg)
+ {
+ /* No global storage of variables. Cause a crash if we try an update */
+ option.value= (uchar**)1;
+ }
+ bool session_update(THD *thd, set_var *var)
+ {
+ session_var(thd, uint)= (uint) (var->save_result.ulonglong_value);
+ /* Value should be moved to multi_master in on_update_func */
+ return false;
+ }
+ bool global_update(THD *thd, set_var *var)
+ {
+ return session_update(thd, var);
+ }
+ void session_save_default(THD *thd, set_var *var)
+ {
+ /* Use value given in variable declaration */
+ global_save_default(thd, var);
+ }
+ uchar *session_value_ptr(THD *thd,LEX_STRING *base)
+ {
+ uint *tmp, res;
+ tmp= (uint*) (((uchar*)&(thd->variables)) + offset);
+ res= get_master_info_uint_value(thd, master_info_offset);
+ *tmp= res;
+ return (uchar*) tmp;
+ }
+ uchar *global_value_ptr(THD *thd, LEX_STRING *base)
+ {
+ return session_value_ptr(thd, base);
+ }
+ uint get_master_info_uint_value(THD *thd, ptrdiff_t offset);
+};
+
+
/****************************************************************************
Used templates
****************************************************************************/