diff options
author | Seppo Jaakola <seppo.jaakola@codership.com> | 2013-12-04 10:32:43 +0200 |
---|---|---|
committer | Seppo Jaakola <seppo.jaakola@codership.com> | 2013-12-04 10:32:43 +0200 |
commit | 496e22cf3bd2a481fd3502d86e5a4e8228bf9823 (patch) | |
tree | 80549f8005fcf3236bfa004a5aea35e4e67b36ca /sql/slave.cc | |
parent | 45f484b8381a5923aec9c704e54c7f7bcfa02a40 (diff) | |
parent | 26f56089c734852dc31d98fd73e1d8f1750bd1a8 (diff) | |
download | mariadb-git-496e22cf3bd2a481fd3502d86e5a4e8228bf9823.tar.gz |
merge with MariaDB 5.6 bzr merge lp:maria --rtag:mariadb-10.0.6
and a number of fixes to make this buildable.
Also ran a few short multi-master, high-conflict-rate tests, with no issues.
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 929 |
1 files changed, 583 insertions, 346 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index 8665cf646fc..f3ac16db110 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -60,6 +60,8 @@ #include "rpl_tblmap.h" #include "debug_sync.h" +#include "rpl_parallel.h" + #define FLAGSTR(V,F) ((V)&(F)?#F" ":"") @@ -147,8 +149,8 @@ typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE; static int process_io_rotate(Master_info* mi, Rotate_log_event* rev); static int process_io_create_file(Master_info* mi, Create_file_log_event* cev); static bool wait_for_relay_log_space(Relay_log_info* rli); -static inline bool io_slave_killed(THD* thd,Master_info* mi); -static inline bool sql_slave_killed(THD* thd,Relay_log_info* rli); +static bool io_slave_killed(Master_info* mi); +static bool sql_slave_killed(rpl_group_info *rgi); static int init_slave_thread(THD* thd, Master_info *mi, SLAVE_THD_TYPE thd_type); static void print_slave_skip_errors(void); @@ -157,14 +159,14 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi, bool suppress_warnings); static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi, bool reconnect, bool suppress_warnings); -static Log_event* next_event(Relay_log_info* rli); +static Log_event* next_event(rpl_group_info* rgi, ulonglong *event_size); static int queue_event(Master_info* mi,const char* buf,ulong event_len); static int terminate_slave_thread(THD *thd, mysql_mutex_t *term_lock, mysql_cond_t *term_cond, volatile uint *slave_running, bool skip_lock); -static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info); +static bool check_io_slave_killed(Master_info *mi, const char *info); static bool send_show_master_info_header(THD *thd, bool full, size_t gtid_pos_length); static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, @@ -395,6 +397,9 @@ int init_slave() goto err; } + if (global_rpl_thread_pool.init(opt_slave_parallel_threads)) + return 1; + /* If --slave-skip-errors=... 
was not used, the string value for the system variable has not been set up yet. Do it now. @@ -600,26 +605,6 @@ void init_slave_skip_errors(const char* arg) DBUG_VOID_RETURN; } -static void set_thd_in_use_temporary_tables(Relay_log_info *rli) -{ - TABLE *table; - - for (table= rli->save_temporary_tables ; table ; table= table->next) - { - table->in_use= rli->sql_thd; - if (table->file != NULL) - { - /* - Since we are stealing opened temporary tables from one thread to another, - we need to let the performance schema know that, - for aggregates per thread to work properly. - */ - table->file->unbind_psi(); - table->file->rebind_psi(); - } - } -} - int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) { DBUG_ENTER("terminate_slave_threads"); @@ -634,7 +619,7 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) { DBUG_PRINT("info",("Terminating SQL thread")); mi->rli.abort_slave=1; - if ((error=terminate_slave_thread(mi->rli.sql_thd, sql_lock, + if ((error=terminate_slave_thread(mi->rli.sql_driver_thd, sql_lock, &mi->rli.stop_cond, &mi->rli.slave_running, skip_lock)) && @@ -997,17 +982,17 @@ void end_slave() master_info_index= 0; active_mi= 0; mysql_mutex_unlock(&LOCK_active_mi); + global_rpl_thread_pool.destroy(); free_all_rpl_filters(); DBUG_VOID_RETURN; } -static bool io_slave_killed(THD* thd, Master_info* mi) +static bool io_slave_killed(Master_info* mi) { DBUG_ENTER("io_slave_killed"); - DBUG_ASSERT(mi->io_thd == thd); DBUG_ASSERT(mi->slave_running); // tracking buffer overrun - DBUG_RETURN(mi->abort_slave || abort_loop || thd->killed); + DBUG_RETURN(mi->abort_slave || abort_loop || mi->io_thd->killed); } /** @@ -1023,26 +1008,36 @@ static bool io_slave_killed(THD* thd, Master_info* mi) @return TRUE the killed status is recognized, FALSE a possible killed status is deferred. 
*/ -static bool sql_slave_killed(THD* thd, Relay_log_info* rli) +static bool sql_slave_killed(rpl_group_info *rgi) { bool ret= FALSE; + Relay_log_info *rli= rgi->rli; + THD *thd= rgi->thd; DBUG_ENTER("sql_slave_killed"); - DBUG_ASSERT(rli->sql_thd == thd); + DBUG_ASSERT(rli->sql_driver_thd == thd); DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun - if (abort_loop || thd->killed || rli->abort_slave) + if (abort_loop || rli->sql_driver_thd->killed || rli->abort_slave) { /* - The transaction should always be binlogged if OPTION_KEEP_LOG is set - (it implies that something can not be rolled back). And such case - should be regarded similarly as modifing a non-transactional table - because retrying of the transaction will lead to an error or inconsistency - as well. - Example: OPTION_KEEP_LOG is set if a temporary table is created or dropped. + The transaction should always be binlogged if OPTION_KEEP_LOG is + set (it implies that something can not be rolled back). And such + case should be regarded similarly as modifing a + non-transactional table because retrying of the transaction will + lead to an error or inconsistency as well. + + Example: OPTION_KEEP_LOG is set if a temporary table is created + or dropped. + + Note that transaction.all.modified_non_trans_table may be 1 + if last statement was a single row transaction without begin/end. + Testing this flag must always be done in connection with + rli->is_in_group(). */ + if ((thd->transaction.all.modified_non_trans_table || - (thd->variables.option_bits & OPTION_KEEP_LOG)) - && rli->is_in_group()) + (thd->variables.option_bits & OPTION_KEEP_LOG)) && + rli->is_in_group()) { char msg_stopped[]= "... 
Slave SQL Thread stopped with incomplete event group " @@ -1052,25 +1047,33 @@ static bool sql_slave_killed(THD* thd, Relay_log_info* rli) "ignores duplicate key, key not found, and similar errors (see " "documentation for details)."; + DBUG_PRINT("info", ("modified_non_trans_table: %d OPTION_BEGIN: %d " + "is_in_group: %d", + thd->transaction.all.modified_non_trans_table, + test(thd->variables.option_bits & OPTION_BEGIN), + rli->is_in_group())); + if (rli->abort_slave) { - DBUG_PRINT("info", ("Request to stop slave SQL Thread received while " - "applying a group that has non-transactional " - "changes; waiting for completion of the group ... ")); + DBUG_PRINT("info", + ("Request to stop slave SQL Thread received while " + "applying a group that has non-transactional " + "changes; waiting for completion of the group ... ")); /* - Slave sql thread shutdown in face of unfinished group modified - Non-trans table is handled via a timer. The slave may eventually - give out to complete the current group and in that case there - might be issues at consequent slave restart, see the error message. - WL#2975 offers a robust solution requiring to store the last exectuted - event's coordinates along with the group's coordianates - instead of waiting with @c last_event_start_time the timer. + Slave sql thread shutdown in face of unfinished group + modified Non-trans table is handled via a timer. The slave + may eventually give out to complete the current group and in + that case there might be issues at consequent slave restart, + see the error message. WL#2975 offers a robust solution + requiring to store the last exectuted event's coordinates + along with the group's coordianates instead of waiting with + @c last_event_start_time the timer. 
*/ - if (rli->last_event_start_time == 0) - rli->last_event_start_time= my_time(0); - ret= difftime(my_time(0), rli->last_event_start_time) <= + if (rgi->last_event_start_time == 0) + rgi->last_event_start_time= my_time(0); + ret= difftime(my_time(0), rgi->last_event_start_time) <= SLAVE_WAIT_GROUP_DONE ? FALSE : TRUE; DBUG_EXECUTE_IF("stop_slave_middle_group", @@ -1093,7 +1096,8 @@ static bool sql_slave_killed(THD* thd, Relay_log_info* rli) else { ret= TRUE; - rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, ER(ER_SLAVE_FATAL_ERROR), + rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, + ER(ER_SLAVE_FATAL_ERROR), msg_stopped); } } @@ -1103,7 +1107,7 @@ static bool sql_slave_killed(THD* thd, Relay_log_info* rli) } } if (ret) - rli->last_event_start_time= 0; + rgi->last_event_start_time= 0; DBUG_RETURN(ret); } @@ -1505,7 +1509,7 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi) mi->clock_diff_with_master= (long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10)); } - else if (check_io_slave_killed(mi->io_thd, mi, NULL)) + else if (check_io_slave_killed(mi, NULL)) goto slave_killed_err; else if (is_network_error(mysql_errno(mysql))) { @@ -1570,7 +1574,7 @@ not always make sense; please check the manual before using it)."; } else if (mysql_errno(mysql)) { - if (check_io_slave_killed(mi->io_thd, mi, NULL)) + if (check_io_slave_killed(mi, NULL)) goto slave_killed_err; else if (is_network_error(mysql_errno(mysql))) { @@ -1643,7 +1647,7 @@ be equal for the Statement-format replication to work"; goto err; } } - else if (check_io_slave_killed(mi->io_thd, mi, NULL)) + else if (check_io_slave_killed(mi, NULL)) goto slave_killed_err; else if (is_network_error(mysql_errno(mysql))) { @@ -1706,7 +1710,7 @@ be equal for the Statement-format replication to work"; goto err; } } - else if (check_io_slave_killed(mi->io_thd, mi, NULL)) + else if (check_io_slave_killed(mi, NULL)) goto slave_killed_err; else if (is_network_error(err_code= mysql_errno(mysql))) { @@ 
-1751,7 +1755,7 @@ when it try to get the value of TIME_ZONE global variable from master."; sprintf(query, query_format, llbuf); if (mysql_real_query(mysql, query, strlen(query)) - && !check_io_slave_killed(mi->io_thd, mi, NULL)) + && !check_io_slave_killed(mi, NULL)) { errmsg= "The slave I/O thread stops because SET @master_heartbeat_period " "on master failed."; @@ -1786,7 +1790,7 @@ when it try to get the value of TIME_ZONE global variable from master."; rc= mysql_real_query(mysql, query, strlen(query)); if (rc != 0) { - if (check_io_slave_killed(mi->io_thd, mi, NULL)) + if (check_io_slave_killed(mi, NULL)) goto slave_killed_err; if (mysql_errno(mysql) == ER_UNKNOWN_SYSTEM_VARIABLE) @@ -1832,7 +1836,7 @@ when it try to get the value of TIME_ZONE global variable from master."; DBUG_ASSERT(mi->checksum_alg_before_fd == BINLOG_CHECKSUM_ALG_OFF || mi->checksum_alg_before_fd == BINLOG_CHECKSUM_ALG_CRC32); } - else if (check_io_slave_killed(mi->io_thd, mi, NULL)) + else if (check_io_slave_killed(mi, NULL)) goto slave_killed_err; else if (is_network_error(mysql_errno(mysql))) { @@ -2096,7 +2100,7 @@ after_set_capability: rpl_global_gtid_slave_state.load(mi->io_thd, master_row[0], strlen(master_row[0]), false, false); } - else if (check_io_slave_killed(mi->io_thd, mi, NULL)) + else if (check_io_slave_killed(mi, NULL)) goto slave_killed_err; else if (is_network_error(mysql_errno(mysql))) { @@ -2162,7 +2166,7 @@ static bool wait_for_relay_log_space(Relay_log_info* rli) &stage_waiting_for_relay_log_space, &old_stage); while (rli->log_space_limit < rli->log_space_total && - !(slave_killed=io_slave_killed(thd,mi)) && + !(slave_killed=io_slave_killed(mi)) && !rli->ignore_log_space_limit) mysql_cond_wait(&rli->log_space_cond, &rli->log_space_lock); @@ -2241,34 +2245,66 @@ static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi) DBUG_ASSERT(thd == mi->io_thd); mysql_mutex_lock(log_lock); - if (rli->ign_master_log_name_end[0]) - { - 
DBUG_PRINT("info",("writing a Rotate event to track down ignored events")); - Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end, - 0, rli->ign_master_log_pos_end, - Rotate_log_event::DUP_NAME); - rli->ign_master_log_name_end[0]= 0; - /* can unlock before writing as slave SQL thd will soon see our Rotate */ + if (rli->ign_master_log_name_end[0] || rli->ign_gtids.count()) + { + Rotate_log_event *rev= NULL; + Gtid_list_log_event *glev= NULL; + if (rli->ign_master_log_name_end[0]) + { + rev= new Rotate_log_event(rli->ign_master_log_name_end, + 0, rli->ign_master_log_pos_end, + Rotate_log_event::DUP_NAME); + rli->ign_master_log_name_end[0]= 0; + if (unlikely(!(bool)rev)) + mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE, + ER(ER_SLAVE_CREATE_EVENT_FAILURE), + "Rotate_event (out of memory?)," + " SHOW SLAVE STATUS may be inaccurate"); + } + if (rli->ign_gtids.count()) + { + glev= new Gtid_list_log_event(&rli->ign_gtids, + Gtid_list_log_event::FLAG_IGN_GTIDS); + rli->ign_gtids.reset(); + if (unlikely(!(bool)glev)) + mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE, + ER(ER_SLAVE_CREATE_EVENT_FAILURE), + "Gtid_list_event (out of memory?)," + " gtid_slave_pos may be inaccurate"); + } + + /* Can unlock before writing as slave SQL thd will soon see our event. 
*/ mysql_mutex_unlock(log_lock); - if (likely((bool)ev)) + if (rev) { - ev->server_id= 0; // don't be ignored by slave SQL thread - if (unlikely(rli->relay_log.append(ev))) + DBUG_PRINT("info",("writing a Rotate event to track down ignored events")); + rev->server_id= 0; // don't be ignored by slave SQL thread + if (unlikely(rli->relay_log.append(rev))) mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE), "failed to write a Rotate event" " to the relay log, SHOW SLAVE STATUS may be" " inaccurate"); + delete rev; + } + if (glev) + { + DBUG_PRINT("info",("writing a Gtid_list event to track down ignored events")); + glev->server_id= 0; // don't be ignored by slave SQL thread + glev->set_artificial_event(); // Don't mess up Exec_Master_Log_Pos + if (unlikely(rli->relay_log.append(glev))) + mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, + ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE), + "failed to write a Gtid_list event to the relay log, " + "gtid_slave_pos may be inaccurate"); + delete glev; + } + if (likely (rev || glev)) + { rli->relay_log.harvest_bytes_written(&rli->log_space_total); if (flush_master_info(mi, TRUE, TRUE)) sql_print_error("Failed to flush master info file"); - delete ev; } - else - mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE, - ER(ER_SLAVE_CREATE_EVENT_FAILURE), - "Rotate_event (out of memory?)," - " SHOW SLAVE STATUS may be inaccurate"); } else mysql_mutex_unlock(log_lock); @@ -2337,7 +2373,7 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi, { *suppress_warnings= TRUE; // Suppress reconnect warning } - else if (!check_io_slave_killed(mi->io_thd, mi, NULL)) + else if (!check_io_slave_killed(mi, NULL)) { char buf[256]; my_snprintf(buf, sizeof(buf), "%s (Errno: %d)", mysql_error(mysql), @@ -2512,8 +2548,15 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, &my_charset_bin); mysql_mutex_lock(&mi->run_lock); if (full) - protocol->store(mi->rli.sql_thd ? 
mi->rli.sql_thd->get_proc_info() : "", + { + /* + Show what the sql driver replication thread is doing + This is only meaningful if there is only one slave thread. + */ + protocol->store(mi->rli.sql_driver_thd ? + mi->rli.sql_driver_thd->get_proc_info() : "", &my_charset_bin); + } protocol->store(mi->io_thd ? mi->io_thd->get_proc_info() : "", &my_charset_bin); mysql_mutex_unlock(&mi->run_lock); @@ -2844,8 +2887,8 @@ static int init_slave_thread(THD* thd, Master_info *mi, @retval True if the thread has been killed, false otherwise. */ template <typename killed_func, typename rpl_info> -static inline bool slave_sleep(THD *thd, time_t seconds, - killed_func func, rpl_info info) +static bool slave_sleep(THD *thd, time_t seconds, + killed_func func, rpl_info info) { bool ret; @@ -2859,7 +2902,7 @@ static inline bool slave_sleep(THD *thd, time_t seconds, mysql_mutex_lock(lock); thd->ENTER_COND(cond, lock, NULL, NULL); - while (! (ret= func(thd, info))) + while (! (ret= func(info))) { int error= mysql_cond_timedwait(cond, lock, &abstime); if (error == ETIMEDOUT || error == ETIME) @@ -3064,19 +3107,21 @@ static int has_temporary_error(THD *thd) @retval 2 No error calling ev->apply_event(), but error calling ev->update_pos(). 
*/ -int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli) +int apply_event_and_update_pos(Log_event* ev, THD* thd, + rpl_group_info *rgi, + rpl_parallel_thread *rpt) { int exec_res= 0; - + Relay_log_info* rli= rgi->rli; DBUG_ENTER("apply_event_and_update_pos"); DBUG_PRINT("exec_event",("%s(type_code: %d; server_id: %d)", ev->get_type_str(), ev->get_type_code(), ev->server_id)); - DBUG_PRINT("info", ("thd->options: %s%s; rli->last_event_start_time: %lu", + DBUG_PRINT("info", ("thd->options: %s%s; rgi->last_event_start_time: %lu", FLAGSTR(thd->variables.option_bits, OPTION_NOT_AUTOCOMMIT), FLAGSTR(thd->variables.option_bits, OPTION_BEGIN), - (ulong) rli->last_event_start_time)); + (ulong) rgi->last_event_start_time)); /* Execute the event to change the database and update the binary @@ -3117,15 +3162,21 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli) (ev->flags & LOG_EVENT_SKIP_REPLICATION_F ? OPTION_SKIP_REPLICATION : 0); ev->thd = thd; // because up to this point, ev->thd == 0 - int reason= ev->shall_skip(rli); + int reason= ev->shall_skip(rgi); if (reason == Log_event::EVENT_SKIP_COUNT) { DBUG_ASSERT(rli->slave_skip_counter > 0); rli->slave_skip_counter--; } mysql_mutex_unlock(&rli->data_lock); + DBUG_EXECUTE_IF("inject_slave_sql_before_apply_event", + { + DBUG_ASSERT(!debug_sync_set_action + (thd, STRING_WITH_LEN("now WAIT_FOR continue"))); + DBUG_SET_INITIAL("-d,inject_slave_sql_before_apply_event"); + };); if (reason == Log_event::EVENT_SKIP_NOT) - exec_res= ev->apply_event(rli); + exec_res= ev->apply_event(rgi); #ifndef DBUG_OFF /* @@ -3141,9 +3192,10 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli) // EVENT_SKIP_COUNT "skipped because event skip counter was non-zero" }; - DBUG_PRINT("info", ("OPTION_BEGIN: %d; IN_STMT: %d", + DBUG_PRINT("info", ("OPTION_BEGIN: %d IN_STMT: %d IN_TRANSACTION: %d", test(thd->variables.option_bits & OPTION_BEGIN), - 
rli->get_flag(Relay_log_info::IN_STMT))); + rli->get_flag(Relay_log_info::IN_STMT), + rli->get_flag(Relay_log_info::IN_TRANSACTION))); DBUG_PRINT("skip_event", ("%s event was %s", ev->get_type_str(), explain[reason])); #endif @@ -3151,7 +3203,7 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli) DBUG_PRINT("info", ("apply_event error = %d", exec_res)); if (exec_res == 0) { - int error= ev->update_pos(rli); + int error= ev->update_pos(rgi); #ifdef HAVE_valgrind if (!rli->is_fake) #endif @@ -3187,12 +3239,94 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli) DBUG_RETURN(2); } } + else + { + /* + Make sure we do not errorneously update gtid_slave_pos with a lingering + GTID from this failed event group (MDEV-4906). + */ + rgi->gtid_sub_id= 0; + } DBUG_RETURN(exec_res ? 1 : 0); } /** + Keep the relay log transaction state up to date. + + The state reflects how things are after the given event, that has just been + read from the relay log, is executed. + + This is only needed to ensure we: + - Don't abort the sql driver thread in the middle of an event group. + - Don't rotate the io thread in the middle of a statement or transaction. + The mechanism is that the io thread, when it needs to rotate the relay + log, will wait until the sql driver has read all the cached events + and then continue reading events one by one from the master until + the sql threads signals that log doesn't have an active group anymore. + + There are two possible cases. We keep them as 2 separate flags mainly + to make debugging easier. + + - IN_STMT is set when we have read an event that should be used + together with the next event. This is for example setting a + variable that is used when executing the next statement. + - IN_TRANSACTION is set when we are inside a BEGIN...COMMIT group + + To test the state one should use the is_in_group() function. 
+*/ + +inline void update_state_of_relay_log(Relay_log_info *rli, Log_event *ev) +{ + Log_event_type typ= ev->get_type_code(); + + /* check if we are in a multi part event */ + if (ev->is_part_of_group()) + rli->set_flag(Relay_log_info::IN_STMT); + else if (Log_event::is_group_event(typ)) + { + /* + If it was not a is_part_of_group() and not a group event (like + rotate) then we can reset the IN_STMT flag. We have the above + if only to allow us to have a rotate element anywhere. + */ + rli->clear_flag(Relay_log_info::IN_STMT); + } + + /* Check for an event that starts or stops a transaction */ + if (typ == QUERY_EVENT) + { + Query_log_event *qev= (Query_log_event*) ev; + /* + Trivial optimization to avoid the following somewhat expensive + checks. + */ + if (qev->q_len <= sizeof("ROLLBACK")) + { + if (qev->is_begin()) + rli->set_flag(Relay_log_info::IN_TRANSACTION); + if (qev->is_commit() || qev->is_rollback()) + rli->clear_flag(Relay_log_info::IN_TRANSACTION); + } + } + if (typ == XID_EVENT) + rli->clear_flag(Relay_log_info::IN_TRANSACTION); + if (typ == GTID_EVENT && + !(((Gtid_log_event*) ev)->flags2 & Gtid_log_event::FL_STANDALONE)) + { + /* This GTID_EVENT will generate a BEGIN event */ + rli->set_flag(Relay_log_info::IN_TRANSACTION); + } + + DBUG_PRINT("info", ("event: %u IN_STMT: %d IN_TRANSACTION: %d", + (uint) typ, + rli->get_flag(Relay_log_info::IN_STMT), + rli->get_flag(Relay_log_info::IN_TRANSACTION))); +} + + +/** Top-level function for executing the next event from the relay log. This function reads the event from the relay log, executes it, and @@ -3220,22 +3354,23 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli) @retval 1 The event was not applied. 
*/ -static int exec_relay_log_event(THD* thd, Relay_log_info* rli) + +static int exec_relay_log_event(THD* thd, Relay_log_info* rli, + rpl_group_info *serial_rgi) { + ulonglong event_size; DBUG_ENTER("exec_relay_log_event"); /* - We acquire this mutex since we need it for all operations except - event execution. But we will release it in places where we will - wait for something for example inside of next_event(). - */ + We acquire this mutex since we need it for all operations except + event execution. But we will release it in places where we will + wait for something for example inside of next_event(). + */ mysql_mutex_lock(&rli->data_lock); - Log_event * ev = next_event(rli); - - DBUG_ASSERT(rli->sql_thd==thd); + Log_event *ev= next_event(serial_rgi, &event_size); - if (sql_slave_killed(thd,rli)) + if (sql_slave_killed(serial_rgi)) { mysql_mutex_unlock(&rli->data_lock); delete ev; @@ -3244,6 +3379,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli) if (ev) { int exec_res; + Log_event_type typ= ev->get_type_code(); /* This tests if the position of the beginning of the current event @@ -3257,8 +3393,8 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli) sql_print_information("Slave SQL thread stopped because it reached its" " UNTIL position %s", llstr(rli->until_pos(), buf)); /* - Setting abort_slave flag because we do not want additional message about - error in query execution to be printed. + Setting abort_slave flag because we do not want additional + message about error in query execution to be printed. */ rli->abort_slave= 1; mysql_mutex_unlock(&rli->data_lock); @@ -3273,56 +3409,49 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli) read hanging if the realy log does not have any more events. 
*/ DBUG_EXECUTE_IF("incomplete_group_in_relay_log", - if ((ev->get_type_code() == XID_EVENT) || - ((ev->get_type_code() == QUERY_EVENT) && + if ((typ == XID_EVENT) || + ((typ == QUERY_EVENT) && strcmp("COMMIT", ((Query_log_event *) ev)->query) == 0)) { DBUG_ASSERT(thd->transaction.all.modified_non_trans_table); rli->abort_slave= 1; mysql_mutex_unlock(&rli->data_lock); delete ev; - rli->inc_event_relay_log_pos(); + serial_rgi->inc_event_relay_log_pos(); DBUG_RETURN(0); };); } - exec_res= apply_event_and_update_pos(ev, thd, rli); + update_state_of_relay_log(rli, ev); - switch (ev->get_type_code()) { - case FORMAT_DESCRIPTION_EVENT: - /* - Format_description_log_event should not be deleted because it - will be used to read info about the relay log's format; - it will be deleted when the SQL thread does not need it, - i.e. when this thread terminates. - */ - break; - case ANNOTATE_ROWS_EVENT: - /* - Annotate_rows event should not be deleted because after it has - been applied, thd->query points to the string inside this event. - The thd->query will be used to generate new Annotate_rows event - during applying the subsequent Rows events. - */ - rli->set_annotate_event((Annotate_rows_log_event*) ev); - break; - case DELETE_ROWS_EVENT: - case UPDATE_ROWS_EVENT: - case WRITE_ROWS_EVENT: - /* - After the last Rows event has been applied, the saved Annotate_rows - event (if any) is not needed anymore and can be deleted. - */ - if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F)) - rli->free_annotate_event(); - /* fall through */ - default: - DBUG_PRINT("info", ("Deleting the event after it has been executed")); - if (!rli->is_deferred_event(ev)) - delete ev; - break; + /* + Execute queries in parallel, except if slave_skip_counter is set, + as it's is easier to skip queries in single threaded mode. 
+ */ + + if (opt_slave_parallel_threads > 0 && rli->slave_skip_counter == 0) + DBUG_RETURN(rli->parallel.do_event(serial_rgi, ev, event_size)); + + /* + For GTID, allocate a new sub_id for the given domain_id. + The sub_id must be allocated in increasing order of binlog order. + */ + if (typ == GTID_EVENT && + event_group_new_gtid(serial_rgi, static_cast<Gtid_log_event *>(ev))) + { + sql_print_error("Error reading relay log event: %s", + "slave SQL thread aborted because of out-of-memory error"); + mysql_mutex_unlock(&rli->data_lock); + delete ev; + DBUG_RETURN(1); } + serial_rgi->future_event_relay_log_pos= rli->future_event_relay_log_pos; + serial_rgi->event_relay_log_name= rli->event_relay_log_name; + serial_rgi->event_relay_log_pos= rli->event_relay_log_pos; + exec_res= apply_event_and_update_pos(ev, thd, serial_rgi, NULL); + + delete_or_keep_event_post_apply(serial_rgi, typ, ev); /* update_log_pos failed: this should not happen, so we don't @@ -3345,14 +3474,16 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli) Note, if lock wait timeout (innodb_lock_wait_timeout exceeded) there is no rollback since 5.0.13 (ref: manual). We have to not only seek but also - a) init_master_info(), to seek back to hot relay log's start for later - (for when we will come back to this hot log after re-processing the - possibly existing old logs where BEGIN is: check_binlog_magic() will - then need the cache to be at position 0 (see comments at beginning of + + a) init_master_info(), to seek back to hot relay log's start + for later (for when we will come back to this hot log after + re-processing the possibly existing old logs where BEGIN is: + check_binlog_magic() will then need the cache to be at + position 0 (see comments at beginning of init_master_info()). b) init_relay_log_pos(), because the BEGIN may be an older relay log. 
*/ - if (rli->trans_retries < slave_trans_retries) + if (serial_rgi->trans_retries < slave_trans_retries) { if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL)) sql_print_error("Failed to initialize the master info structure"); @@ -3365,17 +3496,19 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli) else { exec_res= 0; - rli->cleanup_context(thd, 1); + serial_rgi->cleanup_context(thd, 1); /* chance for concurrent connection to get more locks */ - slave_sleep(thd, MY_MIN(rli->trans_retries, MAX_SLAVE_RETRY_PAUSE), - sql_slave_killed, rli); + slave_sleep(thd, MY_MIN(serial_rgi->trans_retries, + MAX_SLAVE_RETRY_PAUSE), + sql_slave_killed, serial_rgi); + serial_rgi->trans_retries++; mysql_mutex_lock(&rli->data_lock); // because of SHOW STATUS - rli->trans_retries++; rli->retried_trans++; statistic_increment(slave_retried_transactions, LOCK_status); mysql_mutex_unlock(&rli->data_lock); DBUG_PRINT("info", ("Slave retries transaction " - "rli->trans_retries: %lu", rli->trans_retries)); + "rgi->trans_retries: %lu", + serial_rgi->trans_retries)); } } else @@ -3394,11 +3527,13 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli) event, the execution will proceed as usual; in the case of a non-transient error, the slave will stop with an error. 
*/ - rli->trans_retries= 0; // restart from fresh - DBUG_PRINT("info", ("Resetting retry counter, rli->trans_retries: %lu", - rli->trans_retries)); + serial_rgi->trans_retries= 0; // restart from fresh + DBUG_PRINT("info", ("Resetting retry counter, rgi->trans_retries: %lu", + serial_rgi->trans_retries)); } } + thread_safe_increment64(&rli->executed_entries, + &slave_executed_entries_lock); DBUG_RETURN(exec_res); } mysql_mutex_unlock(&rli->data_lock); @@ -3416,9 +3551,9 @@ on this slave.\ } -static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info) +static bool check_io_slave_killed(Master_info *mi, const char *info) { - if (io_slave_killed(thd, mi)) + if (io_slave_killed(mi)) { if (info && global_system_variables.log_warnings) sql_print_information("%s", info); @@ -3469,7 +3604,7 @@ static int try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi, return 1; // Don't retry forever slave_sleep(thd, mi->connect_retry, io_slave_killed, mi); } - if (check_io_slave_killed(thd, mi, messages[SLAVE_RECON_MSG_KILLED_WAITING])) + if (check_io_slave_killed(mi, messages[SLAVE_RECON_MSG_KILLED_WAITING])) return 1; thd->proc_info = messages[SLAVE_RECON_MSG_AFTER]; if (!suppress_warnings) @@ -3506,7 +3641,7 @@ static int try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi, sql_print_information("%s", buf); } } - if (safe_reconnect(thd, mysql, mi, 1) || io_slave_killed(thd, mi)) + if (safe_reconnect(thd, mysql, mi, 1) || io_slave_killed(mi)) { if (global_system_variables.log_warnings) sql_print_information("%s", messages[SLAVE_RECON_MSG_KILLED_AFTER]); @@ -3682,11 +3817,14 @@ connected: if (ret == 2) { - if (check_io_slave_killed(mi->io_thd, mi, "Slave I/O thread killed" + if (check_io_slave_killed(mi, "Slave I/O thread killed" "while calling get_master_version_and_clock(...)")) goto err; suppress_warnings= FALSE; - /* Try to reconnect because the error was caused by a transient network problem */ + /* + Try to reconnect because the error was caused by 
a transient network + problem + */ if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings, reconnect_messages[SLAVE_RECON_ACT_REG])) goto err; @@ -3701,7 +3839,7 @@ connected: THD_STAGE_INFO(thd, stage_registering_slave_on_master); if (register_slave_on_master(mysql, mi, &suppress_warnings)) { - if (!check_io_slave_killed(thd, mi, "Slave I/O thread killed " + if (!check_io_slave_killed(mi, "Slave I/O thread killed " "while registering slave on master")) { sql_print_error("Slave I/O thread couldn't register on master"); @@ -3726,13 +3864,13 @@ connected: } DBUG_PRINT("info",("Starting reading binary log from master")); - while (!io_slave_killed(thd,mi)) + while (!io_slave_killed(mi)) { THD_STAGE_INFO(thd, stage_requesting_binlog_dump); if (request_dump(thd, mysql, mi, &suppress_warnings)) { sql_print_error("Failed on request_dump()"); - if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \ + if (check_io_slave_killed(mi, "Slave I/O thread killed while \ requesting master dump") || try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings, reconnect_messages[SLAVE_RECON_ACT_DUMP])) @@ -3752,7 +3890,7 @@ requesting master dump") || const char *event_buf; DBUG_ASSERT(mi->last_error().number == 0); - while (!io_slave_killed(thd,mi)) + while (!io_slave_killed(mi)) { ulong event_len; /* @@ -3763,7 +3901,7 @@ requesting master dump") || */ THD_STAGE_INFO(thd, stage_waiting_for_master_to_send_event); event_len= read_event(mysql, mi, &suppress_warnings); - if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \ + if (check_io_slave_killed(mi, "Slave I/O thread killed while \ reading event")) goto err; DBUG_EXECUTE_IF("FORCE_SLAVE_TO_RECONNECT_EVENT", @@ -3841,7 +3979,8 @@ Stopping slave I/O thread due to out-of-memory error from master"); goto err; } - if (flush_master_info(mi, TRUE, TRUE)) + if (mi->using_gtid != Master_info::USE_GTID_NO && + flush_master_info(mi, TRUE, TRUE)) { sql_print_error("Failed to flush master info 
file"); goto err; @@ -3853,10 +3992,11 @@ Stopping slave I/O thread due to out-of-memory error from master"); - if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so the clean value is 0), then we are reading only one more event as we should, and we'll block only at the next event. No big deal. - - if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so - the clean value is 1), then we are going into wait_for_relay_log_space() - for no reason, but this function will do a clean read, notice the clean - value and exit immediately. + - if mi->rli.ignore_log_space_limit is 0 but becomes 1 just + after (so the clean value is 1), then we are going into + wait_for_relay_log_space() for no reason, but this function + will do a clean read, notice the clean value and exit + immediately. */ #ifndef DBUG_OFF { @@ -3917,6 +4057,8 @@ err: mi->mysql=0; } write_ignored_events_info_to_relay_log(thd, mi); + if (mi->using_gtid != Master_info::USE_GTID_NO) + flush_master_info(mi, TRUE, TRUE); THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit); mysql_mutex_lock(&mi->run_lock); @@ -3944,6 +4086,9 @@ err_during_init: DBUG_LEAVE; // Must match DBUG_ENTER() my_thread_end(); +#ifdef HAVE_OPENSSL + ERR_remove_state(0); +#endif pthread_exit(0); return 0; // Avoid compiler warnings } @@ -4014,6 +4159,93 @@ end: } +void +slave_output_error_info(Relay_log_info *rli, THD *thd) +{ + /* + retrieve as much info as possible from the thd and, error + codes and warnings and print this to the error log as to + allow the user to locate the error + */ + uint32 const last_errno= rli->last_error().number; + char llbuff[22]; + + if (thd->is_error()) + { + char const *const errmsg= thd->get_stmt_da()->message(); + + DBUG_PRINT("info", + ("thd->get_stmt_da()->sql_errno()=%d; rli->last_error.number=%d", + thd->get_stmt_da()->sql_errno(), last_errno)); + if (last_errno == 0) + { + /* + This function is reporting an error which was not reported + while executing 
exec_relay_log_event(). + */ + rli->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(), "%s", errmsg); + } + else if (last_errno != thd->get_stmt_da()->sql_errno()) + { + /* + * An error was reported while executing exec_relay_log_event() + * however the error code differs from what is in the thread. + * This function prints out more information to help finding + * what caused the problem. + */ + sql_print_error("Slave (additional info): %s Error_code: %d", + errmsg, thd->get_stmt_da()->sql_errno()); + } + } + + /* Print any warnings issued */ + Diagnostics_area::Sql_condition_iterator it= + thd->get_stmt_da()->sql_conditions(); + const Sql_condition *err; + /* + Added controlled slave thread cancel for replication + of user-defined variables. + */ + bool udf_error = false; + while ((err= it++)) + { + if (err->get_sql_errno() == ER_CANT_OPEN_LIBRARY) + udf_error = true; + sql_print_warning("Slave: %s Error_code: %d", err->get_message_text(), err->get_sql_errno()); + } + if (udf_error) + { + String tmp; + if (rli->mi->using_gtid != Master_info::USE_GTID_NO) + { + tmp.append(STRING_WITH_LEN("; GTID position '")); + rpl_append_gtid_state(&tmp, false); + tmp.append(STRING_WITH_LEN("'")); + } + sql_print_error("Error loading user-defined library, slave SQL " + "thread aborted. Install the missing library, and restart the " + "slave SQL thread with \"SLAVE START\". We stopped at log '%s' " + "position %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, + llbuff), tmp.c_ptr_safe()); + } + else + { + String tmp; + if (rli->mi->using_gtid != Master_info::USE_GTID_NO) + { + tmp.append(STRING_WITH_LEN("; GTID position '")); + rpl_append_gtid_state(&tmp, false); + tmp.append(STRING_WITH_LEN("'")); + } + sql_print_error("\ +Error running query, slave SQL thread aborted. Fix the problem, and restart \ +the slave SQL thread with \"SLAVE START\". 
We stopped at log \ +'%s' position %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff), + tmp.c_ptr_safe()); + } +} + + /** Slave SQL thread entry point. @@ -4034,6 +4266,7 @@ pthread_handler_t handle_slave_sql(void *arg) Master_info *mi= ((Master_info*)arg); Relay_log_info* rli = &mi->rli; const char *errmsg; + rpl_group_info *serial_rgi; // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff my_thread_init(); @@ -4042,6 +4275,7 @@ pthread_handler_t handle_slave_sql(void *arg) LINT_INIT(saved_master_log_pos); LINT_INIT(saved_log_pos); + serial_rgi= new rpl_group_info(rli); thd = new THD; // note that contructor of THD uses DBUG_ ! thd->thread_stack = (char*)&thd; // remember where our stack is thd->rpl_filter = mi->rpl_filter; @@ -4055,7 +4289,15 @@ pthread_handler_t handle_slave_sql(void *arg) rli->events_till_abort = abort_slave_event_count; #endif - rli->sql_thd= thd; + /* + THD for the sql driver thd. In parallel replication this is the thread + that reads things from the relay log and calls rpl_parallel::do_event() + to execute queries. + + In single thread replication this is the THD for the thread that is + executing SQL queries too. 
+ */ + serial_rgi->thd= rli->sql_driver_thd= thd; /* Inform waiting threads that slave has started */ rli->slave_run_id++; @@ -4074,14 +4316,12 @@ pthread_handler_t handle_slave_sql(void *arg) goto err_during_init; } thd->init_for_queries(); - thd->rli_slave= rli; - if ((rli->deferred_events_collecting= mi->rpl_filter->is_on())) + thd->rgi_slave= serial_rgi; + if ((serial_rgi->deferred_events_collecting= mi->rpl_filter->is_on())) { - rli->deferred_events= new Deferred_log_events(rli); + serial_rgi->deferred_events= new Deferred_log_events(rli); } - thd->temporary_tables = rli->save_temporary_tables; // restore temp tables - set_thd_in_use_temporary_tables(rli); // (re)set sql_thd in use for saved temp tables /* binlog_annotate_row_events must be TRUE only after an Annotate_rows event has been recieved and only till the last corresponding rbr event has been @@ -4114,14 +4354,14 @@ pthread_handler_t handle_slave_sql(void *arg) But the master timestamp is reset by RESET SLAVE & CHANGE MASTER. 
*/ rli->clear_error(); + rli->parallel.reset(); //tell the I/O thread to take relay_log_space_limit into account from now on mysql_mutex_lock(&rli->log_space_lock); rli->ignore_log_space_limit= 0; mysql_mutex_unlock(&rli->log_space_lock); - rli->trans_retries= 0; // start from "no error" - DBUG_PRINT("info", ("rli->trans_retries: %lu", rli->trans_retries)); + serial_rgi->gtid_sub_id= 0; if (init_relay_log_pos(rli, rli->group_relay_log_name, rli->group_relay_log_pos, @@ -4132,6 +4372,7 @@ pthread_handler_t handle_slave_sql(void *arg) "Error initializing relay log position: %s", errmsg); goto err; } + strcpy(rli->future_event_master_log_name, rli->group_master_log_name); THD_CHECK_SENTRY(thd); #ifndef DBUG_OFF { @@ -4157,7 +4398,6 @@ pthread_handler_t handle_slave_sql(void *arg) #endif } #endif - DBUG_ASSERT(rli->sql_thd == thd); #ifdef WITH_WSREP thd->wsrep_exec_mode= LOCAL_STATE; @@ -4246,10 +4486,9 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, /* Read queries from the IO/THREAD until this thread is killed */ - while (!sql_slave_killed(thd,rli)) + while (!sql_slave_killed(serial_rgi)) { THD_STAGE_INFO(thd, stage_reading_event_from_the_relay_log); - DBUG_ASSERT(rli->sql_thd == thd); THD_CHECK_SENTRY(thd); if (saved_skip && rli->slave_skip_counter == 0) @@ -4266,98 +4505,19 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, saved_skip= 0; } - if (exec_relay_log_event(thd,rli)) + if (exec_relay_log_event(thd, rli, serial_rgi)) { DBUG_PRINT("info", ("exec_relay_log_event() failed")); // do not scare the user if SQL thread was simply killed or stopped - if (!sql_slave_killed(thd,rli)) - { - /* - retrieve as much info as possible from the thd and, error - codes and warnings and print this to the error log as to - allow the user to locate the error - */ - uint32 const last_errno= rli->last_error().number; - - if (thd->is_error()) - { - char const *const errmsg= thd->get_stmt_da()->message(); - - DBUG_PRINT("info", - 
("thd->get_stmt_da()->sql_errno()=%d; rli->last_error.number=%d", - thd->get_stmt_da()->sql_errno(), last_errno)); - if (last_errno == 0) - { - /* - This function is reporting an error which was not reported - while executing exec_relay_log_event(). - */ - rli->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(), "%s", errmsg); - } - else if (last_errno != thd->get_stmt_da()->sql_errno()) - { - /* - * An error was reported while executing exec_relay_log_event() - * however the error code differs from what is in the thread. - * This function prints out more information to help finding - * what caused the problem. - */ - sql_print_error("Slave (additional info): %s Error_code: %d", - errmsg, thd->get_stmt_da()->sql_errno()); - } - } - - /* Print any warnings issued */ - Diagnostics_area::Sql_condition_iterator it= - thd->get_stmt_da()->sql_conditions(); - const Sql_condition *err; - /* - Added controlled slave thread cancel for replication - of user-defined variables. - */ - bool udf_error = false; - while ((err= it++)) - { - if (err->get_sql_errno() == ER_CANT_OPEN_LIBRARY) - udf_error = true; - sql_print_warning("Slave: %s Error_code: %d", err->get_message_text(), err->get_sql_errno()); - } - if (udf_error) - { - String tmp; - if (mi->using_gtid != Master_info::USE_GTID_NO) - { - tmp.append(STRING_WITH_LEN("; GTID position '")); - rpl_append_gtid_state(&tmp, false); - tmp.append(STRING_WITH_LEN("'")); - } - sql_print_error("Error loading user-defined library, slave SQL " - "thread aborted. Install the missing library, and restart the " - "slave SQL thread with \"SLAVE START\". 
We stopped at log '%s' " - "position %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, - llbuff), tmp.c_ptr_safe()); - } - else - { - String tmp; - if (mi->using_gtid != Master_info::USE_GTID_NO) - { - tmp.append(STRING_WITH_LEN("; GTID position '")); - rpl_append_gtid_state(&tmp, false); - tmp.append(STRING_WITH_LEN("'")); - } - sql_print_error("\ -Error running query, slave SQL thread aborted. Fix the problem, and restart \ -the slave SQL thread with \"SLAVE START\". We stopped at log \ -'%s' position %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff), - tmp.c_ptr_safe()); - } - } + if (!sql_slave_killed(serial_rgi)) + slave_output_error_info(rli, thd); goto err; } - rli->executed_entries++; } + if (opt_slave_parallel_threads > 0) + rli->parallel.wait_for_done(); + /* Thread stopped. Print the current replication position to the log */ { String tmp; @@ -4377,13 +4537,21 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \ err: /* + Once again, in case we aborted with an error and skipped the first one. + (We want the first one to be before the printout of stop position to + get the correct position printed.) + */ + if (opt_slave_parallel_threads > 0) + rli->parallel.wait_for_done(); + + /* Some events set some playgrounds, which won't be cleared because thread stops. Stopping of this thread may not be known to these events ("stop" request is detected only by the present function, not by events), so we must "proactively" clear playgrounds: */ thd->clear_error(); - rli->cleanup_context(thd, 1); + serial_rgi->cleanup_context(thd, 1); /* Some extra safety, which should not been needed (normally, event deletion should already have done these assignments (each event which sets these @@ -4392,6 +4560,8 @@ the slave SQL thread with \"SLAVE START\". 
We stopped at log \ thd->catalog= 0; thd->reset_query(); thd->reset_db(NULL, 0); + if (rli->mi->using_gtid != Master_info::USE_GTID_NO) + flush_relay_log_info(rli); THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit); mysql_mutex_lock(&rli->run_lock); err_during_init: @@ -4410,19 +4580,18 @@ err_during_init: rli->ignore_log_space_limit= 0; /* don't need any lock */ /* we die so won't remember charset - re-update them on next thread start */ rli->cached_charset_invalidate(); - rli->save_temporary_tables = thd->temporary_tables; /* TODO: see if we can do this conditionally in next_event() instead to avoid unneeded position re-init */ thd->temporary_tables = 0; // remove tempation from destructor to close them - DBUG_ASSERT(rli->sql_thd == thd); THD_CHECK_SENTRY(thd); - rli->sql_thd= 0; - set_thd_in_use_temporary_tables(rli); // (re)set sql_thd in use for saved temp tables + serial_rgi->thd= rli->sql_driver_thd= 0; mysql_mutex_lock(&LOCK_thread_count); THD_CHECK_SENTRY(thd); + thd->rgi_fake= thd->rgi_slave= NULL; + delete serial_rgi; delete thd; mysql_mutex_unlock(&LOCK_thread_count); /* @@ -4436,6 +4605,9 @@ err_during_init: DBUG_LEAVE; // Must match DBUG_ENTER() my_thread_end(); +#ifdef HAVE_OPENSSL + ERR_remove_state(0); +#endif pthread_exit(0); return 0; // Avoid compiler warnings } @@ -4848,6 +5020,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ulong s_id; bool unlock_data_lock= TRUE; bool gtid_skip_enqueue= false; + bool got_gtid_event= false; + rpl_gtid event_gtid; /* FD_q must have been prepared for the first R_a event @@ -4911,8 +5085,6 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) goto err; } - LINT_INIT(inc_pos); - if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 && (uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */) DBUG_RETURN(queue_old_event(mi,buf,event_len)); @@ -5164,8 +5336,17 @@ static int queue_event(Master_info* mi,const char* buf, 
ulong event_len) case GTID_EVENT: { - uchar dummy_flag; + uchar gtid_flag; + if (Gtid_log_event::peek(buf, event_len, checksum_alg, + &event_gtid.domain_id, &event_gtid.server_id, + &event_gtid.seq_no, >id_flag, + rli->relay_log.description_event_for_queue)) + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + goto err; + } + got_gtid_event= true; if (mi->using_gtid == Master_info::USE_GTID_NO) goto default_action; if (unlikely(!mi->gtid_event_seen)) @@ -5173,8 +5354,6 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) mi->gtid_event_seen= true; if (mi->gtid_reconnect_event_skip_count) { - rpl_gtid gtid; - /* If we are reconnecting, and we need to skip a partial event group already queued to the relay log before the reconnect, then we check @@ -5183,21 +5362,14 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) The only way we should be able to receive a different GTID than what we expect is if the binlog on the master (or more likely the whole - master server) was replaced with a different one, one the same IP + master server) was replaced with a different one, on the same IP address, _and_ the new master happens to have domains in a different order so we get the GTID from a different domain first. Still, it is best to protect against this case. 
*/ - if (Gtid_log_event::peek(buf, event_len, checksum_alg, - >id.domain_id, >id.server_id, - >id.seq_no, &dummy_flag)) - { - error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; - goto err; - } - if (gtid.domain_id != mi->last_queued_gtid.domain_id || - gtid.server_id != mi->last_queued_gtid.server_id || - gtid.seq_no != mi->last_queued_gtid.seq_no) + if (event_gtid.domain_id != mi->last_queued_gtid.domain_id || + event_gtid.server_id != mi->last_queued_gtid.server_id || + event_gtid.seq_no != mi->last_queued_gtid.seq_no) { bool first; error= ER_SLAVE_UNEXPECTED_MASTER_SWITCH; @@ -5207,7 +5379,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) &first); error_msg.append(STRING_WITH_LEN(", received: ")); first= true; - rpl_slave_state_tostring_helper(&error_msg, >id, &first); + rpl_slave_state_tostring_helper(&error_msg, &event_gtid, &first); goto err; } } @@ -5221,21 +5393,20 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) /* We have successfully queued to relay log everything before this GTID, so in case of reconnect we can start from after any previous GTID. + (Normally we would have updated gtid_current_pos earlier at the end of + the previous event group, but better leave an extra check here for + safety). 
*/ if (mi->events_queued_since_last_gtid) { mi->gtid_current_pos.update(&mi->last_queued_gtid); mi->events_queued_since_last_gtid= 0; } - if (Gtid_log_event::peek(buf, event_len, checksum_alg, - &mi->last_queued_gtid.domain_id, - &mi->last_queued_gtid.server_id, - &mi->last_queued_gtid.seq_no, &dummy_flag)) - { - error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; - goto err; - } + mi->last_queued_gtid= event_gtid; + mi->last_queued_gtid_standalone= + (gtid_flag & Gtid_log_event::FL_STANDALONE) != 0; ++mi->events_queued_since_last_gtid; + inc_pos= event_len; } break; @@ -5287,6 +5458,49 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) mysql_mutex_lock(log_lock); s_id= uint4korr(buf + SERVER_ID_OFFSET); + /* + Write the event to the relay log, unless we reconnected in the middle + of an event group and now need to skip the initial part of the group that + we already wrote before reconnecting. + */ + if (unlikely(gtid_skip_enqueue)) + { + mi->master_log_pos+= inc_pos; + if ((uchar)buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT && + s_id == mi->master_id) + { + /* + If we write this master's description event in the middle of an event + group due to GTID reconnect, SQL thread will think that master crashed + in the middle of the group and roll back the first half, so we must not. + + But we still have to write an artificial copy of the masters description + event, to override the initial slave-version description event so that + SQL thread has the right information for parsing the events it reads. 
+ */ + rli->relay_log.description_event_for_queue->created= 0; + rli->relay_log.description_event_for_queue->set_artificial_event(); + if (rli->relay_log.append_no_lock + (rli->relay_log.description_event_for_queue)) + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + else + rli->relay_log.harvest_bytes_written(&rli->log_space_total); + } + else if (mi->gtid_reconnect_event_skip_count == 0) + { + /* + Add a fake rotate event so that SQL thread can see the old-style + position where we re-connected in the middle of a GTID event group. + */ + Rotate_log_event fake_rev(mi->master_log_name, 0, mi->master_log_pos, 0); + fake_rev.server_id= mi->master_id; + if (rli->relay_log.append_no_lock(&fake_rev)) + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + else + rli->relay_log.harvest_bytes_written(&rli->log_space_total); + } + } + else if ((s_id == global_system_variables.server_id && !mi->rli.replicate_same_server_id) || /* @@ -5329,6 +5543,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN); DBUG_ASSERT(rli->ign_master_log_name_end[0]); rli->ign_master_log_pos_end= mi->master_log_pos; + if (got_gtid_event) + rli->ign_gtids.update(&event_gtid); } rli->relay_log.signal_update(); // the slave SQL thread needs to re-check DBUG_PRINT("info", ("master_log_pos: %lu, event originating from %u server, ignored", @@ -5336,16 +5552,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) } else { - /* - Write the event to the relay log, unless we reconnected in the middle - of an event group and now need to skip the initial part of the group that - we already wrote before reconnecting. 
- */ - if (unlikely(gtid_skip_enqueue)) - { - mi->master_log_pos+= inc_pos; - } - else if (likely(!(rli->relay_log.appendv(buf,event_len,0)))) + if (likely(!(rli->relay_log.appendv(buf,event_len,0)))) { mi->master_log_pos+= inc_pos; DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos)); @@ -5356,11 +5563,33 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; } rli->ign_master_log_name_end[0]= 0; // last event is not ignored + if (got_gtid_event) + rli->ign_gtids.remove_if_present(&event_gtid); if (save_buf != NULL) buf= save_buf; } mysql_mutex_unlock(log_lock); + if (!error && + mi->using_gtid != Master_info::USE_GTID_NO && + mi->events_queued_since_last_gtid > 0 && + ( (mi->last_queued_gtid_standalone && + !Log_event::is_part_of_group((Log_event_type)(uchar) + buf[EVENT_TYPE_OFFSET])) || + (!mi->last_queued_gtid_standalone && + ((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT || + ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && + Query_log_event::peek_is_commit_rollback(buf, event_len, + checksum_alg)))))) + { + /* + The whole of the current event group is queued. So in case of + reconnect we can start from after the current GTID. + */ + mi->gtid_current_pos.update(&mi->last_queued_gtid); + mi->events_queued_since_last_gtid= 0; + } + skip_relay_logging: err: @@ -5520,7 +5749,7 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi, "terminated."); DBUG_RETURN(1); } - while (!(slave_was_killed = io_slave_killed(thd,mi)) && + while (!(slave_was_killed = io_slave_killed(mi)) && (reconnect ? 
mysql_reconnect(mysql) != 0 : mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0, mi->port, 0, client_flag) == 0)) @@ -5598,19 +5827,20 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi, } +#ifdef NOT_USED MYSQL *rpl_connect_master(MYSQL *mysql) { - THD *thd= current_thd; Master_info *mi= my_pthread_getspecific_ptr(Master_info*, RPL_MASTER_INFO); bool allocated= false; my_bool my_true= 1; + THD *thd; if (!mi) { sql_print_error("'rpl_connect_master' must be called in slave I/O thread context."); return NULL; } - + thd= mi->io_thd; if (!mysql) { if(!(mysql= mysql_init(NULL))) @@ -5653,11 +5883,11 @@ MYSQL *rpl_connect_master(MYSQL *mysql) if (mi->user == NULL || mi->user[0] == 0 - || io_slave_killed(thd, mi) + || io_slave_killed( mi) || !mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0, mi->port, 0, 0)) { - if (!io_slave_killed(thd, mi)) + if (!io_slave_killed( mi)) sql_print_error("rpl_connect_master: error connecting to master: %s (server_error: %d)", mysql_error(mysql), mysql_errno(mysql)); @@ -5667,6 +5897,7 @@ MYSQL *rpl_connect_master(MYSQL *mysql) } return mysql; } +#endif /* Store the file and position where the execute-slave thread are in the @@ -5772,17 +6003,21 @@ static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg) @return The event read, or NULL on error. If an error occurs, the error is reported through the sql_print_information() or sql_print_error() functions. + + The size of the read event (in bytes) is returned in *event_size. 
*/ -static Log_event* next_event(Relay_log_info* rli) +static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size) { Log_event* ev; + Relay_log_info *rli= rgi->rli; IO_CACHE* cur_log = rli->cur_log; mysql_mutex_t *log_lock = rli->relay_log.get_log_lock(); const char* errmsg=0; - THD* thd = rli->sql_thd; + THD *thd = rgi->thd; DBUG_ENTER("next_event"); - DBUG_ASSERT(thd != 0); + DBUG_ASSERT(thd != 0 && thd == rli->sql_driver_thd); + *event_size= 0; #ifndef DBUG_OFF if (abort_slave_event_count && !rli->events_till_abort--) @@ -5798,7 +6033,7 @@ static Log_event* next_event(Relay_log_info* rli) */ mysql_mutex_assert_owner(&rli->data_lock); - while (!sql_slave_killed(thd,rli)) + while (!sql_slave_killed(rgi)) { /* We can have two kinds of log reading: @@ -5811,6 +6046,7 @@ static Log_event* next_event(Relay_log_info* rli) The other case is much simpler: We just have a read only log that nobody else will be updating. */ + ulonglong old_pos; bool hot_log; if ((hot_log = (cur_log != &rli->cache_buf))) { @@ -5846,7 +6082,8 @@ static Log_event* next_event(Relay_log_info* rli) llstr(my_b_tell(cur_log),llbuf1), llstr(rli->event_relay_log_pos,llbuf2))); DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE); - DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos); + DBUG_ASSERT(opt_slave_parallel_threads > 0 || + my_b_tell(cur_log) == rli->event_relay_log_pos); } #endif /* @@ -5861,43 +6098,23 @@ static Log_event* next_event(Relay_log_info* rli) But if the relay log is created by new_file(): then the solution is: MYSQL_BIN_LOG::open() will write the buffered description event. 
*/ + old_pos= rli->event_relay_log_pos; if ((ev= Log_event::read_log_event(cur_log,0, rli->relay_log.description_event_for_exec, opt_slave_sql_verify_checksum))) { - DBUG_ASSERT(thd==rli->sql_thd); /* read it while we have a lock, to avoid a mutex lock in inc_event_relay_log_pos() */ rli->future_event_relay_log_pos= my_b_tell(cur_log); - /* - For GTID, allocate a new sub_id for the given domain_id. - The sub_id must be allocated in increasing order of binlog order. - */ - if (ev->get_type_code() == GTID_EVENT) - { - Gtid_log_event *gev= static_cast<Gtid_log_event *>(ev); - uint64 sub_id= rpl_global_gtid_slave_state.next_sub_id(gev->domain_id); - if (!sub_id) - { - errmsg = "slave SQL thread aborted because of out-of-memory error"; - if (hot_log) - mysql_mutex_unlock(log_lock); - goto err; - } - rli->gtid_sub_id= sub_id; - rli->current_gtid.server_id= gev->server_id; - rli->current_gtid.domain_id= gev->domain_id; - rli->current_gtid.seq_no= gev->seq_no; - } + *event_size= rli->future_event_relay_log_pos - old_pos; if (hot_log) mysql_mutex_unlock(log_lock); DBUG_RETURN(ev); } - DBUG_ASSERT(thd==rli->sql_thd); if (opt_reckless_slave) // For mysql-test cur_log->error = 0; if (cur_log->error < 0) @@ -5962,6 +6179,25 @@ static Log_event* next_event(Relay_log_info* rli) DBUG_RETURN(ev); } + if (rli->ign_gtids.count()) + { + /* We generate and return a Gtid_list, to update gtid_slave_pos. */ + DBUG_PRINT("info",("seeing ignored end gtids")); + ev= new Gtid_list_log_event(&rli->ign_gtids, + Gtid_list_log_event::FLAG_IGN_GTIDS); + rli->ign_gtids.reset(); + mysql_mutex_unlock(log_lock); + if (unlikely(!ev)) + { + errmsg= "Slave SQL thread failed to create a Gtid_list event " + "(out of memory?), gtid_slave_pos may be inaccurate"; + goto err; + } + ev->server_id= 0; // don't be ignored by slave SQL thread + ev->set_artificial_event(); // Don't mess up Exec_Master_Log_Pos + DBUG_RETURN(ev); + } + /* We can, and should release data_lock while we are waiting for update. 
If we do not, show slave status will block @@ -5985,14 +6221,15 @@ static Log_event* next_event(Relay_log_info* rli) and reads one more event and starts honoring log_space_limit again. If the SQL thread needs more events to be able to rotate the log (it - might need to finish the current group first), then it can ask for one - more at a time. Thus we don't outgrow the relay log indefinitely, + might need to finish the current group first), then it can ask for + one more at a time. Thus we don't outgrow the relay log indefinitely, but rather in a controlled manner, until the next rotate. When the SQL thread starts it sets ignore_log_space_limit to false. We should also reset ignore_log_space_limit to 0 when the user does - RESET SLAVE, but in fact, no need as RESET SLAVE requires that the slave - be stopped, and the SQL thread sets ignore_log_space_limit to 0 when + RESET SLAVE, but in fact, no need as RESET SLAVE requires that the + slave be stopped, and the SQL thread sets ignore_log_space_limit + to 0 when it stops. */ mysql_mutex_lock(&rli->log_space_lock); @@ -6030,7 +6267,7 @@ static Log_event* next_event(Relay_log_info* rli) mysql_mutex_unlock(&rli->log_space_lock); mysql_cond_broadcast(&rli->log_space_cond); // Note that wait_for_update_relay_log unlocks lock_log ! - rli->relay_log.wait_for_update_relay_log(rli->sql_thd); + rli->relay_log.wait_for_update_relay_log(rli->sql_driver_thd); // re-acquire data lock since we released it earlier mysql_mutex_lock(&rli->data_lock); rli->last_master_timestamp= save_timestamp; @@ -6363,10 +6600,10 @@ bool rpl_master_has_bug(const Relay_log_info *rli, uint bug_id, bool report, */ bool rpl_master_erroneous_autoinc(THD *thd) { - if (thd->rli_slave) + if (thd->rgi_slave) { DBUG_EXECUTE_IF("simulate_bug33029", return TRUE;); - return rpl_master_has_bug(thd->rli_slave, 33029, FALSE, NULL, NULL); + return rpl_master_has_bug(thd->rgi_slave->rli, 33029, FALSE, NULL, NULL); } return FALSE; } |