diff options
| author | Michael Widenius <monty@askmonty.org> | 2012-09-28 02:06:56 +0300 |
|---|---|---|
| committer | Michael Widenius <monty@askmonty.org> | 2012-09-28 02:06:56 +0300 |
| commit | 1864d9596d9531427f1d63f86ada9e0cb21b51db (patch) | |
| tree | 89ad647a849fea26b336faf565573a8d226ca7a9 /sql/slave.cc | |
| parent | 620d14f8c3521f9ec7283b8690e0e16434739d33 (diff) | |
| download | mariadb-git-1864d9596d9531427f1d63f86ada9e0cb21b51db.tar.gz | |
Implementation of Multi-source replication (MDEV:253)
Documentation of the feature can be found at: http://kb.askmonty.org/en/multi-source-replication/
This code is based on code from Taobao, developed by Plinux
BUILD/SETUP.sh:
Added -Wno-invalid-offsetof to get rid of warning of offsetof() on C++ class (safe in the contex we use it)
client/mysqltest.cc:
Added support for error names starting with 'W'
Added connection_name support to --sync_with_master
cmake/maintainer.cmake:
Added -Wno-invalid-offsetof to get rid of warning of offsetof() on C++ class (safe in the contex we use it)
mysql-test/r/mysqltest.result:
Updated results
mysql-test/r/parser.result:
Updated results
mysql-test/suite/multi_source/my.cnf:
Setup of multi-master tests
mysql-test/suite/multi_source/simple.result:
Simple basic test of multi-source functionality
mysql-test/suite/multi_source/simple.test:
Simple basic test of multi-source functionality
mysql-test/suite/multi_source/syntax.result:
Test of multi-source syntax
mysql-test/suite/multi_source/syntax.test:
Test of multi-source syntax
mysql-test/suite/rpl/r/rpl_rotate_logs.result:
Updated results because of new error messages
mysql-test/t/parser.test:
Updated test as master_pos_wait() now takes more arguments than before
sql/event_scheduler.cc:
No reason to initialize slave_thread (it's guaranteed to be zero here)
sql/item_create.cc:
Added connection_name argument to master_pos_wait()
Simplified code
sql/item_func.cc:
Added connection_name argument to master_pos_wait()
sql/item_func.h:
Added connection_name argument to master_pos_wait()
sql/log.cc:
Added tag "Master 'connection_name'" to slave errors that has a connection name.
sql/mysqld.cc:
Added variable mysqld_server_initialized so that other functions can test if server is fully initialized.
Free all slave data in one place (fewer ifdef's)
Removed not needed call to close_active_mi()
Initialize slaves() later in startup to ensure that everthing is really initialized when slaves start.
Made status variable slave_running multi-source safe
sql/mysqld.h:
Added mysqld_server_initialized
sql/rpl_mi.cc:
Store connection name and cmp_connection_name (only used for show full slave status) in Master_info
Added code for Master_info_index, which handles storage of multi-master information
Don't write the empty "" connection_name to multi-master.info file. This is handled by the original code.
sql/rpl_mi.h:
Added connection_name and Master_info_index
sql/rpl_rli.cc:
Added connection_name to relay log files.
sql/rpl_rli.h:
Fixed type of slave_skip_counter as we now access it directly in sys_vars.cc, so it must be uint
sql/share/errmsg-utf8.txt:
Added new error messages needed for multi-source
Added multi-source name to error ER_MASTER_INFO and WARN_NO_MASTER_INFO
sql/slave.cc:
Moved things a bit around to make it easier to handle error conditions.
Create a global master_info_index and add the "" connection to it
Ensure that new Master_info doesn't fail.
Don't call terminate_slave_threads(active_mi..) on end_slave() as this is now done automaticly when deleting master_info_index.
Delete not needed function close_active_mi(). One can achive same thing by calling end_slave().
Added support for SHOW FULL SLAVE STATUS (show status for all master connections with connection_name as first column)
sql/slave.h:
Added new prototypes
sql/sql_base.cc:
More DBUG_PRINT
sql/sql_class.cc:
Reset thd->connection_name and thd-->default_master_connection
sql/sql_class.h:
Added thd->connection_name and thd-->default_master_connection
Added slave_skip_count to variables to make changing the @@sql_slave_skip_count variable thread safe
sql/sql_const.h:
Added MAX_CONNECTION_NAME
sql/sql_lex.cc:
Reset 'lex->verbose' (to simplify some sql_yacc.yy code)
sql/sql_lex.h:
Added connection_name
sql/sql_parse.cc:
Added support for connection_name to all SLAVE commands.
- Instead of using active_mi, we now get the current Master_info from master_info_index.
- Create new replication threads with CHANGE MASTER
- Added support for show_all_master_info()
sql/sql_reload.cc:
Made reset/full slave use master_info_index->get_master_info() instead of active_mi.
If one uses 'RESET SLAVE "connection_name" all' the connection is removed from master_info_index.
sql/sql_repl.cc:
sql_slave_skip_counter is moved to thd->variables to make it thread safe and fix some bugs with it
Add connection name to relay log files.
Added connection name to errors.
Added some logging for multi-master if log_warnings > 1
stop_slave():
- Don't check if thd is set. It's guaranteed to always be set.
change_master():
- Check for duplicate connection names in change_master()
- Check for wrong arguments first in file (to simplify error handling)
- Register new connections in master_info_index
sql/sql_yacc.yy:
Added optional connection_name to a all relevant master/slave commands
sql/strfunc.cc:
my_global.h shoud always be included first.
sql/sys_vars.cc:
Added variable default_master_connection
Made variable sql_slave_skip_counter multi-source safe
sql/sys_vars.h:
Added Sys_var_session_lexstring (needed for default_master_connection)
Added Sys_var_multi_source_uint (needed for sql_slave_skip_counter).
Diffstat (limited to 'sql/slave.cc')
| -rw-r--r-- | sql/slave.cc | 199 |
1 files changed, 145 insertions, 54 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index 7b7bddecd17..8254e90b369 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -71,8 +71,10 @@ char slave_skip_error_names[SHOW_VAR_FUNC_BUFF_SIZE]; char* slave_load_tmpdir = 0; Master_info *active_mi= 0; +Master_info_index *master_info_index; my_bool replicate_same_server_id; ulonglong relay_log_space_limit = 0; +LEX_STRING default_master_connection_name= { (char*) "", 0 }; /* When slave thread exits, we need to remember the temporary tables so we @@ -144,7 +146,8 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev); static bool wait_for_relay_log_space(Relay_log_info* rli); static inline bool io_slave_killed(THD* thd,Master_info* mi); static inline bool sql_slave_killed(THD* thd,Relay_log_info* rli); -static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type); +static int init_slave_thread(THD* thd, Master_info *mi, + SLAVE_THD_TYPE thd_type); static void print_slave_skip_errors(void); static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi); static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi, @@ -159,6 +162,8 @@ static int terminate_slave_thread(THD *thd, volatile uint *slave_running, bool skip_lock); static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info); +static bool send_show_master_info_header(THD *thd, Master_info *mi, bool full); +static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full); /* Find out which replications threads are running @@ -263,15 +268,33 @@ int init_slave() So it's safer to take the lock. */ mysql_mutex_lock(&LOCK_active_mi); - /* - TODO: re-write this to interate through the list of files - for multi-master - */ - active_mi= new Master_info(relay_log_recovery); if (pthread_key_create(&RPL_MASTER_INFO, NULL)) goto err; + master_info_index= new Master_info_index; + if (!master_info_index || master_info_index->init_all_master_info()) + { + sql_print_error("Failed to initialize multi master structures"); + mysql_mutex_unlock(&LOCK_active_mi); + DBUG_RETURN(1); + } + if (!(active_mi= new Master_info(&default_master_connection_name, + relay_log_recovery)) || + active_mi->error()) + { + delete active_mi; + active_mi= 0; + goto err; + } + + if (master_info_index->add_master_info(active_mi, FALSE)) + { + delete active_mi; + active_mi= 0; + goto err; + } + /* If --slave-skip-errors=... was not used, the string value for the system variable has not been set up yet. Do it now. @@ -286,18 +309,11 @@ int init_slave() If master_host is specified, create the master_info file if it doesn't exists. */ - if (!active_mi) - { - sql_print_error("Failed to allocate memory for the master info structure"); - error= 1; - goto err; - } if (init_master_info(active_mi,master_info_file,relay_log_info_file, 1, (SLAVE_IO | SLAVE_SQL))) { sql_print_error("Failed to initialize the master info structure"); - error= 1; goto err; } @@ -313,14 +329,18 @@ int init_slave() SLAVE_IO | SLAVE_SQL)) { sql_print_error("Failed to create slave threads"); - error= 1; goto err; } } -err: +end: mysql_mutex_unlock(&LOCK_active_mi); DBUG_RETURN(error); + +err: + sql_print_error("Failed to allocate memory for the Master Info structure"); + error= 1; + goto end; } /* @@ -820,42 +840,19 @@ void end_slave() running presently. If a START SLAVE was in progress, the mutex lock below will make us wait until slave threads have started, and START SLAVE returns, then we terminate them here. + + We can also be called by cleanup(), which only happens if some + startup parameter to the server was wrong. */ mysql_mutex_lock(&LOCK_active_mi); - if (active_mi) - { - /* - TODO: replace the line below with - list_walk(&master_list, (list_walk_action)end_slave_on_walk,0); - once multi-master code is ready. - */ - terminate_slave_threads(active_mi,SLAVE_FORCE_ALL); - } + /* This will call terminate_slave_threads() on all connections */ + delete master_info_index; + master_info_index= 0; + active_mi= 0; mysql_mutex_unlock(&LOCK_active_mi); DBUG_VOID_RETURN; } -/** - Free all resources used by slave threads at time of executing shutdown. - The routine must be called after all possible users of @c active_mi - have left. - - SYNOPSIS - close_active_mi() - -*/ -void close_active_mi() -{ - mysql_mutex_lock(&LOCK_active_mi); - if (active_mi) - { - end_master_info(active_mi); - delete active_mi; - active_mi= 0; - } - mysql_mutex_unlock(&LOCK_active_mi); -} - static bool io_slave_killed(THD* thd, Master_info* mi) { DBUG_ENTER("io_slave_killed"); @@ -2022,12 +2019,28 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi, @retval FALSE success @retval TRUE failure */ -bool show_master_info(THD* thd, Master_info* mi) + +bool show_master_info(THD *thd, Master_info *mi, bool full) +{ + DBUG_ENTER("show_master_info"); + + if (send_show_master_info_header(thd, mi, full)) + DBUG_RETURN(TRUE); + if (send_show_master_info_data(thd, mi, full)) + DBUG_RETURN(TRUE); + my_eof(thd); + DBUG_RETURN(FALSE); +} + +static bool send_show_master_info_header(THD *thd, Master_info *mi, bool full) { - // TODO: fix this for multi-master List<Item> field_list; Protocol *protocol= thd->protocol; - DBUG_ENTER("show_master_info"); + DBUG_ENTER("show_master_info_header"); + + if (full) + field_list.push_back(new Item_empty_string("Connection_name", + MAX_CONNECTION_NAME)); field_list.push_back(new Item_empty_string("Slave_IO_State", 14)); @@ -2097,17 +2110,29 @@ bool show_master_info(THD* thd, Master_info* mi) if (protocol->send_result_set_metadata(&field_list, Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(TRUE); + DBUG_RETURN(FALSE); +} + + +static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full) +{ + DBUG_ENTER("send_show_master_info_data"); if (mi->host[0]) { DBUG_PRINT("info",("host is set: '%s'", mi->host)); String *packet= &thd->packet; + Protocol *protocol= thd->protocol; + protocol->prepare_for_resend(); /* slave_running can be accessed without run_lock but not other non-volotile members like mi->io_thd, which is guarded by the mutex. */ + if (full) + protocol->store(mi->connection_name.str, mi->connection_name.length, + &my_charset_bin); mysql_mutex_lock(&mi->run_lock); protocol->store(mi->io_thd ? mi->io_thd->proc_info : "", &my_charset_bin); mysql_mutex_unlock(&mi->run_lock); @@ -2250,6 +2275,65 @@ bool show_master_info(THD* thd, Master_info* mi) if (my_net_write(&thd->net, (uchar*) thd->packet.ptr(), packet->length())) DBUG_RETURN(TRUE); } + DBUG_RETURN(FALSE); +} + + +/* Used to sort connections by name */ + +static int cmp_mi_by_name(const Master_info **arg1, + const Master_info **arg2) +{ + return my_strcasecmp(system_charset_info, (*arg1)->connection_name.str, + (*arg2)->connection_name.str); +} + + +/** + Execute a SHOW FULL SLAVE STATUS statement. + + @param thd Pointer to THD object for the client thread executing the + statement. + + Elements are sorted according to the original connection_name. + + @retval FALSE success + @retval TRUE failure +*/ + +bool show_all_master_info(THD* thd) +{ + uint i, elements; + Master_info **tmp; + DBUG_ENTER("show_master_info"); + + if (send_show_master_info_header(thd, active_mi, 1)) + DBUG_RETURN(TRUE); + + if (!(elements= master_info_index->master_info_hash.records)) + goto end; + + /* + Sort lines to get them into a predicted order + (needed for test cases and to not confuse users) + */ + if (!(tmp= (Master_info**) thd->alloc(sizeof(Master_info*) * elements))) + DBUG_RETURN(TRUE); + + for (i= 0; i < elements; i++) + { + tmp[i]= (Master_info *) my_hash_element(&master_info_index-> + master_info_hash, i); + } + my_qsort(tmp, elements, sizeof(Master_info*), (qsort_cmp) cmp_mi_by_name); + + for (i= 0; i < elements; i++) + { + if (send_show_master_info_data(thd, tmp[i], 1)) + DBUG_RETURN(TRUE); + } + +end: my_eof(thd); DBUG_RETURN(FALSE); } @@ -2303,7 +2387,8 @@ void set_slave_thread_default_charset(THD* thd, Relay_log_info const *rli) init_slave_thread() */ -static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type) +static int init_slave_thread(THD* thd, Master_info *mi, + SLAVE_THD_TYPE thd_type) { DBUG_ENTER("init_slave_thread"); #if !defined(DBUG_OFF) @@ -2319,7 +2404,8 @@ static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type) than the corresponding packet (query) sent from client to master. */ thd->variables.max_allowed_packet= slave_max_allowed_packet; - thd->slave_thread = 1; + thd->slave_thread= 1; + thd->connection_name= mi->connection_name; thd->enable_slow_log= opt_log_slow_slave_statements; thd->variables.log_slow_filter= global_system_variables.log_slow_filter; set_slave_thread_options(thd); @@ -2635,7 +2721,10 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli) int reason= ev->shall_skip(rli); if (reason == Log_event::EVENT_SKIP_COUNT) - sql_slave_skip_counter= --rli->slave_skip_counter; + { + DBUG_ASSERT(rli->slave_skip_counter > 0); + rli->slave_skip_counter--; + } mysql_mutex_unlock(&rli->data_lock); if (reason == Log_event::EVENT_SKIP_NOT) exec_res= ev->apply_event(rli); @@ -3058,7 +3147,7 @@ pthread_handler_t handle_slave_io(void *arg) pthread_detach_this_thread(); thd->thread_stack= (char*) &thd; // remember where our stack is mi->clear_error(); - if (init_slave_thread(thd, SLAVE_THD_IO)) + if (init_slave_thread(thd, mi, SLAVE_THD_IO)) { mysql_cond_broadcast(&mi->start_cond); sql_print_error("Failed during slave I/O thread initialization"); @@ -3457,7 +3546,8 @@ pthread_handler_t handle_slave_sql(void *arg) my_off_t UNINIT_VAR(saved_log_pos); my_off_t UNINIT_VAR(saved_master_log_pos); my_off_t saved_skip= 0; - Relay_log_info* rli = &((Master_info*)arg)->rli; + Master_info *mi= ((Master_info*)arg); + Relay_log_info* rli = &mi->rli; const char *errmsg; // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff @@ -3471,6 +3561,7 @@ pthread_handler_t handle_slave_sql(void *arg) thd->thread_stack = (char*)&thd; // remember where our stack is DBUG_ASSERT(rli->inited); + DBUG_ASSERT(rli->mi == mi); mysql_mutex_lock(&rli->run_lock); DBUG_ASSERT(!rli->slave_running); errmsg= 0; @@ -3485,7 +3576,7 @@ pthread_handler_t handle_slave_sql(void *arg) rli->slave_running = 1; pthread_detach_this_thread(); - if (init_slave_thread(thd, SLAVE_THD_SQL)) + if (init_slave_thread(thd, mi, SLAVE_THD_SQL)) { /* TODO: this is currently broken - slave start and change master |
