diff options
Diffstat (limited to 'sql/rpl_rli.cc')
-rw-r--r-- | sql/rpl_rli.cc | 1081 |
1 files changed, 883 insertions, 198 deletions
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 52e60c237dc..9e20775a1aa 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -11,9 +11,10 @@ GNU General Public License for more details. You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + along with this program; if not, write to the Free Software Foundation, + 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */ +#include <my_global.h> #include "sql_priv.h" #include "unireg.h" // HAVE_* #include "rpl_mi.h" @@ -32,6 +33,15 @@ static int count_relay_log_space(Relay_log_info* rli); +/** + Current replication state (hash of last GTID executed, per replication + domain). +*/ +rpl_slave_state *rpl_global_gtid_slave_state; +/* Object used for MASTER_GTID_WAIT(). */ +gtid_waiting rpl_global_gtid_waiting; + + // Defined in slave.cc int init_intvar_from_file(int* var, IO_CACHE* f, int default_val); int init_strvar_from_file(char *var, int max_size, IO_CACHE *f, @@ -42,24 +52,24 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) no_storage(FALSE), replicate_same_server_id(::replicate_same_server_id), info_fd(-1), cur_log_fd(-1), relay_log(&sync_relaylog_period), sync_counter(0), is_relay_log_recovery(is_slave_recovery), - save_temporary_tables(0), cur_log_old_open_count(0), - error_on_rli_init_info(false), group_relay_log_pos(0), - event_relay_log_pos(0), + save_temporary_tables(0), mi(0), + inuse_relaylog_list(0), last_inuse_relaylog(0), + cur_log_old_open_count(0), error_on_rli_init_info(false), + group_relay_log_pos(0), event_relay_log_pos(0), #if HAVE_valgrind is_fake(FALSE), #endif group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0), - last_master_timestamp(0), slave_skip_counter(0), - abort_pos_wait(0), slave_run_id(0), sql_thd(0), - inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE), - until_log_pos(0), retried_trans(0), - tables_to_lock(0), tables_to_lock_count(0), - last_event_start_time(0), deferred_events(NULL),m_flags(0), - row_stmt_start_timestamp(0), long_find_row_note_printed(false), - m_annotate_event(0) + last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0), + abort_pos_wait(0), slave_run_id(0), sql_driver_thd(), + gtid_skip_flag(GTID_SKIP_NOT), inited(0), abort_slave(0), stop_for_until(0), + slave_running(MYSQL_SLAVE_NOT_RUN), until_condition(UNTIL_NONE), + until_log_pos(0), retried_trans(0), executed_entries(0), + m_flags(0) { DBUG_ENTER("Relay_log_info::Relay_log_info"); + relay_log.is_relay_log= TRUE; #ifdef HAVE_PSI_INTERFACE relay_log.set_psi_keys(key_RELAYLOG_LOCK_index, key_RELAYLOG_update_cond, @@ -71,20 +81,18 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) group_relay_log_name[0]= event_relay_log_name[0]= group_master_log_name[0]= 0; until_log_name[0]= ign_master_log_name_end[0]= 0; + max_relay_log_size= global_system_variables.max_relay_log_size; bzero((char*) &info_file, sizeof(info_file)); bzero((char*) &cache_buf, sizeof(cache_buf)); - cached_charset_invalidate(); mysql_mutex_init(key_relay_log_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_relay_log_info_data_lock, &data_lock, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_relay_log_info_log_space_lock, &log_space_lock, MY_MUTEX_INIT_FAST); - mysql_mutex_init(key_relay_log_info_sleep_lock, &sleep_lock, MY_MUTEX_INIT_FAST); mysql_cond_init(key_relay_log_info_data_cond, &data_cond, NULL); mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL); mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL); mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL); - mysql_cond_init(key_relay_log_info_sleep_cond, &sleep_cond, NULL); relay_log.init_pthread_objects(); DBUG_VOID_RETURN; } @@ -94,17 +102,15 @@ Relay_log_info::~Relay_log_info() { DBUG_ENTER("Relay_log_info::~Relay_log_info"); + reset_inuse_relaylog(); mysql_mutex_destroy(&run_lock); mysql_mutex_destroy(&data_lock); mysql_mutex_destroy(&log_space_lock); - mysql_mutex_destroy(&sleep_lock); mysql_cond_destroy(&data_cond); mysql_cond_destroy(&start_cond); mysql_cond_destroy(&stop_cond); mysql_cond_destroy(&log_space_cond); - mysql_cond_destroy(&sleep_cond); relay_log.cleanup(); - free_annotate_event(); DBUG_VOID_RETURN; } @@ -131,8 +137,6 @@ int init_relay_log_info(Relay_log_info* rli, rli->abort_pos_wait=0; rli->log_space_limit= relay_log_space_limit; rli->log_space_total= 0; - rli->tables_to_lock= 0; - rli->tables_to_lock_count= 0; char pattern[FN_REFLEN]; (void) my_realpath(pattern, slave_load_tmpdir, 0); @@ -153,15 +157,6 @@ int init_relay_log_info(Relay_log_info* rli, event, in flush_master_info(mi, 1, ?). */ - /* - For the maximum log size, we choose max_relay_log_size if it is - non-zero, max_binlog_size otherwise. If later the user does SET - GLOBAL on one of these variables, fix_max_binlog_size and - fix_max_relay_log_size will reconsider the choice (for example - if the user changes max_relay_log_size to zero, we have to - switch to using max_binlog_size for the relay log) and update - rli->relay_log.max_size (and mysql_bin_log.max_size). - */ { /* Reports an error and returns, if the --relay-log's path is a directory.*/ @@ -211,21 +206,42 @@ a file name for --relay-log-index option", opt_relaylog_index_name); name_warning_sent= 1; } - rli->relay_log.is_relay_log= TRUE; + /* For multimaster, add connection name to relay log filenames */ + Master_info* mi= rli->mi; + char buf_relay_logname[FN_REFLEN], buf_relaylog_index_name_buff[FN_REFLEN]; + char *buf_relaylog_index_name= opt_relaylog_index_name; + mysql_mutex_t *log_lock; + + create_logfile_name_with_suffix(buf_relay_logname, + sizeof(buf_relay_logname), + ln, 1, &mi->cmp_connection_name); + ln= buf_relay_logname; + + if (opt_relaylog_index_name) + { + buf_relaylog_index_name= buf_relaylog_index_name_buff; + create_logfile_name_with_suffix(buf_relaylog_index_name_buff, + sizeof(buf_relaylog_index_name_buff), + opt_relaylog_index_name, 0, + &mi->cmp_connection_name); + } /* note, that if open() fails, we'll still have index file open but a destructor will take care of that */ - if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln, TRUE) || - rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND, 0, - (max_relay_log_size ? max_relay_log_size : - max_binlog_size), 1, TRUE)) + log_lock= rli->relay_log.get_log_lock(); + mysql_mutex_lock(log_lock); + if (rli->relay_log.open_index_file(buf_relaylog_index_name, ln, TRUE) || + rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND, + mi->rli.max_relay_log_size, 1, TRUE)) { + mysql_mutex_unlock(log_lock); mysql_mutex_unlock(&rli->data_lock); - sql_print_error("Failed in open_log() called from init_relay_log_info()"); + sql_print_error("Failed when trying to open logs for '%s' in init_relay_log_info(). Error: %M", ln, my_errno); DBUG_RETURN(1); } + mysql_mutex_unlock(log_lock); } /* if file does not exist */ @@ -242,7 +258,7 @@ a file name for --relay-log-index option", opt_relaylog_index_name); { sql_print_error("Failed to create a new relay log info file (\ file '%s', errno %d)", fname, my_errno); - msg= current_thd->stmt_da->message(); + msg= current_thd->get_stmt_da()->message(); goto err; } if (init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0, @@ -250,7 +266,7 @@ file '%s', errno %d)", fname, my_errno); { sql_print_error("Failed to create a cache on relay log info file '%s'", fname); - msg= current_thd->stmt_da->message(); + msg= current_thd->get_stmt_da()->message(); goto err; } @@ -299,20 +315,80 @@ Failed to open the existing relay log info file '%s' (errno %d)", } rli->info_fd = info_fd; - int relay_log_pos, master_log_pos; + int relay_log_pos, master_log_pos, lines; + char *first_non_digit; + /* + In MySQL 5.6, there is a MASTER_DELAY option to CHANGE MASTER. This is + not yet merged into MariaDB (as of 10.0.13). However, we detect the + presense of the new option in relay-log.info, as a placeholder for + possible later merge of the feature, and to maintain file format + compatibility with MySQL 5.6+. + */ + int dummy_sql_delay; + + /* + Starting from MySQL 5.6.x, relay-log.info has a new format. + Now, its first line contains the number of lines in the file. + By reading this number we can determine which version our master.info + comes from. We can't simply count the lines in the file, since + versions before 5.6.x could generate files with more lines than + needed. If first line doesn't contain a number, or if it + contains a number less than LINES_IN_RELAY_LOG_INFO_WITH_DELAY, + then the file is treated like a file from pre-5.6.x version. + There is no ambiguity when reading an old master.info: before + 5.6.x, the first line contained the binlog's name, which is + either empty or has an extension (contains a '.'), so can't be + confused with an integer. + + So we're just reading first line and trying to figure which + version is this. + */ + + /* + The first row is temporarily stored in mi->master_log_name, if + it is line count and not binlog name (new format) it will be + overwritten by the second row later. + */ if (init_strvar_from_file(rli->group_relay_log_name, sizeof(rli->group_relay_log_name), + &rli->info_file, "")) + { + msg="Error reading slave log configuration"; + goto err; + } + + lines= strtoul(rli->group_relay_log_name, &first_non_digit, 10); + + if (rli->group_relay_log_name[0] != '\0' && + *first_non_digit == '\0' && + lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY) + { + DBUG_PRINT("info", ("relay_log_info file is in new format.")); + /* Seems to be new format => read relay log name from next line */ + if (init_strvar_from_file(rli->group_relay_log_name, + sizeof(rli->group_relay_log_name), + &rli->info_file, "")) + { + msg="Error reading slave log configuration"; + goto err; + } + } + else + DBUG_PRINT("info", ("relay_log_info file is in old format.")); + + if (init_intvar_from_file(&relay_log_pos, + &rli->info_file, BIN_LOG_HEADER_SIZE) || + init_strvar_from_file(rli->group_master_log_name, + sizeof(rli->group_master_log_name), &rli->info_file, "") || - init_intvar_from_file(&relay_log_pos, - &rli->info_file, BIN_LOG_HEADER_SIZE) || - init_strvar_from_file(rli->group_master_log_name, - sizeof(rli->group_master_log_name), - &rli->info_file, "") || - init_intvar_from_file(&master_log_pos, &rli->info_file, 0)) + init_intvar_from_file(&master_log_pos, &rli->info_file, 0) || + (lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY && + init_intvar_from_file(&dummy_sql_delay, &rli->info_file, 0))) { msg="Error reading slave log configuration"; goto err; } + strmake_buf(rli->event_relay_log_name,rli->group_relay_log_name); rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos; rli->group_master_log_pos= master_log_pos; @@ -320,6 +396,7 @@ Failed to open the existing relay log info file '%s' (errno %d)", if (rli->is_relay_log_recovery && init_recovery(rli->mi, &msg)) goto err; + rli->relay_log_state.load(rpl_global_gtid_slave_state); if (init_relay_log_pos(rli, rli->group_relay_log_name, rli->group_relay_log_pos, @@ -363,7 +440,7 @@ Failed to open the existing relay log info file '%s' (errno %d)", rli->inited= 1; rli->error_on_rli_init_info= false; mysql_mutex_unlock(&rli->data_lock); - DBUG_RETURN(error); + DBUG_RETURN(0); err: rli->error_on_rli_init_info= true; @@ -444,6 +521,90 @@ void Relay_log_info::clear_until_condition() /* + Read the correct format description event for starting to replicate from + a given position in a relay log file. +*/ +Format_description_log_event * +read_relay_log_description_event(IO_CACHE *cur_log, ulonglong start_pos, + const char **errmsg) +{ + Log_event *ev; + Format_description_log_event *fdev; + bool found= false; + + /* + By default the relay log is in binlog format 3 (4.0). + Even if format is 4, this will work enough to read the first event + (Format_desc) (remember that format 4 is just lenghtened compared to format + 3; format 3 is a prefix of format 4). + */ + fdev= new Format_description_log_event(3); + + while (!found) + { + Log_event_type typ; + + /* + Read the possible Format_description_log_event; if position + was 4, no need, it will be read naturally. + */ + DBUG_PRINT("info",("looking for a Format_description_log_event")); + + if (my_b_tell(cur_log) >= start_pos) + break; + + if (!(ev= Log_event::read_log_event(cur_log, 0, fdev, + opt_slave_sql_verify_checksum))) + { + DBUG_PRINT("info",("could not read event, cur_log->error=%d", + cur_log->error)); + if (cur_log->error) /* not EOF */ + { + *errmsg= "I/O error reading event at position 4"; + delete fdev; + return NULL; + } + break; + } + typ= ev->get_type_code(); + if (typ == FORMAT_DESCRIPTION_EVENT) + { + DBUG_PRINT("info",("found Format_description_log_event")); + delete fdev; + fdev= (Format_description_log_event*) ev; + /* + As ev was returned by read_log_event, it has passed is_valid(), so + my_malloc() in ctor worked, no need to check again. + */ + /* + Ok, we found a Format_description event. But it is not sure that this + describes the whole relay log; indeed, one can have this sequence + (starting from position 4): + Format_desc (of slave) + Rotate (of master) + Format_desc (of master) + So the Format_desc which really describes the rest of the relay log + is the 3rd event (it can't be further than that, because we rotate + the relay log when we queue a Rotate event from the master). + But what describes the Rotate is the first Format_desc. + So what we do is: + go on searching for Format_description events, until you exceed the + position (argument 'pos') or until you find another event than Rotate + or Format_desc. + */ + } + else + { + DBUG_PRINT("info",("found event of another type=%d", typ)); + found= (typ != ROTATE_EVENT); + delete ev; + } + } + return fdev; +} + + +/* Open the given relay log SYNOPSIS @@ -518,6 +679,8 @@ int init_relay_log_pos(Relay_log_info* rli,const char* log, } rli->group_relay_log_pos = rli->event_relay_log_pos = pos; + rli->clear_flag(Relay_log_info::IN_STMT); + rli->clear_flag(Relay_log_info::IN_TRANSACTION); /* Test to see if the previous run was with the skip of purging @@ -564,68 +727,13 @@ int init_relay_log_pos(Relay_log_info* rli,const char* log, */ if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */ { - Log_event* ev; - while (look_for_description_event) + if (look_for_description_event) { - /* - Read the possible Format_description_log_event; if position - was 4, no need, it will be read naturally. - */ - DBUG_PRINT("info",("looking for a Format_description_log_event")); - - if (my_b_tell(rli->cur_log) >= pos) - break; - - /* - Because of we have rli->data_lock and log_lock, we can safely read an - event - */ - if (!(ev= Log_event::read_log_event(rli->cur_log, 0, - rli->relay_log.description_event_for_exec, - opt_slave_sql_verify_checksum))) - { - DBUG_PRINT("info",("could not read event, rli->cur_log->error=%d", - rli->cur_log->error)); - if (rli->cur_log->error) /* not EOF */ - { - *errmsg= "I/O error reading event at position 4"; - goto err; - } - break; - } - else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT) - { - DBUG_PRINT("info",("found Format_description_log_event")); - delete rli->relay_log.description_event_for_exec; - rli->relay_log.description_event_for_exec= (Format_description_log_event*) ev; - /* - As ev was returned by read_log_event, it has passed is_valid(), so - my_malloc() in ctor worked, no need to check again. - */ - /* - Ok, we found a Format_description event. But it is not sure that this - describes the whole relay log; indeed, one can have this sequence - (starting from position 4): - Format_desc (of slave) - Rotate (of master) - Format_desc (of master) - So the Format_desc which really describes the rest of the relay log - is the 3rd event (it can't be further than that, because we rotate - the relay log when we queue a Rotate event from the master). - But what describes the Rotate is the first Format_desc. - So what we do is: - go on searching for Format_description events, until you exceed the - position (argument 'pos') or until you find another event than Rotate - or Format_desc. - */ - } - else - { - DBUG_PRINT("info",("found event of another type=%d", - ev->get_type_code())); - look_for_description_event= (ev->get_type_code() == ROTATE_EVENT); - delete ev; - } + Format_description_log_event *fdev; + if (!(fdev= read_relay_log_description_event(rli->cur_log, pos, errmsg))) + goto err; + delete rli->relay_log.description_event_for_exec; + rli->relay_log.description_event_for_exec= fdev; } my_b_seek(rli->cur_log,(off_t)pos); #ifndef DBUG_OFF @@ -692,7 +800,7 @@ int Relay_log_info::wait_for_pos(THD* thd, String* log_name, ulong init_abort_pos_wait; int error=0; struct timespec abstime; // for timeout checking - const char *msg; + PSI_stage_info old_stage; DBUG_ENTER("Relay_log_info::wait_for_pos"); if (!inited) @@ -703,9 +811,9 @@ int Relay_log_info::wait_for_pos(THD* thd, String* log_name, set_timespec(abstime,timeout); mysql_mutex_lock(&data_lock); - msg= thd->enter_cond(&data_cond, &data_lock, - "Waiting for the slave SQL thread to " - "advance position"); + thd->ENTER_COND(&data_cond, &data_lock, + &stage_waiting_for_the_slave_thread_to_advance_position, + &old_stage); /* This function will abort when it notices that some CHANGE MASTER or RESET MASTER has changed the master info. @@ -730,7 +838,7 @@ int Relay_log_info::wait_for_pos(THD* thd, String* log_name, ulong log_name_extension; char log_name_tmp[FN_REFLEN]; //make a char[] from String - strmake(log_name_tmp, log_name->ptr(), min(log_name->length(), FN_REFLEN-1)); + strmake(log_name_tmp, log_name->ptr(), MY_MIN(log_name->length(), FN_REFLEN-1)); char *p= fn_ext(log_name_tmp); char *p_end; @@ -740,7 +848,7 @@ int Relay_log_info::wait_for_pos(THD* thd, String* log_name, goto err; } // Convert 0-3 to 4 - log_pos= max(log_pos, BIN_LOG_HEADER_SIZE); + log_pos= MY_MAX(log_pos, BIN_LOG_HEADER_SIZE); /* p points to '.' */ log_name_extension= strtoul(++p, &p_end, 10); /* @@ -849,7 +957,7 @@ int Relay_log_info::wait_for_pos(THD* thd, String* log_name, } err: - thd->exit_cond(msg); + thd->EXIT_COND(&old_stage); DBUG_PRINT("exit",("killed: %d abort: %d slave_running: %d \ improper_arguments: %d timed_out: %d", thd->killed_errno(), @@ -867,17 +975,65 @@ improper_arguments: %d timed_out: %d", void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos, - bool skip_lock) + rpl_group_info *rgi, + bool skip_lock) { DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos"); if (!skip_lock) mysql_mutex_lock(&data_lock); - inc_event_relay_log_pos(); - group_relay_log_pos= event_relay_log_pos; - strmake_buf(group_relay_log_name,event_relay_log_name); + rgi->inc_event_relay_log_pos(); + DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu", + (long) log_pos, (long) group_master_log_pos)); + if (rgi->is_parallel_exec) + { + /* In case of parallel replication, do not update the position backwards. */ + int cmp= strcmp(group_relay_log_name, rgi->event_relay_log_name); + if (cmp < 0) + { + group_relay_log_pos= rgi->future_event_relay_log_pos; + strmake_buf(group_relay_log_name, rgi->event_relay_log_name); + notify_group_relay_log_name_update(); + } else if (cmp == 0 && group_relay_log_pos < rgi->future_event_relay_log_pos) + group_relay_log_pos= rgi->future_event_relay_log_pos; + + /* + In the parallel case we need to update the master_log_name here, rather + than in Rotate_log_event::do_update_pos(). + */ + cmp= strcmp(group_master_log_name, rgi->future_event_master_log_name); + if (cmp <= 0) + { + if (cmp < 0) + { + strcpy(group_master_log_name, rgi->future_event_master_log_name); + group_master_log_pos= log_pos; + } + else if (group_master_log_pos < log_pos) + group_master_log_pos= log_pos; + } - notify_group_relay_log_name_update(); + /* + In the parallel case, we only update the Seconds_Behind_Master at the + end of a transaction. In the non-parallel case, the value is updated as + soon as an event is read from the relay log; however this would be too + confusing for the user, seeing the slave reported as up-to-date when + potentially thousands of events are still queued up for worker threads + waiting for execution. + */ + if (rgi->last_master_timestamp && + rgi->last_master_timestamp > last_master_timestamp) + last_master_timestamp= rgi->last_master_timestamp; + } + else + { + /* Non-parallel case. */ + group_relay_log_pos= event_relay_log_pos; + strmake_buf(group_relay_log_name, event_relay_log_name); + notify_group_relay_log_name_update(); + if (log_pos) // 3.23 binlogs don't have log_posx + group_master_log_pos= log_pos; + } /* If the slave does not support transactions and replicates a transaction, @@ -909,12 +1065,6 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos, the relay log is not "val". With the end_log_pos solution, we avoid computations involving lengthes. */ - DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu", - (long) log_pos, (long) group_master_log_pos)); - if (log_pos) // 3.23 binlogs don't have log_posx - { - group_master_log_pos= log_pos; - } mysql_cond_broadcast(&data_cond); if (!skip_lock) mysql_mutex_unlock(&data_lock); @@ -930,6 +1080,9 @@ void Relay_log_info::close_temporary_tables() for (table=save_temporary_tables ; table ; table=next) { next=table->next; + + /* Reset in_use as the table may have been created by another thd */ + table->in_use=0; /* Don't ask for disk deletion. For now, anyway they will be deleted when slave restarts, but it is a better intention to not delete them. @@ -992,8 +1145,8 @@ int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset, "log index file:%s.", rli->relay_log.get_index_fname()); DBUG_RETURN(1); } - if (rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND, 0, - (max_relay_log_size ? max_relay_log_size : + if (rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND, + (rli->max_relay_log_size ? rli->max_relay_log_size : max_binlog_size), 1, TRUE)) { sql_print_error("Unable to purge relay log files. Failed to open relay " @@ -1025,26 +1178,36 @@ int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset, rli->cur_log_fd= -1; } - if (rli->relay_log.reset_logs(thd)) + if (rli->relay_log.reset_logs(thd, !just_reset, NULL, 0)) { *errmsg = "Failed during log reset"; error=1; goto err; } - /* Save name of used relay log file */ - strmake_buf(rli->group_relay_log_name, rli->relay_log.get_log_fname()); - strmake_buf(rli->event_relay_log_name, rli->relay_log.get_log_fname()); - rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE; - if (count_relay_log_space(rli)) - { - *errmsg= "Error counting relay log space"; - error=1; - goto err; - } + rli->relay_log_state.load(rpl_global_gtid_slave_state); if (!just_reset) + { + /* Save name of used relay log file */ + strmake_buf(rli->group_relay_log_name, rli->relay_log.get_log_fname()); + strmake_buf(rli->event_relay_log_name, rli->relay_log.get_log_fname()); + rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE; + rli->log_space_total= 0; + + if (count_relay_log_space(rli)) + { + *errmsg= "Error counting relay log space"; + error=1; + goto err; + } error= init_relay_log_pos(rli, rli->group_relay_log_name, rli->group_relay_log_pos, 0 /* do not need data lock */, errmsg, 0); + } + else + { + /* Ensure relay log names are not used */ + rli->group_relay_log_name[0]= rli->event_relay_log_name[0]= 0; + } if (!rli->inited && rli->error_on_rli_init_info) rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT); @@ -1097,16 +1260,19 @@ bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev) ulonglong log_pos; DBUG_ENTER("Relay_log_info::is_until_satisfied"); - DBUG_ASSERT(until_condition != UNTIL_NONE); + DBUG_ASSERT(until_condition == UNTIL_MASTER_POS || + until_condition == UNTIL_RELAY_POS); if (until_condition == UNTIL_MASTER_POS) { - if (ev && ev->server_id == (uint32) ::server_id && !replicate_same_server_id) + if (ev && ev->server_id == (uint32) global_system_variables.server_id && + !replicate_same_server_id) DBUG_RETURN(FALSE); - log_name= group_master_log_name; - log_pos= (!ev)? group_master_log_pos : - ((thd->variables.option_bits & OPTION_BEGIN || !ev->log_pos) ? - group_master_log_pos : ev->log_pos - ev->data_written); + log_name= (mi->using_parallel() ? + future_event_master_log_name : group_master_log_name); + log_pos= ((!ev)? group_master_log_pos : + (get_flag(IN_TRANSACTION) || !ev->log_pos) ? + group_master_log_pos : ev->log_pos - ev->data_written); } else { /* until_condition == UNTIL_RELAY_POS */ @@ -1175,43 +1341,22 @@ bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev) } -void Relay_log_info::cached_charset_invalidate() -{ - DBUG_ENTER("Relay_log_info::cached_charset_invalidate"); - - /* Full of zeroes means uninitialized. */ - bzero(cached_charset, sizeof(cached_charset)); - DBUG_VOID_RETURN; -} - - -bool Relay_log_info::cached_charset_compare(char *charset) const -{ - DBUG_ENTER("Relay_log_info::cached_charset_compare"); - - if (memcmp(cached_charset, charset, sizeof(cached_charset))) - { - memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset)); - DBUG_RETURN(1); - } - DBUG_RETURN(0); -} - - -void Relay_log_info::stmt_done(my_off_t event_master_log_pos, - time_t event_creation_time) +bool Relay_log_info::stmt_done(my_off_t event_master_log_pos, THD *thd, + rpl_group_info *rgi) { -#ifndef DBUG_OFF - extern uint debug_not_change_ts_if_art_event; -#endif - clear_flag(IN_STMT); + int error= 0; + DBUG_ENTER("Relay_log_info::stmt_done"); + DBUG_ASSERT(rgi->rli == this); /* If in a transaction, and if the slave supports transactions, just inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN (not OPTION_NOT_AUTOCOMMIT) as transactions are logged with BEGIN/COMMIT, not with SET AUTOCOMMIT= . + We can't use rgi->rli->get_flag(IN_TRANSACTION) here as OPTION_BEGIN + is also used for single row transactions. + CAUTION: opt_using_transactions means innodb || bdb ; suppose the master supports InnoDB and BDB, but the slave supports only BDB, problems will arise: - suppose an InnoDB table is created on the @@ -1229,32 +1374,416 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos, middle of the "transaction". START SLAVE will resume at BEGIN while the MyISAM table has already been updated. */ - if ((sql_thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions) - inc_event_relay_log_pos(); + if ((rgi->thd->variables.option_bits & OPTION_BEGIN) && + opt_using_transactions) + rgi->inc_event_relay_log_pos(); else { - inc_group_relay_log_pos(event_master_log_pos); - flush_relay_log_info(this); - /* - Note that Rotate_log_event::do_apply_event() does not call this - function, so there is no chance that a fake rotate event resets - last_master_timestamp. Note that we update without mutex - (probably ok - except in some very rare cases, only consequence - is that value may take some time to display in - Seconds_Behind_Master - not critical). - */ - if (!(event_creation_time == 0 && - IF_DBUG(debug_not_change_ts_if_art_event > 0, 1))) - last_master_timestamp= event_creation_time; + inc_group_relay_log_pos(event_master_log_pos, rgi); + if (rpl_global_gtid_slave_state->record_and_update_gtid(thd, rgi)) + { + report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, rgi->gtid_info(), + "Failed to update GTID state in %s.%s, slave state may become " + "inconsistent: %d: %s", + "mysql", rpl_gtid_slave_state_table_name.str, + thd->get_stmt_da()->sql_errno(), thd->get_stmt_da()->message()); + /* + At this point we are not in a transaction (for example after DDL), + so we can not roll back. Anyway, normally updates to the slave + state table should not fail, and if they do, at least we made the + DBA aware of the problem in the error log. + */ + } + DBUG_EXECUTE_IF("inject_crash_before_flush_rli", DBUG_SUICIDE();); + if (mi->using_gtid == Master_info::USE_GTID_NO) + if (flush_relay_log_info(this)) + error= 1; + DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE();); + } + DBUG_RETURN(error); +} + + +int +Relay_log_info::alloc_inuse_relaylog(const char *name) +{ + inuse_relaylog *ir; + uint32 gtid_count; + rpl_gtid *gtid_list; + + if (!(ir= (inuse_relaylog *)my_malloc(sizeof(*ir), MYF(MY_WME|MY_ZEROFILL)))) + { + my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir)); + return 1; + } + gtid_count= relay_log_state.count(); + if (!(gtid_list= (rpl_gtid *)my_malloc(sizeof(*gtid_list)*gtid_count, + MYF(MY_WME)))) + { + my_free(ir); + my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gtid_list)*gtid_count); + return 1; + } + if (relay_log_state.get_gtid_list(gtid_list, gtid_count)) + { + my_free(gtid_list); + my_free(ir); + DBUG_ASSERT(0 /* Should not be possible as we allocated correct length */); + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return 1; + } + ir->rli= this; + strmake_buf(ir->name, name); + ir->relay_log_state= gtid_list; + ir->relay_log_state_count= gtid_count; + + if (!inuse_relaylog_list) + inuse_relaylog_list= ir; + else + { + last_inuse_relaylog->completed= true; + last_inuse_relaylog->next= ir; + } + last_inuse_relaylog= ir; + my_atomic_rwlock_init(&ir->inuse_relaylog_atomic_lock); + + return 0; +} + + +void +Relay_log_info::free_inuse_relaylog(inuse_relaylog *ir) +{ + my_free(ir->relay_log_state); + my_atomic_rwlock_destroy(&ir->inuse_relaylog_atomic_lock); + my_free(ir); +} + + +void +Relay_log_info::reset_inuse_relaylog() +{ + inuse_relaylog *cur= inuse_relaylog_list; + while (cur) + { + DBUG_ASSERT(cur->queued_count == cur->dequeued_count); + inuse_relaylog *next= cur->next; + free_inuse_relaylog(cur); + cur= next; + } + inuse_relaylog_list= last_inuse_relaylog= NULL; +} + + +int +Relay_log_info::update_relay_log_state(rpl_gtid *gtid_list, uint32 count) +{ + int res= 0; + while (count) + { + if (relay_log_state.update_nolock(gtid_list, false)) + res= 1; + ++gtid_list; + --count; } + return res; } + #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) -void Relay_log_info::cleanup_context(THD *thd, bool error) +int +rpl_load_gtid_slave_state(THD *thd) { - DBUG_ENTER("Relay_log_info::cleanup_context"); + TABLE_LIST tlist; + TABLE *table; + bool table_opened= false; + bool table_scanned= false; + bool array_inited= false; + struct local_element { uint64 sub_id; rpl_gtid gtid; }; + struct local_element tmp_entry, *entry; + HASH hash; + DYNAMIC_ARRAY array; + int err= 0; + uint32 i; + DBUG_ENTER("rpl_load_gtid_slave_state"); + + mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); + bool loaded= rpl_global_gtid_slave_state->loaded; + mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); + if (loaded) + DBUG_RETURN(0); + + my_hash_init(&hash, &my_charset_bin, 32, + offsetof(local_element, gtid) + offsetof(rpl_gtid, domain_id), + sizeof(uint32), NULL, my_free, HASH_UNIQUE); + if ((err= my_init_dynamic_array(&array, sizeof(local_element), 0, 0, MYF(0)))) + goto end; + array_inited= true; + + mysql_reset_thd_for_next_command(thd); + + tlist.init_one_table(STRING_WITH_LEN("mysql"), + rpl_gtid_slave_state_table_name.str, + rpl_gtid_slave_state_table_name.length, + NULL, TL_READ); + if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0))) + goto end; + table_opened= true; + table= tlist.table; + + if ((err= gtid_check_rpl_slave_state_table(table))) + goto end; + + bitmap_set_all(table->read_set); + if ((err= table->file->ha_rnd_init_with_error(1))) + { + table->file->print_error(err, MYF(0)); + goto end; + } + table_scanned= true; + for (;;) + { + uint32 domain_id, server_id; + uint64 sub_id, seq_no; + uchar *rec; + + if ((err= table->file->ha_rnd_next(table->record[0]))) + { + if (err == HA_ERR_RECORD_DELETED) + continue; + else if (err == HA_ERR_END_OF_FILE) + break; + else + { + table->file->print_error(err, MYF(0)); + goto end; + } + } + domain_id= (ulonglong)table->field[0]->val_int(); + sub_id= (ulonglong)table->field[1]->val_int(); + server_id= (ulonglong)table->field[2]->val_int(); + seq_no= (ulonglong)table->field[3]->val_int(); + DBUG_PRINT("info", ("Read slave state row: %u-%u-%lu sub_id=%lu\n", + (unsigned)domain_id, (unsigned)server_id, + (ulong)seq_no, (ulong)sub_id)); + + tmp_entry.sub_id= sub_id; + tmp_entry.gtid.domain_id= domain_id; + tmp_entry.gtid.server_id= server_id; + tmp_entry.gtid.seq_no= seq_no; + if ((err= insert_dynamic(&array, (uchar *)&tmp_entry))) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + goto end; + } + + if ((rec= my_hash_search(&hash, (const uchar *)&domain_id, 0))) + { + entry= (struct local_element *)rec; + if (entry->sub_id >= sub_id) + continue; + entry->sub_id= sub_id; + DBUG_ASSERT(entry->gtid.domain_id == domain_id); + entry->gtid.server_id= server_id; + entry->gtid.seq_no= seq_no; + } + else + { + if (!(entry= (struct local_element *)my_malloc(sizeof(*entry), + MYF(MY_WME)))) + { + my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*entry)); + err= 1; + goto end; + } + entry->sub_id= sub_id; + entry->gtid.domain_id= domain_id; + entry->gtid.server_id= server_id; + entry->gtid.seq_no= seq_no; + if ((err= my_hash_insert(&hash, (uchar *)entry))) + { + my_free(entry); + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + goto end; + } + } + } - DBUG_ASSERT(sql_thd == thd); + mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); + if (rpl_global_gtid_slave_state->loaded) + { + mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); + goto end; + } + + for (i= 0; i < array.elements; ++i) + { + get_dynamic(&array, (uchar *)&tmp_entry, i); + if ((err= rpl_global_gtid_slave_state->update(tmp_entry.gtid.domain_id, + tmp_entry.gtid.server_id, + tmp_entry.sub_id, + tmp_entry.gtid.seq_no, + NULL))) + { + mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + goto end; + } + } + + for (i= 0; i < hash.records; ++i) + { + entry= (struct local_element *)my_hash_element(&hash, i); + if (opt_bin_log && + mysql_bin_log.bump_seq_no_counter_if_needed(entry->gtid.domain_id, + entry->gtid.seq_no)) + { + mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + goto end; + } + } + + rpl_global_gtid_slave_state->loaded= true; + mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); + + err= 0; /* Clear HA_ERR_END_OF_FILE */ + +end: + if (table_scanned) + { + table->file->ha_index_or_rnd_end(); + ha_commit_trans(thd, FALSE); + ha_commit_trans(thd, TRUE); + } + if (table_opened) + { + close_thread_tables(thd); + thd->mdl_context.release_transactional_locks(); + } + if (array_inited) + delete_dynamic(&array); + my_hash_free(&hash); + DBUG_RETURN(err); +} + + +void +rpl_group_info::reinit(Relay_log_info *rli) +{ + this->rli= rli; + tables_to_lock= NULL; + tables_to_lock_count= 0; + trans_retries= 0; + last_event_start_time= 0; + gtid_sub_id= 0; + commit_id= 0; + gtid_pending= false; + worker_error= 0; + row_stmt_start_timestamp= 0; + long_find_row_note_printed= false; + did_mark_start_commit= false; + last_master_timestamp = 0; + gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL; + commit_orderer.reinit(); +} + +rpl_group_info::rpl_group_info(Relay_log_info *rli) + : thd(0), wait_commit_sub_id(0), + wait_commit_group_info(0), parallel_entry(0), + deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false) +{ + reinit(rli); + bzero(¤t_gtid, sizeof(current_gtid)); + mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock, + MY_MUTEX_INIT_FAST); + mysql_cond_init(key_rpl_group_info_sleep_cond, &sleep_cond, NULL); +} + + +rpl_group_info::~rpl_group_info() +{ + free_annotate_event(); + delete deferred_events; + mysql_mutex_destroy(&sleep_lock); + mysql_cond_destroy(&sleep_cond); +} + + +int +event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev) +{ + uint64 sub_id= rpl_global_gtid_slave_state->next_sub_id(gev->domain_id); + if (!sub_id) + { + /* Out of memory caused hash insertion to fail. */ + return 1; + } + rgi->gtid_sub_id= sub_id; + rgi->current_gtid.domain_id= gev->domain_id; + rgi->current_gtid.server_id= gev->server_id; + rgi->current_gtid.seq_no= gev->seq_no; + rgi->commit_id= gev->commit_id; + rgi->gtid_pending= true; + return 0; +} + + +void +delete_or_keep_event_post_apply(rpl_group_info *rgi, + Log_event_type typ, Log_event *ev) +{ + /* + ToDo: This needs to work on rpl_group_info, not Relay_log_info, to be + thread-safe for parallel replication. + */ + + switch (typ) { + case FORMAT_DESCRIPTION_EVENT: + /* + Format_description_log_event should not be deleted because it + will be used to read info about the relay log's format; + it will be deleted when the SQL thread does not need it, + i.e. when this thread terminates. + */ + break; + case ANNOTATE_ROWS_EVENT: + /* + Annotate_rows event should not be deleted because after it has + been applied, thd->query points to the string inside this event. + The thd->query will be used to generate new Annotate_rows event + during applying the subsequent Rows events. + */ + rgi->set_annotate_event((Annotate_rows_log_event*) ev); + break; + case DELETE_ROWS_EVENT_V1: + case UPDATE_ROWS_EVENT_V1: + case WRITE_ROWS_EVENT_V1: + case DELETE_ROWS_EVENT: + case UPDATE_ROWS_EVENT: + case WRITE_ROWS_EVENT: + /* + After the last Rows event has been applied, the saved Annotate_rows + event (if any) is not needed anymore and can be deleted. + */ + if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F)) + rgi->free_annotate_event(); + /* fall through */ + default: + DBUG_PRINT("info", ("Deleting the event after it has been executed")); + if (!rgi->is_deferred_event(ev)) + delete ev; + break; + } +} + + +void rpl_group_info::cleanup_context(THD *thd, bool error) +{ + DBUG_ENTER("rpl_group_info::cleanup_context"); + DBUG_PRINT("enter", ("error: %d", (int) error)); + + DBUG_ASSERT(this->thd == thd); /* 1) Instances of Table_map_log_event, if ::do_apply_event() was called on them, may have opened tables, which we cannot be sure have been closed (because @@ -1270,13 +1799,38 @@ void Relay_log_info::cleanup_context(THD *thd, bool error) if (error) { trans_rollback_stmt(thd); // if a "statement transaction" + /* trans_rollback() also resets OPTION_GTID_BEGIN */ trans_rollback(thd); // if a "real transaction" + /* + Now that we have rolled back the transaction, make sure we do not + erroneously update the GTID position. + */ + gtid_pending= false; } m_table_map.clear_tables(); slave_close_thread_tables(thd); if (error) + { thd->mdl_context.release_transactional_locks(); - clear_flag(IN_STMT); + + if (thd == rli->sql_driver_thd) + { + /* + Reset flags. This is needed to handle incident events and errors in + the relay log noticed by the sql driver thread. + */ + rli->clear_flag(Relay_log_info::IN_STMT); + rli->clear_flag(Relay_log_info::IN_TRANSACTION); + } + + /* + Ensure we always release the domain for others to process, when using + --gtid-ignore-duplicates. + */ + if (gtid_ignore_duplicate_state != GTID_DUPLICATE_NULL) + rpl_global_gtid_slave_state->release_domain_owner(this); + } + /* Cleanup for the flags that have been set at do_apply_event. */ @@ -1291,12 +1845,18 @@ void Relay_log_info::cleanup_context(THD *thd, bool error) reset_row_stmt_start_timestamp(); unset_long_find_row_note_printed(); + DBUG_EXECUTE_IF("inject_sleep_gtid_100_x_x", { + if (current_gtid.domain_id == 100) + my_sleep(50000); + };); + DBUG_VOID_RETURN; } -void Relay_log_info::clear_tables_to_lock() + +void rpl_group_info::clear_tables_to_lock() { - DBUG_ENTER("Relay_log_info::clear_tables_to_lock()"); + DBUG_ENTER("rpl_group_info::clear_tables_to_lock()"); #ifndef DBUG_OFF /** When replicating in RBR and MyISAM Merge tables are involved @@ -1340,12 +1900,13 @@ void Relay_log_info::clear_tables_to_lock() DBUG_VOID_RETURN; } -void Relay_log_info::slave_close_thread_tables(THD *thd) + +void rpl_group_info::slave_close_thread_tables(THD *thd) { - DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)"); - thd->stmt_da->can_overwrite_status= TRUE; + DBUG_ENTER("rpl_group_info::slave_close_thread_tables(THD *thd)"); + thd->get_stmt_da()->set_overwrite_status(true); thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd); - thd->stmt_da->can_overwrite_status= FALSE; + thd->get_stmt_da()->set_overwrite_status(false); close_thread_tables(thd); /* @@ -1373,4 +1934,128 @@ void Relay_log_info::slave_close_thread_tables(THD *thd) clear_tables_to_lock(); DBUG_VOID_RETURN; } + + + +static void +mark_start_commit_inner(rpl_parallel_entry *e, group_commit_orderer *gco, + rpl_group_info *rgi) +{ + group_commit_orderer *tmp; + uint64 count= ++e->count_committing_event_groups; + /* Signal any following GCO whose wait_count has been reached now. */ + tmp= gco; + while ((tmp= tmp->next_gco)) + { + uint64 wait_count= tmp->wait_count; + if (wait_count > count) + break; + mysql_cond_broadcast(&tmp->COND_group_commit_orderer); + } +} + + +void +rpl_group_info::mark_start_commit_no_lock() +{ + if (did_mark_start_commit) + return; + did_mark_start_commit= true; + mark_start_commit_inner(parallel_entry, gco, this); +} + + +void +rpl_group_info::mark_start_commit() +{ + rpl_parallel_entry *e; + + if (did_mark_start_commit) + return; + did_mark_start_commit= true; + + e= this->parallel_entry; + mysql_mutex_lock(&e->LOCK_parallel_entry); + mark_start_commit_inner(e, gco, this); + mysql_mutex_unlock(&e->LOCK_parallel_entry); +} + + +/* + Format the current GTID as a string suitable for printing in error messages. + + The string is stored in a buffer inside rpl_group_info, so remains valid + until next call to gtid_info() or until destruction of rpl_group_info. + + If no GTID is available, then NULL is returned. +*/ +char * +rpl_group_info::gtid_info() +{ + if (!gtid_sub_id || !current_gtid.seq_no) + return NULL; + my_snprintf(gtid_info_buf, sizeof(gtid_info_buf), "Gtid %u-%u-%llu", + current_gtid.domain_id, current_gtid.server_id, + current_gtid.seq_no); + return gtid_info_buf; +} + + +/* + Undo the effect of a prior mark_start_commit(). + + This is only used for retrying a transaction in parallel replication, after + we have encountered a deadlock or other temporary error. + + When we get such a deadlock, it means that the current group of transactions + did not yet all start committing (else they would not have deadlocked). So + we will not yet have woken up anything in the next group, our rgi->gco is + still live, and we can simply decrement the counter (to be incremented again + later, when the retry succeeds and reaches the commit step). +*/ +void +rpl_group_info::unmark_start_commit() +{ + rpl_parallel_entry *e; + + if (!did_mark_start_commit) + return; + did_mark_start_commit= false; + + e= this->parallel_entry; + mysql_mutex_lock(&e->LOCK_parallel_entry); + --e->count_committing_event_groups; + mysql_mutex_unlock(&e->LOCK_parallel_entry); +} + + +rpl_sql_thread_info::rpl_sql_thread_info(Rpl_filter *filter) + : rpl_filter(filter) +{ + cached_charset_invalidate(); +} + + +void rpl_sql_thread_info::cached_charset_invalidate() +{ + DBUG_ENTER("rpl_group_info::cached_charset_invalidate"); + + /* Full of zeroes means uninitialized. */ + bzero(cached_charset, sizeof(cached_charset)); + DBUG_VOID_RETURN; +} + + +bool rpl_sql_thread_info::cached_charset_compare(char *charset) const +{ + DBUG_ENTER("rpl_group_info::cached_charset_compare"); + + if (memcmp(cached_charset, charset, sizeof(cached_charset))) + { + memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset)); + DBUG_RETURN(1); + } + DBUG_RETURN(0); +} + #endif |