summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
authorMichael Widenius <monty@askmonty.org>2012-09-28 02:06:56 +0300
committerMichael Widenius <monty@askmonty.org>2012-09-28 02:06:56 +0300
commit1864d9596d9531427f1d63f86ada9e0cb21b51db (patch)
tree89ad647a849fea26b336faf565573a8d226ca7a9 /sql/slave.cc
parent620d14f8c3521f9ec7283b8690e0e16434739d33 (diff)
downloadmariadb-git-1864d9596d9531427f1d63f86ada9e0cb21b51db.tar.gz
Implementation of Multi-source replication (MDEV:253)
Documentation of the feature can be found at: http://kb.askmonty.org/en/multi-source-replication/ This code is based on code from Taobao, developed by Plinux BUILD/SETUP.sh: Added -Wno-invalid-offsetof to get rid of warning of offsetof() on C++ class (safe in the contex we use it) client/mysqltest.cc: Added support for error names starting with 'W' Added connection_name support to --sync_with_master cmake/maintainer.cmake: Added -Wno-invalid-offsetof to get rid of warning of offsetof() on C++ class (safe in the contex we use it) mysql-test/r/mysqltest.result: Updated results mysql-test/r/parser.result: Updated results mysql-test/suite/multi_source/my.cnf: Setup of multi-master tests mysql-test/suite/multi_source/simple.result: Simple basic test of multi-source functionality mysql-test/suite/multi_source/simple.test: Simple basic test of multi-source functionality mysql-test/suite/multi_source/syntax.result: Test of multi-source syntax mysql-test/suite/multi_source/syntax.test: Test of multi-source syntax mysql-test/suite/rpl/r/rpl_rotate_logs.result: Updated results because of new error messages mysql-test/t/parser.test: Updated test as master_pos_wait() now takes more arguments than before sql/event_scheduler.cc: No reason to initialize slave_thread (it's guaranteed to be zero here) sql/item_create.cc: Added connection_name argument to master_pos_wait() Simplified code sql/item_func.cc: Added connection_name argument to master_pos_wait() sql/item_func.h: Added connection_name argument to master_pos_wait() sql/log.cc: Added tag "Master 'connection_name'" to slave errors that has a connection name. sql/mysqld.cc: Added variable mysqld_server_initialized so that other functions can test if server is fully initialized. Free all slave data in one place (fewer ifdef's) Removed not needed call to close_active_mi() Initialize slaves() later in startup to ensure that everthing is really initialized when slaves start. Made status variable slave_running multi-source safe sql/mysqld.h: Added mysqld_server_initialized sql/rpl_mi.cc: Store connection name and cmp_connection_name (only used for show full slave status) in Master_info Added code for Master_info_index, which handles storage of multi-master information Don't write the empty "" connection_name to multi-master.info file. This is handled by the original code. sql/rpl_mi.h: Added connection_name and Master_info_index sql/rpl_rli.cc: Added connection_name to relay log files. sql/rpl_rli.h: Fixed type of slave_skip_counter as we now access it directly in sys_vars.cc, so it must be uint sql/share/errmsg-utf8.txt: Added new error messages needed for multi-source Added multi-source name to error ER_MASTER_INFO and WARN_NO_MASTER_INFO sql/slave.cc: Moved things a bit around to make it easier to handle error conditions. Create a global master_info_index and add the "" connection to it Ensure that new Master_info doesn't fail. Don't call terminate_slave_threads(active_mi..) on end_slave() as this is now done automaticly when deleting master_info_index. Delete not needed function close_active_mi(). One can achive same thing by calling end_slave(). Added support for SHOW FULL SLAVE STATUS (show status for all master connections with connection_name as first column) sql/slave.h: Added new prototypes sql/sql_base.cc: More DBUG_PRINT sql/sql_class.cc: Reset thd->connection_name and thd-->default_master_connection sql/sql_class.h: Added thd->connection_name and thd-->default_master_connection Added slave_skip_count to variables to make changing the @@sql_slave_skip_count variable thread safe sql/sql_const.h: Added MAX_CONNECTION_NAME sql/sql_lex.cc: Reset 'lex->verbose' (to simplify some sql_yacc.yy code) sql/sql_lex.h: Added connection_name sql/sql_parse.cc: Added support for connection_name to all SLAVE commands. - Instead of using active_mi, we now get the current Master_info from master_info_index. - Create new replication threads with CHANGE MASTER - Added support for show_all_master_info() sql/sql_reload.cc: Made reset/full slave use master_info_index->get_master_info() instead of active_mi. If one uses 'RESET SLAVE "connection_name" all' the connection is removed from master_info_index. sql/sql_repl.cc: sql_slave_skip_counter is moved to thd->variables to make it thread safe and fix some bugs with it Add connection name to relay log files. Added connection name to errors. Added some logging for multi-master if log_warnings > 1 stop_slave(): - Don't check if thd is set. It's guaranteed to always be set. change_master(): - Check for duplicate connection names in change_master() - Check for wrong arguments first in file (to simplify error handling) - Register new connections in master_info_index sql/sql_yacc.yy: Added optional connection_name to a all relevant master/slave commands sql/strfunc.cc: my_global.h shoud always be included first. sql/sys_vars.cc: Added variable default_master_connection Made variable sql_slave_skip_counter multi-source safe sql/sys_vars.h: Added Sys_var_session_lexstring (needed for default_master_connection) Added Sys_var_multi_source_uint (needed for sql_slave_skip_counter).
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc199
1 files changed, 145 insertions, 54 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index 7b7bddecd17..8254e90b369 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -71,8 +71,10 @@ char slave_skip_error_names[SHOW_VAR_FUNC_BUFF_SIZE];
char* slave_load_tmpdir = 0;
Master_info *active_mi= 0;
+Master_info_index *master_info_index;
my_bool replicate_same_server_id;
ulonglong relay_log_space_limit = 0;
+LEX_STRING default_master_connection_name= { (char*) "", 0 };
/*
When slave thread exits, we need to remember the temporary tables so we
@@ -144,7 +146,8 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev);
static bool wait_for_relay_log_space(Relay_log_info* rli);
static inline bool io_slave_killed(THD* thd,Master_info* mi);
static inline bool sql_slave_killed(THD* thd,Relay_log_info* rli);
-static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
+static int init_slave_thread(THD* thd, Master_info *mi,
+ SLAVE_THD_TYPE thd_type);
static void print_slave_skip_errors(void);
static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi);
static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
@@ -159,6 +162,8 @@ static int terminate_slave_thread(THD *thd,
volatile uint *slave_running,
bool skip_lock);
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
+static bool send_show_master_info_header(THD *thd, Master_info *mi, bool full);
+static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full);
/*
Find out which replications threads are running
@@ -263,15 +268,33 @@ int init_slave()
So it's safer to take the lock.
*/
mysql_mutex_lock(&LOCK_active_mi);
- /*
- TODO: re-write this to interate through the list of files
- for multi-master
- */
- active_mi= new Master_info(relay_log_recovery);
if (pthread_key_create(&RPL_MASTER_INFO, NULL))
goto err;
+ master_info_index= new Master_info_index;
+ if (!master_info_index || master_info_index->init_all_master_info())
+ {
+ sql_print_error("Failed to initialize multi master structures");
+ mysql_mutex_unlock(&LOCK_active_mi);
+ DBUG_RETURN(1);
+ }
+ if (!(active_mi= new Master_info(&default_master_connection_name,
+ relay_log_recovery)) ||
+ active_mi->error())
+ {
+ delete active_mi;
+ active_mi= 0;
+ goto err;
+ }
+
+ if (master_info_index->add_master_info(active_mi, FALSE))
+ {
+ delete active_mi;
+ active_mi= 0;
+ goto err;
+ }
+
/*
If --slave-skip-errors=... was not used, the string value for the
system variable has not been set up yet. Do it now.
@@ -286,18 +309,11 @@ int init_slave()
If master_host is specified, create the master_info file if it doesn't
exists.
*/
- if (!active_mi)
- {
- sql_print_error("Failed to allocate memory for the master info structure");
- error= 1;
- goto err;
- }
if (init_master_info(active_mi,master_info_file,relay_log_info_file,
1, (SLAVE_IO | SLAVE_SQL)))
{
sql_print_error("Failed to initialize the master info structure");
- error= 1;
goto err;
}
@@ -313,14 +329,18 @@ int init_slave()
SLAVE_IO | SLAVE_SQL))
{
sql_print_error("Failed to create slave threads");
- error= 1;
goto err;
}
}
-err:
+end:
mysql_mutex_unlock(&LOCK_active_mi);
DBUG_RETURN(error);
+
+err:
+ sql_print_error("Failed to allocate memory for the Master Info structure");
+ error= 1;
+ goto end;
}
/*
@@ -820,42 +840,19 @@ void end_slave()
running presently. If a START SLAVE was in progress, the mutex lock below
will make us wait until slave threads have started, and START SLAVE
returns, then we terminate them here.
+
+ We can also be called by cleanup(), which only happens if some
+ startup parameter to the server was wrong.
*/
mysql_mutex_lock(&LOCK_active_mi);
- if (active_mi)
- {
- /*
- TODO: replace the line below with
- list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
- once multi-master code is ready.
- */
- terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
- }
+ /* This will call terminate_slave_threads() on all connections */
+ delete master_info_index;
+ master_info_index= 0;
+ active_mi= 0;
mysql_mutex_unlock(&LOCK_active_mi);
DBUG_VOID_RETURN;
}
-/**
- Free all resources used by slave threads at time of executing shutdown.
- The routine must be called after all possible users of @c active_mi
- have left.
-
- SYNOPSIS
- close_active_mi()
-
-*/
-void close_active_mi()
-{
- mysql_mutex_lock(&LOCK_active_mi);
- if (active_mi)
- {
- end_master_info(active_mi);
- delete active_mi;
- active_mi= 0;
- }
- mysql_mutex_unlock(&LOCK_active_mi);
-}
-
static bool io_slave_killed(THD* thd, Master_info* mi)
{
DBUG_ENTER("io_slave_killed");
@@ -2022,12 +2019,28 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi,
@retval FALSE success
@retval TRUE failure
*/
-bool show_master_info(THD* thd, Master_info* mi)
+
+bool show_master_info(THD *thd, Master_info *mi, bool full)
+{
+ DBUG_ENTER("show_master_info");
+
+ if (send_show_master_info_header(thd, mi, full))
+ DBUG_RETURN(TRUE);
+ if (send_show_master_info_data(thd, mi, full))
+ DBUG_RETURN(TRUE);
+ my_eof(thd);
+ DBUG_RETURN(FALSE);
+}
+
+static bool send_show_master_info_header(THD *thd, Master_info *mi, bool full)
{
- // TODO: fix this for multi-master
List<Item> field_list;
Protocol *protocol= thd->protocol;
- DBUG_ENTER("show_master_info");
+ DBUG_ENTER("show_master_info_header");
+
+ if (full)
+ field_list.push_back(new Item_empty_string("Connection_name",
+ MAX_CONNECTION_NAME));
field_list.push_back(new Item_empty_string("Slave_IO_State",
14));
@@ -2097,17 +2110,29 @@ bool show_master_info(THD* thd, Master_info* mi)
if (protocol->send_result_set_metadata(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(TRUE);
+ DBUG_RETURN(FALSE);
+}
+
+
+static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full)
+{
+ DBUG_ENTER("send_show_master_info_data");
if (mi->host[0])
{
DBUG_PRINT("info",("host is set: '%s'", mi->host));
String *packet= &thd->packet;
+ Protocol *protocol= thd->protocol;
+
protocol->prepare_for_resend();
/*
slave_running can be accessed without run_lock but not other
non-volotile members like mi->io_thd, which is guarded by the mutex.
*/
+ if (full)
+ protocol->store(mi->connection_name.str, mi->connection_name.length,
+ &my_charset_bin);
mysql_mutex_lock(&mi->run_lock);
protocol->store(mi->io_thd ? mi->io_thd->proc_info : "", &my_charset_bin);
mysql_mutex_unlock(&mi->run_lock);
@@ -2250,6 +2275,65 @@ bool show_master_info(THD* thd, Master_info* mi)
if (my_net_write(&thd->net, (uchar*) thd->packet.ptr(), packet->length()))
DBUG_RETURN(TRUE);
}
+ DBUG_RETURN(FALSE);
+}
+
+
+/* Used to sort connections by name */
+
+static int cmp_mi_by_name(const Master_info **arg1,
+ const Master_info **arg2)
+{
+ return my_strcasecmp(system_charset_info, (*arg1)->connection_name.str,
+ (*arg2)->connection_name.str);
+}
+
+
+/**
+ Execute a SHOW FULL SLAVE STATUS statement.
+
+ @param thd Pointer to THD object for the client thread executing the
+ statement.
+
+ Elements are sorted according to the original connection_name.
+
+ @retval FALSE success
+ @retval TRUE failure
+*/
+
+bool show_all_master_info(THD* thd)
+{
+ uint i, elements;
+ Master_info **tmp;
+ DBUG_ENTER("show_master_info");
+
+ if (send_show_master_info_header(thd, active_mi, 1))
+ DBUG_RETURN(TRUE);
+
+ if (!(elements= master_info_index->master_info_hash.records))
+ goto end;
+
+ /*
+ Sort lines to get them into a predicted order
+ (needed for test cases and to not confuse users)
+ */
+ if (!(tmp= (Master_info**) thd->alloc(sizeof(Master_info*) * elements)))
+ DBUG_RETURN(TRUE);
+
+ for (i= 0; i < elements; i++)
+ {
+ tmp[i]= (Master_info *) my_hash_element(&master_info_index->
+ master_info_hash, i);
+ }
+ my_qsort(tmp, elements, sizeof(Master_info*), (qsort_cmp) cmp_mi_by_name);
+
+ for (i= 0; i < elements; i++)
+ {
+ if (send_show_master_info_data(thd, tmp[i], 1))
+ DBUG_RETURN(TRUE);
+ }
+
+end:
my_eof(thd);
DBUG_RETURN(FALSE);
}
@@ -2303,7 +2387,8 @@ void set_slave_thread_default_charset(THD* thd, Relay_log_info const *rli)
init_slave_thread()
*/
-static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
+static int init_slave_thread(THD* thd, Master_info *mi,
+ SLAVE_THD_TYPE thd_type)
{
DBUG_ENTER("init_slave_thread");
#if !defined(DBUG_OFF)
@@ -2319,7 +2404,8 @@ static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
than the corresponding packet (query) sent from client to master.
*/
thd->variables.max_allowed_packet= slave_max_allowed_packet;
- thd->slave_thread = 1;
+ thd->slave_thread= 1;
+ thd->connection_name= mi->connection_name;
thd->enable_slow_log= opt_log_slow_slave_statements;
thd->variables.log_slow_filter= global_system_variables.log_slow_filter;
set_slave_thread_options(thd);
@@ -2635,7 +2721,10 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli)
int reason= ev->shall_skip(rli);
if (reason == Log_event::EVENT_SKIP_COUNT)
- sql_slave_skip_counter= --rli->slave_skip_counter;
+ {
+ DBUG_ASSERT(rli->slave_skip_counter > 0);
+ rli->slave_skip_counter--;
+ }
mysql_mutex_unlock(&rli->data_lock);
if (reason == Log_event::EVENT_SKIP_NOT)
exec_res= ev->apply_event(rli);
@@ -3058,7 +3147,7 @@ pthread_handler_t handle_slave_io(void *arg)
pthread_detach_this_thread();
thd->thread_stack= (char*) &thd; // remember where our stack is
mi->clear_error();
- if (init_slave_thread(thd, SLAVE_THD_IO))
+ if (init_slave_thread(thd, mi, SLAVE_THD_IO))
{
mysql_cond_broadcast(&mi->start_cond);
sql_print_error("Failed during slave I/O thread initialization");
@@ -3457,7 +3546,8 @@ pthread_handler_t handle_slave_sql(void *arg)
my_off_t UNINIT_VAR(saved_log_pos);
my_off_t UNINIT_VAR(saved_master_log_pos);
my_off_t saved_skip= 0;
- Relay_log_info* rli = &((Master_info*)arg)->rli;
+ Master_info *mi= ((Master_info*)arg);
+ Relay_log_info* rli = &mi->rli;
const char *errmsg;
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
@@ -3471,6 +3561,7 @@ pthread_handler_t handle_slave_sql(void *arg)
thd->thread_stack = (char*)&thd; // remember where our stack is
DBUG_ASSERT(rli->inited);
+ DBUG_ASSERT(rli->mi == mi);
mysql_mutex_lock(&rli->run_lock);
DBUG_ASSERT(!rli->slave_running);
errmsg= 0;
@@ -3485,7 +3576,7 @@ pthread_handler_t handle_slave_sql(void *arg)
rli->slave_running = 1;
pthread_detach_this_thread();
- if (init_slave_thread(thd, SLAVE_THD_SQL))
+ if (init_slave_thread(thd, mi, SLAVE_THD_SQL))
{
/*
TODO: this is currently broken - slave start and change master