diff options
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 2424 |
1 files changed, 1901 insertions, 523 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index a03deb1e793..30d55f8bc2a 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -1,5 +1,5 @@ /* Copyright (c) 2000, 2015, Oracle and/or its affiliates. - Copyright (c) 2008, 2015, SkySQL Ab. + Copyright (c) 2008, 2015, MariaDB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -25,8 +25,8 @@ replication slave. */ +#include <my_global.h> #include "sql_priv.h" -#include "my_global.h" #include "slave.h" #include "sql_parse.h" // execute_init_command #include "sql_table.h" // mysql_rm_table @@ -57,6 +57,8 @@ #include "rpl_tblmap.h" #include "debug_sync.h" +#include "rpl_parallel.h" + #define FLAGSTR(V,F) ((V)&(F)?#F" ":"") @@ -71,8 +73,10 @@ char slave_skip_error_names[SHOW_VAR_FUNC_BUFF_SIZE]; char* slave_load_tmpdir = 0; Master_info *active_mi= 0; +Master_info_index *master_info_index; my_bool replicate_same_server_id; ulonglong relay_log_space_limit = 0; +LEX_STRING default_master_connection_name= { (char*) "", 0 }; /* When slave thread exits, we need to remember the temporary tables so we @@ -112,7 +116,7 @@ static const char *reconnect_messages[SLAVE_RECON_ACT_MAX][SLAVE_RECON_MSG_MAX]= registration on master", "Reconnecting after a failed registration on master", "failed registering on master, reconnecting to try again, \ -log '%s' at position %s", +log '%s' at position %s%s", "COM_REGISTER_SLAVE", "Slave I/O thread killed during or after reconnect" }, @@ -120,7 +124,7 @@ log '%s' at position %s", "Waiting to reconnect after a failed binlog dump request", "Slave I/O thread killed while retrying master dump", "Reconnecting after a failed binlog dump request", - "failed dump request, reconnecting to try again, log '%s' at position %s", + "failed dump request, reconnecting to try again, log '%s' at position %s%s", "COM_BINLOG_DUMP", "Slave I/O thread killed during or after reconnect" }, @@ -129,7 +133,7 @@ log '%s' at position %s", "Slave I/O thread killed while waiting to reconnect after a failed read", "Reconnecting after a failed master event read", "Slave I/O thread: Failed reading log event, reconnecting to retry, \ -log '%s' at position %s", +log '%s' at position %s%s", "", "Slave I/O thread killed during or after a reconnect done to recover from \ failed read" @@ -142,23 +146,20 @@ typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE; static int process_io_rotate(Master_info* mi, Rotate_log_event* rev); static int process_io_create_file(Master_info* mi, Create_file_log_event* cev); static bool wait_for_relay_log_space(Relay_log_info* rli); -static inline bool io_slave_killed(THD* thd,Master_info* mi); -static inline bool sql_slave_killed(THD* thd,Relay_log_info* rli); -static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type); +static bool io_slave_killed(Master_info* mi); +static bool sql_slave_killed(rpl_group_info *rgi); +static int init_slave_thread(THD*, Master_info *, SLAVE_THD_TYPE); static void print_slave_skip_errors(void); static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi); -static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi, - bool suppress_warnings); -static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi, - bool reconnect, bool suppress_warnings); -static Log_event* next_event(Relay_log_info* rli); +static int safe_reconnect(THD*, MYSQL*, Master_info*, bool); +static int connect_to_master(THD*, MYSQL*, Master_info*, bool, bool); +static Log_event* next_event(rpl_group_info* rgi, ulonglong *event_size); static int queue_event(Master_info* mi,const char* buf,ulong event_len); -static int terminate_slave_thread(THD *thd, - mysql_mutex_t *term_lock, - mysql_cond_t *term_cond, - volatile uint *slave_running, - bool skip_lock); -static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info); +static int terminate_slave_thread(THD *, mysql_mutex_t *, mysql_cond_t *, + volatile uint *, bool); +static bool check_io_slave_killed(Master_info *mi, const char *info); +static bool send_show_master_info_header(THD *, bool, size_t); +static bool send_show_master_info_data(THD *, Master_info *, bool, String *); /* Function to set the slave's max_allowed_packet based on the value of slave_max_allowed_packet. @@ -277,6 +278,77 @@ static void init_slave_psi_keys(void) } #endif /* HAVE_PSI_INTERFACE */ + +static bool slave_init_thread_running; + + +pthread_handler_t +handle_slave_init(void *arg __attribute__((unused))) +{ + THD *thd; + + my_thread_init(); + thd= new THD; + thd->thread_stack= (char*) &thd; /* Set approximate stack start */ + mysql_mutex_lock(&LOCK_thread_count); + thd->thread_id= thread_id++; + mysql_mutex_unlock(&LOCK_thread_count); + thd->system_thread = SYSTEM_THREAD_SLAVE_INIT; + thd->store_globals(); + thd->security_ctx->skip_grants(); + thd->set_command(COM_DAEMON); + + thd_proc_info(thd, "Loading slave GTID position from table"); + if (rpl_load_gtid_slave_state(thd)) + sql_print_warning("Failed to load slave replication state from table " + "%s.%s: %u: %s", "mysql", + rpl_gtid_slave_state_table_name.str, + thd->get_stmt_da()->sql_errno(), + thd->get_stmt_da()->message()); + + mysql_mutex_lock(&LOCK_thread_count); + delete thd; + mysql_mutex_unlock(&LOCK_thread_count); + my_thread_end(); + + mysql_mutex_lock(&LOCK_slave_init); + slave_init_thread_running= false; + mysql_cond_broadcast(&COND_slave_init); + mysql_mutex_unlock(&LOCK_slave_init); + + return 0; +} + + +/* + Start the slave init thread. + + This thread is used to load the GTID state from mysql.gtid_slave_pos at + server start; reading from table requires valid THD, which is otherwise not + available during server init. +*/ +static int +run_slave_init_thread() +{ + pthread_t th; + + slave_init_thread_running= true; + if (mysql_thread_create(key_thread_slave_init, &th, &connection_attrib, + handle_slave_init, NULL)) + { + sql_print_error("Failed to create thread while initialising slave"); + return 1; + } + + mysql_mutex_lock(&LOCK_slave_init); + while (slave_init_thread_running) + mysql_cond_wait(&COND_slave_init, &LOCK_slave_init); + mysql_mutex_unlock(&LOCK_slave_init); + + return 0; +} + + /* Initialize slave structures */ int init_slave() @@ -288,21 +360,45 @@ int init_slave() init_slave_psi_keys(); #endif + if (run_slave_init_thread()) + return 1; + + if (global_rpl_thread_pool.init(opt_slave_parallel_threads)) + return 1; + /* This is called when mysqld starts. Before client connections are accepted. However bootstrap may conflict with us if it does START SLAVE. So it's safer to take the lock. */ mysql_mutex_lock(&LOCK_active_mi); - /* - TODO: re-write this to interate through the list of files - for multi-master - */ - active_mi= new Master_info(relay_log_recovery); if (pthread_key_create(&RPL_MASTER_INFO, NULL)) goto err; + master_info_index= new Master_info_index; + if (!master_info_index || master_info_index->init_all_master_info()) + { + sql_print_error("Failed to initialize multi master structures"); + mysql_mutex_unlock(&LOCK_active_mi); + DBUG_RETURN(1); + } + if (!(active_mi= new Master_info(&default_master_connection_name, + relay_log_recovery)) || + active_mi->error()) + { + delete active_mi; + active_mi= 0; + goto err; + } + + if (master_info_index->add_master_info(active_mi, FALSE)) + { + delete active_mi; + active_mi= 0; + goto err; + } + /* If --slave-skip-errors=... was not used, the string value for the system variable has not been set up yet. Do it now. @@ -317,18 +413,11 @@ int init_slave() If master_host is specified, create the master_info file if it doesn't exists. */ - if (!active_mi) - { - sql_print_error("Failed to allocate memory for the master info structure"); - error= 1; - goto err; - } if (init_master_info(active_mi,master_info_file,relay_log_info_file, 1, (SLAVE_IO | SLAVE_SQL))) { sql_print_error("Failed to initialize the master info structure"); - error= 1; goto err; } @@ -344,14 +433,18 @@ int init_slave() SLAVE_IO | SLAVE_SQL)) { sql_print_error("Failed to create slave threads"); - error= 1; goto err; } } -err: +end: mysql_mutex_unlock(&LOCK_active_mi); DBUG_RETURN(error); + +err: + sql_print_error("Failed to allocate memory for the Master Info structure"); + error= 1; + goto end; } /* @@ -388,7 +481,7 @@ int init_recovery(Master_info* mi, const char** errmsg) Relay_log_info *rli= &mi->rli; if (rli->group_master_log_name[0]) { - mi->master_log_pos= max(BIN_LOG_HEADER_SIZE, + mi->master_log_pos= MY_MAX(BIN_LOG_HEADER_SIZE, rli->group_master_log_pos); strmake_buf(mi->master_log_name, rli->group_master_log_name); @@ -403,6 +496,7 @@ int init_recovery(Master_info* mi, const char** errmsg) DBUG_RETURN(0); } + /** Convert slave skip errors bitmap into a printable string. @@ -481,7 +575,7 @@ void init_slave_skip_errors(const char* arg) const char *p; DBUG_ENTER("init_slave_skip_errors"); - if (bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0)) + if (my_bitmap_init(&slave_error_mask,0,MAX_SLAVE_ERROR,0)) { fprintf(stderr, "Badly out of memory, please check your system status\n"); exit(1); @@ -510,14 +604,6 @@ void init_slave_skip_errors(const char* arg) DBUG_VOID_RETURN; } -static void set_thd_in_use_temporary_tables(Relay_log_info *rli) -{ - TABLE *table; - - for (table= rli->save_temporary_tables ; table ; table= table->next) - table->in_use= rli->sql_thd; -} - int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) { DBUG_ENTER("terminate_slave_threads"); @@ -531,8 +617,15 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) if (thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)) { DBUG_PRINT("info",("Terminating SQL thread")); - mi->rli.abort_slave=1; - if ((error=terminate_slave_thread(mi->rli.sql_thd, sql_lock, + if (opt_slave_parallel_threads > 0 && + mi->rli.abort_slave && mi->rli.stop_for_until) + { + mi->rli.stop_for_until= false; + mi->rli.parallel.stop_during_until(); + } + else + mi->rli.abort_slave=1; + if ((error=terminate_slave_thread(mi->rli.sql_driver_thd, sql_lock, &mi->rli.stop_cond, &mi->rli.slave_running, skip_lock)) && @@ -543,7 +636,7 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) DBUG_PRINT("info",("Flushing relay-log info file.")); if (current_thd) - thd_proc_info(current_thd, "Flushing relay-log info file."); + THD_STAGE_INFO(current_thd, stage_flushing_relay_log_info_file); if (flush_relay_log_info(&mi->rli)) DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS); @@ -551,6 +644,10 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS); mysql_mutex_unlock(log_lock); + + if (opt_slave_parallel_threads > 0 && + !master_info_index->any_slave_sql_running()) + rpl_parallel_inactivate_pool(&global_rpl_thread_pool); } if (thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)) { @@ -567,7 +664,7 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) DBUG_PRINT("info",("Flushing relay log and master info file.")); if (current_thd) - thd_proc_info(current_thd, "Flushing relay log and master info files."); + THD_STAGE_INFO(current_thd, stage_flushing_relay_log_and_master_info_repository); if (flush_master_info(mi, TRUE, FALSE)) DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS); @@ -715,7 +812,7 @@ int start_slave_thread( if (start_lock) mysql_mutex_lock(start_lock); - if (!server_id) + if (!global_system_variables.server_id) { if (start_cond) mysql_cond_broadcast(start_cond); @@ -749,8 +846,10 @@ int start_slave_thread( while (start_id == *slave_run_id) { DBUG_PRINT("sleep",("Waiting for slave thread to start")); - const char *old_msg= thd->enter_cond(start_cond, cond_lock, - "Waiting for slave thread to start"); + PSI_stage_info saved_stage= {0, "", 0}; + thd->ENTER_COND(start_cond, cond_lock, + & stage_waiting_for_slave_thread_to_start, + & saved_stage); /* It is not sufficient to test this at loop bottom. We must test it after registering the mutex in enter_cond(). If the kill @@ -760,7 +859,7 @@ int start_slave_thread( */ if (!thd->killed) mysql_cond_wait(start_cond, cond_lock); - thd->exit_cond(old_msg); + thd->EXIT_COND(& saved_stage); mysql_mutex_lock(cond_lock); // re-acquire it as exit_cond() released if (thd->killed) { @@ -792,6 +891,7 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start, mysql_mutex_t *lock_io=0, *lock_sql=0, *lock_cond_io=0, *lock_cond_sql=0; mysql_cond_t* cond_io=0, *cond_sql=0; int error=0; + const char *errmsg; DBUG_ENTER("start_slave_threads"); if (need_slave_mutex) @@ -807,7 +907,43 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start, lock_cond_sql = &mi->rli.run_lock; } - if (thread_mask & SLAVE_IO) + /* + If we are using GTID and both SQL and IO threads are stopped, then get + rid of all relay logs. + + Relay logs are not very useful when using GTID, except as a buffer + between the fetch in the IO thread and the apply in SQL thread. However + while one of the threads is running, they are in use and cannot be + removed. + */ + if (mi->using_gtid != Master_info::USE_GTID_NO && + !mi->slave_running && !mi->rli.slave_running) + { + /* + purge_relay_logs() clears the mi->rli.group_master_log_pos. + So save and restore them, like we do in CHANGE MASTER. + (We are not going to use them for GTID, but it might be worth to + keep them in case connection with GTID fails and user wants to go + back and continue with previous old-style replication coordinates). + */ + mi->master_log_pos = MY_MAX(BIN_LOG_HEADER_SIZE, + mi->rli.group_master_log_pos); + strmake(mi->master_log_name, mi->rli.group_master_log_name, + sizeof(mi->master_log_name)-1); + purge_relay_logs(&mi->rli, NULL, 0, &errmsg); + mi->rli.group_master_log_pos= mi->master_log_pos; + strmake(mi->rli.group_master_log_name, mi->master_log_name, + sizeof(mi->rli.group_master_log_name)-1); + + error= rpl_load_gtid_state(&mi->gtid_current_pos, mi->using_gtid == + Master_info::USE_GTID_CURRENT_POS); + mi->events_queued_since_last_gtid= 0; + mi->gtid_reconnect_event_skip_count= 0; + + mi->rli.restart_gtid_pos.reset(); + } + + if (!error && (thread_mask & SLAVE_IO)) error= start_slave_thread( #ifdef HAVE_PSI_INTERFACE key_thread_slave_io, @@ -818,7 +954,10 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start, mi); if (!error && (thread_mask & SLAVE_SQL)) { - error= start_slave_thread( + if (opt_slave_parallel_threads > 0) + error= rpl_parallel_activate_pool(&global_rpl_thread_pool); + if (!error) + error= start_slave_thread( #ifdef HAVE_PSI_INTERFACE key_thread_slave_sql, #endif @@ -850,49 +989,27 @@ void end_slave() running presently. If a START SLAVE was in progress, the mutex lock below will make us wait until slave threads have started, and START SLAVE returns, then we terminate them here. + + We can also be called by cleanup(), which only happens if some + startup parameter to the server was wrong. */ mysql_mutex_lock(&LOCK_active_mi); - if (active_mi) - { - /* - TODO: replace the line below with - list_walk(&master_list, (list_walk_action)end_slave_on_walk,0); - once multi-master code is ready. - */ - terminate_slave_threads(active_mi,SLAVE_FORCE_ALL); - } + /* This will call terminate_slave_threads() on all connections */ + delete master_info_index; + master_info_index= 0; + active_mi= 0; mysql_mutex_unlock(&LOCK_active_mi); + global_rpl_thread_pool.destroy(); + free_all_rpl_filters(); DBUG_VOID_RETURN; } -/** - Free all resources used by slave threads at time of executing shutdown. - The routine must be called after all possible users of @c active_mi - have left. - - SYNOPSIS - close_active_mi() - -*/ -void close_active_mi() -{ - mysql_mutex_lock(&LOCK_active_mi); - if (active_mi) - { - end_master_info(active_mi); - delete active_mi; - active_mi= 0; - } - mysql_mutex_unlock(&LOCK_active_mi); -} - -static bool io_slave_killed(THD* thd, Master_info* mi) +static bool io_slave_killed(Master_info* mi) { DBUG_ENTER("io_slave_killed"); - DBUG_ASSERT(mi->io_thd == thd); DBUG_ASSERT(mi->slave_running); // tracking buffer overrun - DBUG_RETURN(mi->abort_slave || abort_loop || thd->killed); + DBUG_RETURN(mi->abort_slave || abort_loop || mi->io_thd->killed); } /** @@ -908,26 +1025,36 @@ static bool io_slave_killed(THD* thd, Master_info* mi) @return TRUE the killed status is recognized, FALSE a possible killed status is deferred. */ -static bool sql_slave_killed(THD* thd, Relay_log_info* rli) +static bool sql_slave_killed(rpl_group_info *rgi) { bool ret= FALSE; + Relay_log_info *rli= rgi->rli; + THD *thd= rgi->thd; DBUG_ENTER("sql_slave_killed"); - DBUG_ASSERT(rli->sql_thd == thd); + DBUG_ASSERT(rli->sql_driver_thd == thd); DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun - if (abort_loop || thd->killed || rli->abort_slave) + if (abort_loop || rli->sql_driver_thd->killed || rli->abort_slave) { /* - The transaction should always be binlogged if OPTION_KEEP_LOG is set - (it implies that something can not be rolled back). And such case - should be regarded similarly as modifing a non-transactional table - because retrying of the transaction will lead to an error or inconsistency - as well. - Example: OPTION_KEEP_LOG is set if a temporary table is created or dropped. + The transaction should always be binlogged if OPTION_KEEP_LOG is + set (it implies that something can not be rolled back). And such + case should be regarded similarly as modifing a + non-transactional table because retrying of the transaction will + lead to an error or inconsistency as well. + + Example: OPTION_KEEP_LOG is set if a temporary table is created + or dropped. + + Note that transaction.all.modified_non_trans_table may be 1 + if last statement was a single row transaction without begin/end. + Testing this flag must always be done in connection with + rli->is_in_group(). */ + if ((thd->transaction.all.modified_non_trans_table || - (thd->variables.option_bits & OPTION_KEEP_LOG)) - && rli->is_in_group()) + (thd->variables.option_bits & OPTION_KEEP_LOG)) && + rli->is_in_group()) { char msg_stopped[]= "... Slave SQL Thread stopped with incomplete event group " @@ -937,25 +1064,34 @@ static bool sql_slave_killed(THD* thd, Relay_log_info* rli) "ignores duplicate key, key not found, and similar errors (see " "documentation for details)."; + DBUG_PRINT("info", ("modified_non_trans_table: %d OPTION_BEGIN: %d " + "OPTION_KEEP_LOG: %d is_in_group: %d", + thd->transaction.all.modified_non_trans_table, + MY_TEST(thd->variables.option_bits & OPTION_BEGIN), + MY_TEST(thd->variables.option_bits & OPTION_KEEP_LOG), + rli->is_in_group())); + if (rli->abort_slave) { - DBUG_PRINT("info", ("Request to stop slave SQL Thread received while " - "applying a group that has non-transactional " - "changes; waiting for completion of the group ... ")); + DBUG_PRINT("info", + ("Request to stop slave SQL Thread received while " + "applying a group that has non-transactional " + "changes; waiting for completion of the group ... ")); /* - Slave sql thread shutdown in face of unfinished group modified - Non-trans table is handled via a timer. The slave may eventually - give out to complete the current group and in that case there - might be issues at consequent slave restart, see the error message. - WL#2975 offers a robust solution requiring to store the last exectuted - event's coordinates along with the group's coordianates - instead of waiting with @c last_event_start_time the timer. + Slave sql thread shutdown in face of unfinished group + modified Non-trans table is handled via a timer. The slave + may eventually give out to complete the current group and in + that case there might be issues at consequent slave restart, + see the error message. WL#2975 offers a robust solution + requiring to store the last exectuted event's coordinates + along with the group's coordianates instead of waiting with + @c last_event_start_time the timer. */ - if (rli->last_event_start_time == 0) - rli->last_event_start_time= my_time(0); - ret= difftime(my_time(0), rli->last_event_start_time) <= + if (rgi->last_event_start_time == 0) + rgi->last_event_start_time= my_time(0); + ret= difftime(my_time(0), rgi->last_event_start_time) <= SLAVE_WAIT_GROUP_DONE ? FALSE : TRUE; DBUG_EXECUTE_IF("stop_slave_middle_group", @@ -964,21 +1100,22 @@ static bool sql_slave_killed(THD* thd, Relay_log_info* rli) if (ret == 0) { - rli->report(WARNING_LEVEL, 0, + rli->report(WARNING_LEVEL, 0, rgi->gtid_info(), "Request to stop slave SQL Thread received while " "applying a group that has non-transactional " "changes; waiting for completion of the group ... "); } else { - rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, + rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, rgi->gtid_info(), ER(ER_SLAVE_FATAL_ERROR), msg_stopped); } } else { ret= TRUE; - rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, ER(ER_SLAVE_FATAL_ERROR), + rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, rgi->gtid_info(), + ER(ER_SLAVE_FATAL_ERROR), msg_stopped); } } @@ -988,7 +1125,7 @@ static bool sql_slave_killed(THD* thd, Relay_log_info* rli) } } if (ret) - rli->last_event_start_time= 0; + rgi->last_event_start_time= 0; DBUG_RETURN(ret); } @@ -1242,7 +1379,7 @@ bool is_network_error(uint errorno) static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi) { - char err_buff[MAX_SLAVE_ERRMSG]; + char err_buff[MAX_SLAVE_ERRMSG], err_buff2[MAX_SLAVE_ERRMSG]; const char* errmsg= 0; int err_code= 0; MYSQL_RES *master_res= 0; @@ -1259,23 +1396,28 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi) if (!my_isdigit(&my_charset_bin,*mysql->server_version)) { - errmsg = "Master reported unrecognized MySQL version"; + errmsg= err_buff2; + snprintf(err_buff2, sizeof(err_buff2), + "Master reported unrecognized MySQL version: %s", + mysql->server_version); err_code= ER_SLAVE_FATAL_ERROR; - sprintf(err_buff, ER(err_code), errmsg); + sprintf(err_buff, ER(err_code), err_buff2); } else { /* Note the following switch will bug when we have MySQL branch 30 ;) */ - switch (version) - { + switch (version) { case 0: case 1: case 2: - errmsg = "Master reported unrecognized MySQL version"; + errmsg= err_buff2; + snprintf(err_buff2, sizeof(err_buff2), + "Master reported unrecognized MySQL version: %s", + mysql->server_version); err_code= ER_SLAVE_FATAL_ERROR; - sprintf(err_buff, ER(err_code), errmsg); + sprintf(err_buff, ER(err_code), err_buff2); break; case 3: mi->rli.relay_log.description_event_for_queue= new @@ -1386,11 +1528,11 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi) mi->clock_diff_with_master= (long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10)); } - else if (check_io_slave_killed(mi->io_thd, mi, NULL)) + else if (check_io_slave_killed(mi, NULL)) goto slave_killed_err; else if (is_network_error(mysql_errno(mysql))) { - mi->report(WARNING_LEVEL, mysql_errno(mysql), + mi->report(WARNING_LEVEL, mysql_errno(mysql), NULL, "Get master clock failed with error: %s", mysql_error(mysql)); goto network_err; } @@ -1436,7 +1578,8 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi) (master_res= mysql_store_result(mysql)) && (master_row= mysql_fetch_row(master_res))) { - if ((::server_id == (mi->master_id= strtoul(master_row[1], 0, 10))) && + if ((global_system_variables.server_id == + (mi->master_id= strtoul(master_row[1], 0, 10))) && !mi->rli.replicate_same_server_id) { errmsg= "The slave I/O thread stops because master and slave have equal \ @@ -1450,11 +1593,11 @@ not always make sense; please check the manual before using it)."; } else if (mysql_errno(mysql)) { - if (check_io_slave_killed(mi->io_thd, mi, NULL)) + if (check_io_slave_killed(mi, NULL)) goto slave_killed_err; else if (is_network_error(mysql_errno(mysql))) { - mi->report(WARNING_LEVEL, mysql_errno(mysql), + mi->report(WARNING_LEVEL, mysql_errno(mysql), NULL, "Get master SERVER_ID failed with error: %s", mysql_error(mysql)); goto network_err; } @@ -1467,7 +1610,7 @@ when it try to get the value of SERVER_ID variable from master."; } else if (!master_row && master_res) { - mi->report(WARNING_LEVEL, ER_UNKNOWN_SYSTEM_VARIABLE, + mi->report(WARNING_LEVEL, ER_UNKNOWN_SYSTEM_VARIABLE, NULL, "Unknown system variable 'SERVER_ID' on master, \ maybe it is a *VERY OLD MASTER*."); } @@ -1523,11 +1666,11 @@ be equal for the Statement-format replication to work"; goto err; } } - else if (check_io_slave_killed(mi->io_thd, mi, NULL)) + else if (check_io_slave_killed(mi, NULL)) goto slave_killed_err; else if (is_network_error(mysql_errno(mysql))) { - mi->report(WARNING_LEVEL, mysql_errno(mysql), + mi->report(WARNING_LEVEL, mysql_errno(mysql), NULL, "Get master COLLATION_SERVER failed with error: %s", mysql_error(mysql)); goto network_err; } @@ -1541,7 +1684,7 @@ when it try to get the value of COLLATION_SERVER global variable from master."; goto err; } else - mi->report(WARNING_LEVEL, ER_UNKNOWN_SYSTEM_VARIABLE, + mi->report(WARNING_LEVEL, ER_UNKNOWN_SYSTEM_VARIABLE, NULL, "Unknown system variable 'COLLATION_SERVER' on master, \ maybe it is a *VERY OLD MASTER*. *NOTE*: slave may experience \ inconsistency if replicated data deals with collation."); @@ -1586,11 +1729,11 @@ be equal for the Statement-format replication to work"; goto err; } } - else if (check_io_slave_killed(mi->io_thd, mi, NULL)) + else if (check_io_slave_killed(mi, NULL)) goto slave_killed_err; else if (is_network_error(err_code= mysql_errno(mysql))) { - mi->report(ERROR_LEVEL, err_code, + mi->report(ERROR_LEVEL, err_code, NULL, "Get master TIME_ZONE failed with error: %s", mysql_error(mysql)); goto network_err; @@ -1598,7 +1741,7 @@ be equal for the Statement-format replication to work"; else if (err_code == ER_UNKNOWN_SYSTEM_VARIABLE) { /* We use ERROR_LEVEL to get the error logged to file */ - mi->report(ERROR_LEVEL, err_code, + mi->report(ERROR_LEVEL, err_code, NULL, "MySQL master doesn't have a TIME_ZONE variable. Note that" "if your timezone is not same between master and slave, your " @@ -1637,13 +1780,13 @@ when it try to get the value of TIME_ZONE global variable from master."; }); if (mysql_real_query(mysql, query, strlen(query))) { - if (check_io_slave_killed(mi->io_thd, mi, NULL)) + if (check_io_slave_killed(mi, NULL)) goto slave_killed_err; if (is_network_error(mysql_errno(mysql))) { IF_DBUG(heartbeat_network_error: , ) - mi->report(WARNING_LEVEL, mysql_errno(mysql), + mi->report(WARNING_LEVEL, mysql_errno(mysql), NULL, "SET @master_heartbeat_period to master failed with error: %s", mysql_error(mysql)); mysql_free_result(mysql_store_result(mysql)); @@ -1686,7 +1829,7 @@ when it try to get the value of TIME_ZONE global variable from master."; rc= mysql_real_query(mysql, query, strlen(query)); if (rc != 0) { - if (check_io_slave_killed(mi->io_thd, mi, NULL)) + if (check_io_slave_killed(mi, NULL)) goto slave_killed_err; if (mysql_errno(mysql) == ER_UNKNOWN_SYSTEM_VARIABLE) @@ -1695,7 +1838,7 @@ when it try to get the value of TIME_ZONE global variable from master."; if (global_system_variables.log_warnings > 1) { // this is tolerable as OM -> NS is supported - mi->report(WARNING_LEVEL, mysql_errno(mysql), + mi->report(WARNING_LEVEL, mysql_errno(mysql), NULL, "Notifying master by %s failed with " "error: %s", query, mysql_error(mysql)); } @@ -1704,7 +1847,7 @@ when it try to get the value of TIME_ZONE global variable from master."; { if (is_network_error(mysql_errno(mysql))) { - mi->report(WARNING_LEVEL, mysql_errno(mysql), + mi->report(WARNING_LEVEL, mysql_errno(mysql), NULL, "Notifying master by %s failed with " "error: %s", query, mysql_error(mysql)); mysql_free_result(mysql_store_result(mysql)); @@ -1736,11 +1879,11 @@ when it try to get the value of TIME_ZONE global variable from master."; DBUG_ASSERT(mi->checksum_alg_before_fd == BINLOG_CHECKSUM_ALG_OFF || mi->checksum_alg_before_fd == BINLOG_CHECKSUM_ALG_CRC32); } - else if (check_io_slave_killed(mi->io_thd, mi, NULL)) + else if (check_io_slave_killed(mi, NULL)) goto slave_killed_err; else if (is_network_error(mysql_errno(mysql))) { - mi->report(WARNING_LEVEL, mysql_errno(mysql), + mi->report(WARNING_LEVEL, mysql_errno(mysql), NULL, "Get master BINLOG_CHECKSUM failed with error: %s", mysql_error(mysql)); goto network_err; } @@ -1777,7 +1920,7 @@ past_checksum: err_code= mysql_errno(mysql); if (is_network_error(err_code)) { - mi->report(ERROR_LEVEL, err_code, + mi->report(ERROR_LEVEL, err_code, NULL, "Setting master-side filtering of @@skip_replication failed " "with error: %s", mysql_error(mysql)); goto network_err; @@ -1807,13 +1950,277 @@ past_checksum: } } } + + /* Announce MariaDB slave capabilities. */ + DBUG_EXECUTE_IF("simulate_slave_capability_none", goto after_set_capability;); + { + int rc= DBUG_EVALUATE_IF("simulate_slave_capability_old_53", + mysql_real_query(mysql, STRING_WITH_LEN("SET @mariadb_slave_capability=" + STRINGIFY_ARG(MARIA_SLAVE_CAPABILITY_ANNOTATE))), + mysql_real_query(mysql, STRING_WITH_LEN("SET @mariadb_slave_capability=" + STRINGIFY_ARG(MARIA_SLAVE_CAPABILITY_MINE)))); + if (rc) + { + err_code= mysql_errno(mysql); + if (is_network_error(err_code)) + { + mi->report(ERROR_LEVEL, err_code, NULL, + "Setting @mariadb_slave_capability failed with error: %s", + mysql_error(mysql)); + goto network_err; + } + else + { + /* Fatal error */ + errmsg= "The slave I/O thread stops because a fatal error is " + "encountered when it tries to set @mariadb_slave_capability."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + } + } +#ifndef DBUG_OFF +after_set_capability: +#endif + + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + /* Request dump to start from slave replication GTID state. */ + int rc; + char str_buf[256]; + String query_str(str_buf, sizeof(str_buf), system_charset_info); + query_str.length(0); + + /* + Read the master @@GLOBAL.gtid_domain_id variable. + This is mostly to check that master is GTID aware, but we could later + perhaps use it to check that different multi-source masters are correctly + configured with distinct domain_id. + */ + if (mysql_real_query(mysql, + STRING_WITH_LEN("SELECT @@GLOBAL.gtid_domain_id")) || + !(master_res= mysql_store_result(mysql)) || + !(master_row= mysql_fetch_row(master_res))) + { + err_code= mysql_errno(mysql); + if (is_network_error(err_code)) + { + mi->report(ERROR_LEVEL, err_code, NULL, + "Get master @@GLOBAL.gtid_domain_id failed with error: %s", + mysql_error(mysql)); + goto network_err; + } + else + { + errmsg= "The slave I/O thread stops because master does not support " + "MariaDB global transaction id. A fatal error is encountered when " + "it tries to SELECT @@GLOBAL.gtid_domain_id."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + } + mysql_free_result(master_res); + master_res= NULL; + + query_str.append(STRING_WITH_LEN("SET @slave_connect_state='"), + system_charset_info); + if (mi->gtid_current_pos.append_to_string(&query_str)) + { + err_code= ER_OUTOFMEMORY; + errmsg= "The slave I/O thread stops because a fatal out-of-memory " + "error is encountered when it tries to compute @slave_connect_state."; + sprintf(err_buff, "%s Error: Out of memory", errmsg); + goto err; + } + query_str.append(STRING_WITH_LEN("'"), system_charset_info); + + rc= mysql_real_query(mysql, query_str.ptr(), query_str.length()); + if (rc) + { + err_code= mysql_errno(mysql); + if (is_network_error(err_code)) + { + mi->report(ERROR_LEVEL, err_code, NULL, + "Setting @slave_connect_state failed with error: %s", + mysql_error(mysql)); + goto network_err; + } + else + { + /* Fatal error */ + errmsg= "The slave I/O thread stops because a fatal error is " + "encountered when it tries to set @slave_connect_state."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + } + + query_str.length(0); + if (query_str.append(STRING_WITH_LEN("SET @slave_gtid_strict_mode="), + system_charset_info) || + query_str.append_ulonglong(opt_gtid_strict_mode != false)) + { + err_code= ER_OUTOFMEMORY; + errmsg= "The slave I/O thread stops because a fatal out-of-memory " + "error is encountered when it tries to set @slave_gtid_strict_mode."; + sprintf(err_buff, "%s Error: Out of memory", errmsg); + goto err; + } + + rc= mysql_real_query(mysql, query_str.ptr(), query_str.length()); + if (rc) + { + err_code= mysql_errno(mysql); + if (is_network_error(err_code)) + { + mi->report(ERROR_LEVEL, err_code, NULL, + "Setting @slave_gtid_strict_mode failed with error: %s", + mysql_error(mysql)); + goto network_err; + } + else + { + /* Fatal error */ + errmsg= "The slave I/O thread stops because a fatal error is " + "encountered when it tries to set @slave_gtid_strict_mode."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + } + + query_str.length(0); + if (query_str.append(STRING_WITH_LEN("SET @slave_gtid_ignore_duplicates="), + system_charset_info) || + query_str.append_ulonglong(opt_gtid_ignore_duplicates != false)) + { + err_code= ER_OUTOFMEMORY; + errmsg= "The slave I/O thread stops because a fatal out-of-memory error " + "is encountered when it tries to set @slave_gtid_ignore_duplicates."; + sprintf(err_buff, "%s Error: Out of memory", errmsg); + goto err; + } + + rc= mysql_real_query(mysql, query_str.ptr(), query_str.length()); + if (rc) + { + err_code= mysql_errno(mysql); + if (is_network_error(err_code)) + { + mi->report(ERROR_LEVEL, err_code, NULL, + "Setting @slave_gtid_ignore_duplicates failed with " + "error: %s", mysql_error(mysql)); + goto network_err; + } + else + { + /* Fatal error */ + errmsg= "The slave I/O thread stops because a fatal error is " + "encountered when it tries to set @slave_gtid_ignore_duplicates."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + } + + if (mi->rli.until_condition == Relay_log_info::UNTIL_GTID) + { + query_str.length(0); + query_str.append(STRING_WITH_LEN("SET @slave_until_gtid='"), + system_charset_info); + if (mi->rli.until_gtid_pos.append_to_string(&query_str)) + { + err_code= ER_OUTOFMEMORY; + errmsg= "The slave I/O thread stops because a fatal out-of-memory " + "error is encountered when it tries to compute @slave_until_gtid."; + sprintf(err_buff, "%s Error: Out of memory", errmsg); + goto err; + } + query_str.append(STRING_WITH_LEN("'"), system_charset_info); + + rc= mysql_real_query(mysql, query_str.ptr(), query_str.length()); + if (rc) + { + err_code= mysql_errno(mysql); + if (is_network_error(err_code)) + { + mi->report(ERROR_LEVEL, err_code, NULL, + "Setting @slave_until_gtid failed with error: %s", + mysql_error(mysql)); + goto network_err; + } + else + { + /* Fatal error */ + errmsg= "The slave I/O thread stops because a fatal error is " + "encountered when it tries to set @slave_until_gtid."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + } + } + } + else + { + /* + If we are not using GTID to connect this time, then instead request + the corresponding GTID position from the master, so that the user + can reconnect the next time using MASTER_GTID_POS=AUTO. + */ + char quote_buf[2*sizeof(mi->master_log_name)+1]; + char str_buf[28+2*sizeof(mi->master_log_name)+10]; + String query(str_buf, sizeof(str_buf), system_charset_info); + query.length(0); + + query.append("SELECT binlog_gtid_pos('"); + escape_quotes_for_mysql(&my_charset_bin, quote_buf, sizeof(quote_buf), + mi->master_log_name, strlen(mi->master_log_name)); + query.append(quote_buf); + query.append("',"); + query.append_ulonglong(mi->master_log_pos); + query.append(")"); + + if (!mysql_real_query(mysql, query.c_ptr_safe(), query.length()) && + (master_res= mysql_store_result(mysql)) && + (master_row= mysql_fetch_row(master_res)) && + (master_row[0] != NULL)) + { + rpl_global_gtid_slave_state.load(mi->io_thd, master_row[0], + strlen(master_row[0]), false, false); + } + else if (check_io_slave_killed(mi, NULL)) + goto slave_killed_err; + else if (is_network_error(mysql_errno(mysql))) + { + mi->report(WARNING_LEVEL, mysql_errno(mysql), NULL, + "Get master GTID position failed with error: %s", mysql_error(mysql)); + goto network_err; + } + else + { + /* + ToDo: If the master does not have the binlog_gtid_pos() function, it + just means that it is an old master with no GTID support, so we should + do nothing. + + However, if binlog_gtid_pos() exists, but fails or returns NULL, then + it means that the requested position is not valid. We could use this + to catch attempts to replicate from within the middle of an event, + avoiding strange failures or possible corruption. + */ + } + if (master_res) + { + mysql_free_result(master_res); + master_res= NULL; + } + } + err: if (errmsg) { if (master_res) mysql_free_result(master_res); DBUG_ASSERT(err_code != 0); - mi->report(ERROR_LEVEL, err_code, "%s", err_buff); + mi->report(ERROR_LEVEL, err_code, NULL, "%s", err_buff); DBUG_RETURN(1); } @@ -1834,21 +2241,27 @@ slave_killed_err: static bool wait_for_relay_log_space(Relay_log_info* rli) { bool slave_killed=0; + bool ignore_log_space_limit; Master_info* mi = rli->mi; - const char *save_proc_info; + PSI_stage_info old_stage; THD* thd = mi->io_thd; DBUG_ENTER("wait_for_relay_log_space"); mysql_mutex_lock(&rli->log_space_lock); - save_proc_info= thd->enter_cond(&rli->log_space_cond, - &rli->log_space_lock, - "\ -Waiting for the slave SQL thread to free enough relay log space"); + thd->ENTER_COND(&rli->log_space_cond, + &rli->log_space_lock, + &stage_waiting_for_relay_log_space, + &old_stage); while (rli->log_space_limit < rli->log_space_total && - !(slave_killed=io_slave_killed(thd,mi)) && + !(slave_killed=io_slave_killed(mi)) && !rli->ignore_log_space_limit) mysql_cond_wait(&rli->log_space_cond, &rli->log_space_lock); + ignore_log_space_limit= rli->ignore_log_space_limit; + rli->ignore_log_space_limit= 0; + + thd->EXIT_COND(&old_stage); + /* Makes the IO thread read only one event at a time until the SQL thread is able to purge the relay @@ -1872,7 +2285,8 @@ Waiting for the slave SQL thread to free enough relay log space"); thread sleeps waiting for events. */ - if (rli->ignore_log_space_limit) + + if (ignore_log_space_limit) { #ifndef DBUG_OFF { @@ -1889,16 +2303,13 @@ Waiting for the slave SQL thread to free enough relay log space"); #endif if (rli->sql_force_rotate_relay) { - mysql_mutex_lock(&active_mi->data_lock); + mysql_mutex_lock(&mi->data_lock); rotate_relay_log(rli->mi); - mysql_mutex_unlock(&active_mi->data_lock); + mysql_mutex_unlock(&mi->data_lock); rli->sql_force_rotate_relay= false; } - - rli->ignore_log_space_limit= false; } - thd->exit_cond(save_proc_info); DBUG_RETURN(slave_killed); } @@ -1924,34 +2335,66 @@ static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi) DBUG_ASSERT(thd == mi->io_thd); mysql_mutex_lock(log_lock); - if (rli->ign_master_log_name_end[0]) - { - DBUG_PRINT("info",("writing a Rotate event to track down ignored events")); - Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end, - 0, rli->ign_master_log_pos_end, - Rotate_log_event::DUP_NAME); - rli->ign_master_log_name_end[0]= 0; - /* can unlock before writing as slave SQL thd will soon see our Rotate */ + if (rli->ign_master_log_name_end[0] || rli->ign_gtids.count()) + { + Rotate_log_event *rev= NULL; + Gtid_list_log_event *glev= NULL; + if (rli->ign_master_log_name_end[0]) + { + rev= new Rotate_log_event(rli->ign_master_log_name_end, + 0, rli->ign_master_log_pos_end, + Rotate_log_event::DUP_NAME); + rli->ign_master_log_name_end[0]= 0; + if (unlikely(!(bool)rev)) + mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE, NULL, + ER(ER_SLAVE_CREATE_EVENT_FAILURE), + "Rotate_event (out of memory?)," + " SHOW SLAVE STATUS may be inaccurate"); + } + if (rli->ign_gtids.count()) + { + glev= new Gtid_list_log_event(&rli->ign_gtids, + Gtid_list_log_event::FLAG_IGN_GTIDS); + rli->ign_gtids.reset(); + if (unlikely(!(bool)glev)) + mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE, NULL, + ER(ER_SLAVE_CREATE_EVENT_FAILURE), + "Gtid_list_event (out of memory?)," + " gtid_slave_pos may be inaccurate"); + } + + /* Can unlock before writing as slave SQL thd will soon see our event. */ mysql_mutex_unlock(log_lock); - if (likely((bool)ev)) + if (rev) { - ev->server_id= 0; // don't be ignored by slave SQL thread - if (unlikely(rli->relay_log.append(ev))) - mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, + DBUG_PRINT("info",("writing a Rotate event to track down ignored events")); + rev->server_id= 0; // don't be ignored by slave SQL thread + if (unlikely(rli->relay_log.append(rev))) + mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL, ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE), "failed to write a Rotate event" " to the relay log, SHOW SLAVE STATUS may be" " inaccurate"); + delete rev; + } + if (glev) + { + DBUG_PRINT("info",("writing a Gtid_list event to track down ignored events")); + glev->server_id= 0; // don't be ignored by slave SQL thread + glev->set_artificial_event(); // Don't mess up Exec_Master_Log_Pos + if (unlikely(rli->relay_log.append(glev))) + mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL, + ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE), + "failed to write a Gtid_list event to the relay log, " + "gtid_slave_pos may be inaccurate"); + delete glev; + } + if (likely (rev || glev)) + { rli->relay_log.harvest_bytes_written(&rli->log_space_total); if (flush_master_info(mi, TRUE, TRUE)) sql_print_error("Failed to flush master info file"); - delete ev; } - else - mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE, - ER(ER_SLAVE_CREATE_EVENT_FAILURE), - "Rotate_event (out of memory?)," - " SHOW SLAVE STATUS may be inaccurate"); } else mysql_mutex_unlock(log_lock); @@ -2000,7 +2443,7 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi, DBUG_RETURN(0); } - int4store(pos, server_id); pos+= 4; + int4store(pos, global_system_variables.server_id); pos+= 4; pos= net_store_data(pos, (uchar*) report_host, report_host_len); pos= net_store_data(pos, (uchar*) report_user, report_user_len); pos= net_store_data(pos, (uchar*) report_password, report_password_len); @@ -2020,12 +2463,12 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi, { *suppress_warnings= TRUE; // Suppress reconnect warning } - else if (!check_io_slave_killed(mi->io_thd, mi, NULL)) + else if (!check_io_slave_killed(mi, NULL)) { char buf[256]; my_snprintf(buf, sizeof(buf), "%s (Errno: %d)", mysql_error(mysql), mysql_errno(mysql)); - mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE, + mi->report(ERROR_LEVEL, ER_SLAVE_MASTER_COM_FAILURE, NULL, ER(ER_SLAVE_MASTER_COM_FAILURE), "COM_REGISTER_SLAVE", buf); } DBUG_RETURN(1); @@ -2045,15 +2488,40 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi, @retval FALSE success @retval TRUE failure */ -bool show_master_info(THD* thd, Master_info* mi) + +bool show_master_info(THD *thd, Master_info *mi, bool full) +{ + DBUG_ENTER("show_master_info"); + String gtid_pos; + + if (full && rpl_global_gtid_slave_state.tostring(>id_pos, NULL, 0)) + DBUG_RETURN(TRUE); + if (send_show_master_info_header(thd, full, gtid_pos.length())) + DBUG_RETURN(TRUE); + if (send_show_master_info_data(thd, mi, full, >id_pos)) + DBUG_RETURN(TRUE); + my_eof(thd); + DBUG_RETURN(FALSE); +} + +static bool send_show_master_info_header(THD *thd, bool full, + size_t gtid_pos_length) { - // TODO: fix this for multi-master List<Item> field_list; Protocol *protocol= thd->protocol; - DBUG_ENTER("show_master_info"); + Master_info *mi; + DBUG_ENTER("show_master_info_header"); + + if (full) + { + field_list.push_back(new Item_empty_string("Connection_name", + MAX_CONNECTION_NAME)); + field_list.push_back(new Item_empty_string("Slave_SQL_State", + 30)); + } field_list.push_back(new Item_empty_string("Slave_IO_State", - 14)); + 30)); field_list.push_back(new Item_empty_string("Master_Host", sizeof(mi->host))); field_list.push_back(new Item_empty_string("Master_User", @@ -2116,23 +2584,71 @@ bool show_master_info(THD* thd, Master_info* mi) FN_REFLEN)); field_list.push_back(new Item_return_int("Master_Server_Id", sizeof(ulong), MYSQL_TYPE_LONG)); + field_list.push_back(new Item_empty_string("Master_SSL_Crl", + sizeof(mi->ssl_crl))); + field_list.push_back(new Item_empty_string("Master_SSL_Crlpath", + sizeof(mi->ssl_crlpath))); + field_list.push_back(new Item_empty_string("Using_Gtid", + sizeof("Current_Pos")-1)); + field_list.push_back(new Item_empty_string("Gtid_IO_Pos", 30)); + if (full) + { + field_list.push_back(new Item_return_int("Retried_transactions", + 10, MYSQL_TYPE_LONG)); + field_list.push_back(new Item_return_int("Max_relay_log_size", + 10, MYSQL_TYPE_LONGLONG)); + field_list.push_back(new Item_return_int("Executed_log_entries", + 10, MYSQL_TYPE_LONG)); + field_list.push_back(new Item_return_int("Slave_received_heartbeats", + 10, MYSQL_TYPE_LONG)); + field_list.push_back(new Item_float("Slave_heartbeat_period", + 0.0, 3, 10)); + field_list.push_back(new Item_empty_string("Gtid_Slave_Pos", + gtid_pos_length)); + } if (protocol->send_result_set_metadata(&field_list, Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(TRUE); + DBUG_RETURN(FALSE); +} + + +static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, + String *gtid_pos) +{ + DBUG_ENTER("send_show_master_info_data"); if (mi->host[0]) { DBUG_PRINT("info",("host is set: '%s'", mi->host)); String *packet= &thd->packet; + Protocol *protocol= thd->protocol; + Rpl_filter *rpl_filter= mi->rpl_filter; + char buf[256]; + String tmp(buf, sizeof(buf), &my_charset_bin); + protocol->prepare_for_resend(); /* slave_running can be accessed without run_lock but not other non-volotile members like mi->io_thd, which is guarded by the mutex. */ + if (full) + protocol->store(mi->connection_name.str, mi->connection_name.length, + &my_charset_bin); mysql_mutex_lock(&mi->run_lock); - protocol->store(mi->io_thd ? mi->io_thd->proc_info : "", &my_charset_bin); + if (full) + { + /* + Show what the sql driver replication thread is doing + This is only meaningful if there is only one slave thread. + */ + protocol->store(mi->rli.sql_driver_thd ? + mi->rli.sql_driver_thd->get_proc_info() : "", + &my_charset_bin); + } + protocol->store(mi->io_thd ? mi->io_thd->get_proc_info() : "", &my_charset_bin); mysql_mutex_unlock(&mi->run_lock); mysql_mutex_lock(&mi->data_lock); @@ -2157,8 +2673,6 @@ bool show_master_info(THD* thd, Master_info* mi) protocol->store(rpl_filter->get_do_db()); protocol->store(rpl_filter->get_ignore_db()); - char buf[256]; - String tmp(buf, sizeof(buf), &my_charset_bin); rpl_filter->get_do_table(&tmp); protocol->store(&tmp); rpl_filter->get_ignore_table(&tmp); @@ -2177,7 +2691,8 @@ bool show_master_info(THD* thd, Master_info* mi) protocol->store( mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None": ( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master": - "Relay"), &my_charset_bin); + ( mi->rli.until_condition==Relay_log_info::UNTIL_RELAY_POS? "Relay": + "Gtid")), &my_charset_bin); protocol->store(mi->rli.until_log_name, &my_charset_bin); protocol->store((ulonglong) mi->rli.until_log_pos); @@ -2199,8 +2714,24 @@ bool show_master_info(THD* thd, Master_info* mi) if ((mi->slave_running == MYSQL_SLAVE_RUN_CONNECT) && mi->rli.slave_running) { - long time_diff= ((long)(time(0) - mi->rli.last_master_timestamp) - - mi->clock_diff_with_master); + long time_diff; + bool idle; + time_t stamp= mi->rli.last_master_timestamp; + + if (!stamp) + idle= true; + else + { + idle= mi->rli.sql_thread_caught_up; + if (opt_slave_parallel_threads > 0 && idle && + !mi->rli.parallel.workers_idle()) + idle= false; + } + if (idle) + time_diff= 0; + else + { + time_diff= ((long)(time(0) - stamp) - mi->clock_diff_with_master); /* Apparently on some systems time_diff can be <0. Here are possible reasons related to MySQL: @@ -2216,13 +2747,15 @@ bool show_master_info(THD* thd, Master_info* mi) slave is 2. At SHOW SLAVE STATUS time, assume that the difference between timestamp of slave and rli->last_master_timestamp is 0 (i.e. they are in the same second), then we get 0-(2-1)=-1 as a result. - This confuses users, so we don't go below 0: hence the max(). + This confuses users, so we don't go below 0. last_master_timestamp == 0 (an "impossible" timestamp 1970) is a special marker to say "consider we have caught up". */ - protocol->store((longlong)(mi->rli.last_master_timestamp ? - max(0, time_diff) : 0)); + if (time_diff < 0) + time_diff= 0; + } + protocol->store((longlong)time_diff); } else { @@ -2264,6 +2797,26 @@ bool show_master_info(THD* thd, Master_info* mi) } // Master_Server_id protocol->store((uint32) mi->master_id); + // Master_Ssl_Crl + protocol->store(mi->ssl_ca, &my_charset_bin); + // Master_Ssl_Crlpath + protocol->store(mi->ssl_capath, &my_charset_bin); + protocol->store(mi->using_gtid_astext(mi->using_gtid), &my_charset_bin); + { + char buff[30]; + String tmp(buff, sizeof(buff), system_charset_info); + mi->gtid_current_pos.to_string(&tmp); + protocol->store(tmp.ptr(), tmp.length(), &my_charset_bin); + } + if (full) + { + protocol->store((uint32) mi->rli.retried_trans); + protocol->store((ulonglong) mi->rli.max_relay_log_size); + protocol->store((uint32) mi->rli.executed_entries); + protocol->store((uint32) mi->received_heartbeats); + protocol->store((double) mi->heartbeat_period, 3, &tmp); + protocol->store(gtid_pos->ptr(), gtid_pos->length(), &my_charset_bin); + } mysql_mutex_unlock(&mi->rli.err_lock); mysql_mutex_unlock(&mi->err_lock); @@ -2273,6 +2826,78 @@ bool show_master_info(THD* thd, Master_info* mi) if (my_net_write(&thd->net, (uchar*) thd->packet.ptr(), packet->length())) DBUG_RETURN(TRUE); } + DBUG_RETURN(FALSE); +} + + +/* Used to sort connections by name */ + +static int cmp_mi_by_name(const Master_info **arg1, + const Master_info **arg2) +{ + return my_strcasecmp(system_charset_info, (*arg1)->connection_name.str, + (*arg2)->connection_name.str); +} + + +/** + Execute a SHOW FULL SLAVE STATUS statement. + + @param thd Pointer to THD object for the client thread executing the + statement. + + Elements are sorted according to the original connection_name. + + @retval FALSE success + @retval TRUE failure + + @note + master_info_index is protected by LOCK_active_mi. +*/ + +bool show_all_master_info(THD* thd) +{ + uint i, elements; + String gtid_pos; + Master_info **tmp; + DBUG_ENTER("show_master_info"); + mysql_mutex_assert_owner(&LOCK_active_mi); + + gtid_pos.length(0); + if (rpl_append_gtid_state(>id_pos, true)) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + DBUG_RETURN(TRUE); + } + + if (send_show_master_info_header(thd, 1, gtid_pos.length())) + DBUG_RETURN(TRUE); + + if (!master_info_index || + !(elements= master_info_index->master_info_hash.records)) + goto end; + + /* + Sort lines to get them into a predicted order + (needed for test cases and to not confuse users) + */ + if (!(tmp= (Master_info**) thd->alloc(sizeof(Master_info*) * elements))) + DBUG_RETURN(TRUE); + + for (i= 0; i < elements; i++) + { + tmp[i]= (Master_info *) my_hash_element(&master_info_index-> + master_info_hash, i); + } + my_qsort(tmp, elements, sizeof(Master_info*), (qsort_cmp) cmp_mi_by_name); + + for (i= 0; i < elements; i++) + { + if (send_show_master_info_data(thd, tmp[i], 1, >id_pos)) + DBUG_RETURN(TRUE); + } + +end: my_eof(thd); DBUG_RETURN(FALSE); } @@ -2300,7 +2925,7 @@ void set_slave_thread_options(THD* thd) DBUG_VOID_RETURN; } -void set_slave_thread_default_charset(THD* thd, Relay_log_info const *rli) +void set_slave_thread_default_charset(THD* thd, rpl_group_info *rgi) { DBUG_ENTER("set_slave_thread_default_charset"); @@ -2312,13 +2937,7 @@ void set_slave_thread_default_charset(THD* thd, Relay_log_info const *rli) global_system_variables.collation_server; thd->update_charset(); - /* - We use a const cast here since the conceptual (and externally - visible) behavior of the function is to set the default charset of - the thread. That the cache has to be invalidated is a secondary - effect. - */ - const_cast<Relay_log_info*>(rli)->cached_charset_invalidate(); + thd->system_thread_info.rpl_sql_info->cached_charset_invalidate(); DBUG_VOID_RETURN; } @@ -2326,40 +2945,41 @@ void set_slave_thread_default_charset(THD* thd, Relay_log_info const *rli) init_slave_thread() */ -static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type) +static int init_slave_thread(THD* thd, Master_info *mi, + SLAVE_THD_TYPE thd_type) { DBUG_ENTER("init_slave_thread"); -#if !defined(DBUG_OFF) - int simulate_error= 0; -#endif - thd->system_thread = (thd_type == SLAVE_THD_SQL) ? - SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO; - thd->security_ctx->skip_grants(); - my_net_init(&thd->net, 0); - thd->slave_thread = 1; - thd->enable_slow_log= opt_log_slow_slave_statements; - thd->variables.log_slow_filter= global_system_variables.log_slow_filter; - set_slave_thread_options(thd); - thd->client_capabilities = CLIENT_LOCAL_FILES; - mysql_mutex_lock(&LOCK_thread_count); - thd->thread_id= thd->variables.pseudo_thread_id= thread_id++; - mysql_mutex_unlock(&LOCK_thread_count); - + int simulate_error __attribute__((unused))= 0; DBUG_EXECUTE_IF("simulate_io_slave_error_on_init", simulate_error|= (1 << SLAVE_THD_IO);); DBUG_EXECUTE_IF("simulate_sql_slave_error_on_init", simulate_error|= (1 << SLAVE_THD_SQL);); + /* We must call store_globals() before doing my_net_init() */ if (init_thr_lock() || thd->store_globals() || + my_net_init(&thd->net, 0, MYF(MY_THREAD_SPECIFIC)) || IF_DBUG(simulate_error & (1<< thd_type), 0)) { thd->cleanup(); DBUG_RETURN(-1); } + thd->system_thread = (thd_type == SLAVE_THD_SQL) ? + SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO; + thd->security_ctx->skip_grants(); + thd->slave_thread= 1; + thd->connection_name= mi->connection_name; + thd->enable_slow_log= opt_log_slow_slave_statements; + thd->variables.log_slow_filter= global_system_variables.log_slow_filter; + set_slave_thread_options(thd); + thd->client_capabilities = CLIENT_LOCAL_FILES; + mysql_mutex_lock(&LOCK_thread_count); + thd->thread_id= thd->variables.pseudo_thread_id= thread_id++; + mysql_mutex_unlock(&LOCK_thread_count); + if (thd_type == SLAVE_THD_SQL) - thd_proc_info(thd, "Waiting for the next event in relay log"); + THD_STAGE_INFO(thd, stage_waiting_for_the_next_event_in_relay_log); else - thd_proc_info(thd, "Waiting for master update"); + THD_STAGE_INFO(thd, stage_waiting_for_master_update); thd->set_time(); /* Do not use user-supplied timeout value for system threads. */ thd->variables.lock_wait_timeout= LONG_TIMEOUT; @@ -2377,13 +2997,12 @@ static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type) @retval True if the thread has been killed, false otherwise. */ template <typename killed_func, typename rpl_info> -static inline bool slave_sleep(THD *thd, time_t seconds, - killed_func func, rpl_info info) +static bool slave_sleep(THD *thd, time_t seconds, + killed_func func, rpl_info info) { bool ret; struct timespec abstime; - const char *old_proc_info; mysql_mutex_t *lock= &info->sleep_lock; mysql_cond_t *cond= &info->sleep_cond; @@ -2391,16 +3010,16 @@ static inline bool slave_sleep(THD *thd, time_t seconds, /* Absolute system time at which the sleep time expires. */ set_timespec(abstime, seconds); mysql_mutex_lock(lock); - old_proc_info= thd->enter_cond(cond, lock, thd->proc_info); + thd->ENTER_COND(cond, lock, NULL, NULL); - while (! (ret= func(thd, info))) + while (! (ret= func(info))) { int error= mysql_cond_timedwait(cond, lock, &abstime); if (error == ETIMEDOUT || error == ETIME) break; } /* Implicitly unlocks the mutex. */ - thd->exit_cond(old_proc_info); + thd->EXIT_COND(NULL); return ret; } @@ -2427,7 +3046,7 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi, // TODO if big log files: Change next to int8store() int4store(buf, (ulong) mi->master_log_pos); int2store(buf + 4, binlog_flags); - int4store(buf + 6, server_id); + int4store(buf + 6, global_system_variables.server_id); len = (uint) strlen(logname); memcpy(buf + 10, logname,len); if (simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1)) @@ -2521,12 +3140,13 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings) that the error is temporary by pushing a warning with the error code ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary. */ -static int has_temporary_error(THD *thd) +int +has_temporary_error(THD *thd) { DBUG_ENTER("has_temporary_error"); DBUG_EXECUTE_IF("all_errors_are_temporary_errors", - if (thd->stmt_da->is_error()) + if (thd->get_stmt_da()->is_error()) { thd->clear_error(); my_error(ER_LOCK_DEADLOCK, MYF(0)); @@ -2545,16 +3165,16 @@ static int has_temporary_error(THD *thd) currently, InnoDB deadlock detected by InnoDB or lock wait timeout (innodb_lock_wait_timeout exceeded */ - if (thd->stmt_da->sql_errno() == ER_LOCK_DEADLOCK || - thd->stmt_da->sql_errno() == ER_LOCK_WAIT_TIMEOUT) + if (thd->get_stmt_da()->sql_errno() == ER_LOCK_DEADLOCK || + thd->get_stmt_da()->sql_errno() == ER_LOCK_WAIT_TIMEOUT) DBUG_RETURN(1); #ifdef HAVE_NDB_BINLOG /* currently temporary error set in ndbcluster */ - List_iterator_fast<MYSQL_ERROR> it(thd->warning_info->warn_list()); - MYSQL_ERROR *err; + List_iterator_fast<Sql_condition> it(thd->warning_info->warn_list()); + Sql_condition *err; while ((err= it++)) { DBUG_PRINT("info", ("has condition %d %s", err->get_sql_errno(), @@ -2598,19 +3218,22 @@ static int has_temporary_error(THD *thd) @retval 2 No error calling ev->apply_event(), but error calling ev->update_pos(). */ -int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli) +int apply_event_and_update_pos(Log_event* ev, THD* thd, + rpl_group_info *rgi, + rpl_parallel_thread *rpt) { int exec_res= 0; - + Relay_log_info* rli= rgi->rli; DBUG_ENTER("apply_event_and_update_pos"); DBUG_PRINT("exec_event",("%s(type_code: %d; server_id: %d)", ev->get_type_str(), ev->get_type_code(), ev->server_id)); - DBUG_PRINT("info", ("thd->options: %s%s; rli->last_event_start_time: %lu", + DBUG_PRINT("info", ("thd->options: '%s%s%s' rgi->last_event_start_time: %lu", FLAGSTR(thd->variables.option_bits, OPTION_NOT_AUTOCOMMIT), FLAGSTR(thd->variables.option_bits, OPTION_BEGIN), - (ulong) rli->last_event_start_time)); + FLAGSTR(thd->variables.option_bits, OPTION_GTID_BEGIN), + (ulong) rgi->last_event_start_time)); /* Execute the event to change the database and update the binary @@ -2636,7 +3259,8 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli) has a Rotate etc). */ - thd->server_id = ev->server_id; // use the original server id for logging + /* Use the original server id for logging. */ + thd->variables.server_id = ev->server_id; thd->set_time(); // time the query thd->lex->current_select= 0; if (!ev->when) @@ -2650,12 +3274,21 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli) (ev->flags & LOG_EVENT_SKIP_REPLICATION_F ? OPTION_SKIP_REPLICATION : 0); ev->thd = thd; // because up to this point, ev->thd == 0 - int reason= ev->shall_skip(rli); + int reason= ev->shall_skip(rgi); if (reason == Log_event::EVENT_SKIP_COUNT) - sql_slave_skip_counter= --rli->slave_skip_counter; + { + DBUG_ASSERT(rli->slave_skip_counter > 0); + rli->slave_skip_counter--; + } mysql_mutex_unlock(&rli->data_lock); + DBUG_EXECUTE_IF("inject_slave_sql_before_apply_event", + { + DBUG_ASSERT(!debug_sync_set_action + (thd, STRING_WITH_LEN("now WAIT_FOR continue"))); + DBUG_SET_INITIAL("-d,inject_slave_sql_before_apply_event"); + };); if (reason == Log_event::EVENT_SKIP_NOT) - exec_res= ev->apply_event(rli); + exec_res= ev->apply_event(rgi); #ifndef DBUG_OFF /* @@ -2671,9 +3304,10 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli) // EVENT_SKIP_COUNT "skipped because event skip counter was non-zero" }; - DBUG_PRINT("info", ("OPTION_BEGIN: %d; IN_STMT: %d", - test(thd->variables.option_bits & OPTION_BEGIN), - rli->get_flag(Relay_log_info::IN_STMT))); + DBUG_PRINT("info", ("OPTION_BEGIN: %d IN_STMT: %d IN_TRANSACTION: %d", + MY_TEST(thd->variables.option_bits & OPTION_BEGIN), + rli->get_flag(Relay_log_info::IN_STMT), + rli->get_flag(Relay_log_info::IN_TRANSACTION))); DBUG_PRINT("skip_event", ("%s event was %s", ev->get_type_str(), explain[reason])); #endif @@ -2681,7 +3315,7 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli) DBUG_PRINT("info", ("apply_event error = %d", exec_res)); if (exec_res == 0) { - int error= ev->update_pos(rli); + int error= ev->update_pos(rgi); #ifdef HAVE_valgrind if (!rli->is_fake) #endif @@ -2707,7 +3341,7 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli) if (error) { char buf[22]; - rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR, + rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR, rgi->gtid_info(), "It was not possible to update the positions" " of the relay log information: the slave may" " be in an inconsistent state." @@ -2717,12 +3351,94 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli) DBUG_RETURN(2); } } + else + { + /* + Make sure we do not erroneously update gtid_slave_pos with a lingering + GTID from this failed event group (MDEV-4906). + */ + rgi->gtid_pending= false; + } DBUG_RETURN(exec_res ? 1 : 0); } /** + Keep the relay log transaction state up to date. + + The state reflects how things are after the given event, that has just been + read from the relay log, is executed. + + This is only needed to ensure we: + - Don't abort the sql driver thread in the middle of an event group. + - Don't rotate the io thread in the middle of a statement or transaction. + The mechanism is that the io thread, when it needs to rotate the relay + log, will wait until the sql driver has read all the cached events + and then continue reading events one by one from the master until + the sql threads signals that log doesn't have an active group anymore. + + There are two possible cases. We keep them as 2 separate flags mainly + to make debugging easier. + + - IN_STMT is set when we have read an event that should be used + together with the next event. This is for example setting a + variable that is used when executing the next statement. + - IN_TRANSACTION is set when we are inside a BEGIN...COMMIT group + + To test the state one should use the is_in_group() function. +*/ + +inline void update_state_of_relay_log(Relay_log_info *rli, Log_event *ev) +{ + Log_event_type typ= ev->get_type_code(); + + /* check if we are in a multi part event */ + if (ev->is_part_of_group()) + rli->set_flag(Relay_log_info::IN_STMT); + else if (Log_event::is_group_event(typ)) + { + /* + If it was not a is_part_of_group() and not a group event (like + rotate) then we can reset the IN_STMT flag. We have the above + if only to allow us to have a rotate element anywhere. + */ + rli->clear_flag(Relay_log_info::IN_STMT); + } + + /* Check for an event that starts or stops a transaction */ + if (typ == QUERY_EVENT) + { + Query_log_event *qev= (Query_log_event*) ev; + /* + Trivial optimization to avoid the following somewhat expensive + checks. + */ + if (qev->q_len <= sizeof("ROLLBACK")) + { + if (qev->is_begin()) + rli->set_flag(Relay_log_info::IN_TRANSACTION); + if (qev->is_commit() || qev->is_rollback()) + rli->clear_flag(Relay_log_info::IN_TRANSACTION); + } + } + if (typ == XID_EVENT) + rli->clear_flag(Relay_log_info::IN_TRANSACTION); + if (typ == GTID_EVENT && + !(((Gtid_log_event*) ev)->flags2 & Gtid_log_event::FL_STANDALONE)) + { + /* This GTID_EVENT will generate a BEGIN event */ + rli->set_flag(Relay_log_info::IN_TRANSACTION); + } + + DBUG_PRINT("info", ("event: %u IN_STMT: %d IN_TRANSACTION: %d", + (uint) typ, + rli->get_flag(Relay_log_info::IN_STMT), + rli->get_flag(Relay_log_info::IN_TRANSACTION))); +} + + +/** Top-level function for executing the next event from the relay log. This function reads the event from the relay log, executes it, and @@ -2750,22 +3466,23 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli) @retval 1 The event was not applied. */ -static int exec_relay_log_event(THD* thd, Relay_log_info* rli) + +static int exec_relay_log_event(THD* thd, Relay_log_info* rli, + rpl_group_info *serial_rgi) { + ulonglong event_size; DBUG_ENTER("exec_relay_log_event"); /* - We acquire this mutex since we need it for all operations except - event execution. But we will release it in places where we will - wait for something for example inside of next_event(). - */ + We acquire this mutex since we need it for all operations except + event execution. But we will release it in places where we will + wait for something for example inside of next_event(). + */ mysql_mutex_lock(&rli->data_lock); - Log_event * ev = next_event(rli); - - DBUG_ASSERT(rli->sql_thd==thd); + Log_event *ev= next_event(serial_rgi, &event_size); - if (sql_slave_killed(thd,rli)) + if (sql_slave_killed(serial_rgi)) { mysql_mutex_unlock(&rli->data_lock); delete ev; @@ -2774,22 +3491,40 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli) if (ev) { int exec_res; + Log_event_type typ= ev->get_type_code(); + + /* + Even if we don't execute this event, we keep the master timestamp, + so that seconds behind master shows correct delta (there are events + that are not replayed, so we keep falling behind). + + If it is an artificial event, or a relay log event (IO thread generated + event) or ev->when is set to 0, we don't update the + last_master_timestamp. + */ + if (!(ev->is_artificial_event() || ev->is_relay_log_event() || (ev->when == 0))) + { + rli->last_master_timestamp= ev->when + (time_t) ev->exec_time; + DBUG_ASSERT(rli->last_master_timestamp >= 0); + } /* This tests if the position of the beginning of the current event hits the UNTIL barrier. */ - if (rli->until_condition != Relay_log_info::UNTIL_NONE && + if ((rli->until_condition == Relay_log_info::UNTIL_MASTER_POS || + rli->until_condition == Relay_log_info::UNTIL_RELAY_POS) && rli->is_until_satisfied(thd, ev)) { char buf[22]; sql_print_information("Slave SQL thread stopped because it reached its" " UNTIL position %s", llstr(rli->until_pos(), buf)); /* - Setting abort_slave flag because we do not want additional message about - error in query execution to be printed. + Setting abort_slave flag because we do not want additional + message about error in query execution to be printed. */ rli->abort_slave= 1; + rli->stop_for_until= true; mysql_mutex_unlock(&rli->data_lock); delete ev; DBUG_RETURN(1); @@ -2802,56 +3537,78 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli) read hanging if the realy log does not have any more events. */ DBUG_EXECUTE_IF("incomplete_group_in_relay_log", - if ((ev->get_type_code() == XID_EVENT) || - ((ev->get_type_code() == QUERY_EVENT) && + if ((typ == XID_EVENT) || + ((typ == QUERY_EVENT) && strcmp("COMMIT", ((Query_log_event *) ev)->query) == 0)) { DBUG_ASSERT(thd->transaction.all.modified_non_trans_table); rli->abort_slave= 1; mysql_mutex_unlock(&rli->data_lock); delete ev; - rli->inc_event_relay_log_pos(); + serial_rgi->inc_event_relay_log_pos(); DBUG_RETURN(0); };); } - exec_res= apply_event_and_update_pos(ev, thd, rli); + update_state_of_relay_log(rli, ev); - switch (ev->get_type_code()) { - case FORMAT_DESCRIPTION_EVENT: - /* - Format_description_log_event should not be deleted because it - will be used to read info about the relay log's format; - it will be deleted when the SQL thread does not need it, - i.e. when this thread terminates. - */ - break; - case ANNOTATE_ROWS_EVENT: - /* - Annotate_rows event should not be deleted because after it has - been applied, thd->query points to the string inside this event. - The thd->query will be used to generate new Annotate_rows event - during applying the subsequent Rows events. - */ - rli->set_annotate_event((Annotate_rows_log_event*) ev); - break; - case DELETE_ROWS_EVENT: - case UPDATE_ROWS_EVENT: - case WRITE_ROWS_EVENT: + if (opt_slave_parallel_threads > 0) + { + int res= rli->parallel.do_event(serial_rgi, ev, event_size); + if (res >= 0) + DBUG_RETURN(res); + /* + Else we proceed to execute the event non-parallel. + This is the case for pre-10.0 events without GTID, and for handling + slave_skip_counter. + */ + } + + if (typ == GTID_EVENT) + { + Gtid_log_event *gev= static_cast<Gtid_log_event *>(ev); + + /* + For GTID, allocate a new sub_id for the given domain_id. + The sub_id must be allocated in increasing order of binlog order. + */ + if (event_group_new_gtid(serial_rgi, gev)) + { + sql_print_error("Error reading relay log event: %s", "slave SQL thread " + "aborted because of out-of-memory error"); + mysql_mutex_unlock(&rli->data_lock); + delete ev; + DBUG_RETURN(1); + } + + if (opt_gtid_ignore_duplicates) + { + int res= rpl_global_gtid_slave_state.check_duplicate_gtid + (&serial_rgi->current_gtid, serial_rgi); + if (res < 0) + { + sql_print_error("Error processing GTID event: %s", "slave SQL " + "thread aborted because of out-of-memory error"); + mysql_mutex_unlock(&rli->data_lock); + delete ev; + DBUG_RETURN(1); + } /* - After the last Rows event has been applied, the saved Annotate_rows - event (if any) is not needed anymore and can be deleted. + If we need to skip this event group (because the GTID was already + applied), then do it using the code for slave_skip_counter, which + is able to handle skipping until the end of the event group. */ - if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F)) - rli->free_annotate_event(); - /* fall through */ - default: - DBUG_PRINT("info", ("Deleting the event after it has been executed")); - if (!rli->is_deferred_event(ev)) - delete ev; - break; + if (!res) + rli->slave_skip_counter= 1; + } } + serial_rgi->future_event_relay_log_pos= rli->future_event_relay_log_pos; + serial_rgi->event_relay_log_name= rli->event_relay_log_name; + serial_rgi->event_relay_log_pos= rli->event_relay_log_pos; + exec_res= apply_event_and_update_pos(ev, thd, serial_rgi, NULL); + + delete_or_keep_event_post_apply(serial_rgi, typ, ev); /* update_log_pos failed: this should not happen, so we don't @@ -2867,6 +3624,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli) if (exec_res && (temp_err= has_temporary_error(thd))) { const char *errmsg; + rli->clear_error(); /* We were in a transaction which has been rolled back because of a temporary error; @@ -2874,14 +3632,16 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli) Note, if lock wait timeout (innodb_lock_wait_timeout exceeded) there is no rollback since 5.0.13 (ref: manual). We have to not only seek but also - a) init_master_info(), to seek back to hot relay log's start for later - (for when we will come back to this hot log after re-processing the - possibly existing old logs where BEGIN is: check_binlog_magic() will - then need the cache to be at position 0 (see comments at beginning of + + a) init_master_info(), to seek back to hot relay log's start + for later (for when we will come back to this hot log after + re-processing the possibly existing old logs where BEGIN is: + check_binlog_magic() will then need the cache to be at + position 0 (see comments at beginning of init_master_info()). b) init_relay_log_pos(), because the BEGIN may be an older relay log. */ - if (rli->trans_retries < slave_trans_retries) + if (serial_rgi->trans_retries < slave_trans_retries) { if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL)) sql_print_error("Failed to initialize the master info structure"); @@ -2894,16 +3654,19 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli) else { exec_res= 0; - rli->cleanup_context(thd, 1); + serial_rgi->cleanup_context(thd, 1); /* chance for concurrent connection to get more locks */ - slave_sleep(thd, min(rli->trans_retries, MAX_SLAVE_RETRY_PAUSE), - sql_slave_killed, rli); + slave_sleep(thd, MY_MIN(serial_rgi->trans_retries, + MAX_SLAVE_RETRY_PAUSE), + sql_slave_killed, serial_rgi); + serial_rgi->trans_retries++; mysql_mutex_lock(&rli->data_lock); // because of SHOW STATUS - rli->trans_retries++; rli->retried_trans++; + statistic_increment(slave_retried_transactions, LOCK_status); mysql_mutex_unlock(&rli->data_lock); DBUG_PRINT("info", ("Slave retries transaction " - "rli->trans_retries: %lu", rli->trans_retries)); + "rgi->trans_retries: %lu", + serial_rgi->trans_retries)); } } else @@ -2922,15 +3685,17 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli) event, the execution will proceed as usual; in the case of a non-transient error, the slave will stop with an error. */ - rli->trans_retries= 0; // restart from fresh - DBUG_PRINT("info", ("Resetting retry counter, rli->trans_retries: %lu", - rli->trans_retries)); + serial_rgi->trans_retries= 0; // restart from fresh + DBUG_PRINT("info", ("Resetting retry counter, rgi->trans_retries: %lu", + serial_rgi->trans_retries)); } } + thread_safe_increment64(&rli->executed_entries, + &slave_executed_entries_lock); DBUG_RETURN(exec_res); } mysql_mutex_unlock(&rli->data_lock); - rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE, + rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE, NULL, ER(ER_SLAVE_RELAY_LOG_READ_FAILURE), "\ Could not parse relay log event entry. The possible reasons are: the master's \ binary log is corrupted (you can check this by running 'mysqlbinlog' on the \ @@ -2944,9 +3709,9 @@ on this slave.\ } -static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info) +static bool check_io_slave_killed(Master_info *mi, const char *info) { - if (io_slave_killed(thd, mi)) + if (io_slave_killed(mi)) { if (info && global_system_variables.log_warnings) sql_print_information("%s", info); @@ -2997,21 +3762,35 @@ static int try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi, return 1; // Don't retry forever slave_sleep(thd, mi->connect_retry, io_slave_killed, mi); } - if (check_io_slave_killed(thd, mi, messages[SLAVE_RECON_MSG_KILLED_WAITING])) + if (check_io_slave_killed(mi, messages[SLAVE_RECON_MSG_KILLED_WAITING])) return 1; thd->proc_info = messages[SLAVE_RECON_MSG_AFTER]; if (!suppress_warnings) { char buf[256], llbuff[22]; + String tmp; + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + tmp.append(STRING_WITH_LEN("; GTID position '")); + mi->gtid_current_pos.append_to_string(&tmp); + if (mi->events_queued_since_last_gtid == 0) + tmp.append(STRING_WITH_LEN("'")); + else + { + tmp.append(STRING_WITH_LEN("', GTID event skip ")); + tmp.append_ulonglong((ulonglong)mi->events_queued_since_last_gtid); + } + } my_snprintf(buf, sizeof(buf), messages[SLAVE_RECON_MSG_FAILED], - IO_RPL_LOG_NAME, llstr(mi->master_log_pos, llbuff)); + IO_RPL_LOG_NAME, llstr(mi->master_log_pos, llbuff), + tmp.c_ptr_safe()); /* Raise a warining during registering on master/requesting dump. Log a message reading event. */ if (messages[SLAVE_RECON_MSG_COMMAND][0]) { - mi->report(WARNING_LEVEL, ER_SLAVE_MASTER_COM_FAILURE, + mi->report(WARNING_LEVEL, ER_SLAVE_MASTER_COM_FAILURE, NULL, ER(ER_SLAVE_MASTER_COM_FAILURE), messages[SLAVE_RECON_MSG_COMMAND], buf); } @@ -3020,7 +3799,7 @@ static int try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi, sql_print_information("%s", buf); } } - if (safe_reconnect(thd, mysql, mi, 1) || io_slave_killed(thd, mi)) + if (safe_reconnect(thd, mysql, mi, 1) || io_slave_killed(mi)) { if (global_system_variables.log_warnings) sql_print_information("%s", messages[SLAVE_RECON_MSG_KILLED_AFTER]); @@ -3048,6 +3827,7 @@ pthread_handler_t handle_slave_io(void *arg) uint retry_count; bool suppress_warnings; int ret; + rpl_io_thread_info io_info; #ifndef DBUG_OFF uint retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0; #endif @@ -3075,16 +3855,17 @@ pthread_handler_t handle_slave_io(void *arg) pthread_detach_this_thread(); thd->thread_stack= (char*) &thd; // remember where our stack is mi->clear_error(); - if (init_slave_thread(thd, SLAVE_THD_IO)) + if (init_slave_thread(thd, mi, SLAVE_THD_IO)) { mysql_cond_broadcast(&mi->start_cond); sql_print_error("Failed during slave I/O thread initialization"); goto err_during_init; } + thd->system_thread_info.rpl_io_info= &io_info; mysql_mutex_lock(&LOCK_thread_count); threads.append(thd); mysql_mutex_unlock(&LOCK_thread_count); - mi->slave_running = 1; + mi->slave_running = MYSQL_SLAVE_RUN_NOT_CONNECT; mi->abort_slave = 0; mysql_mutex_unlock(&mi->run_lock); mysql_cond_broadcast(&mi->start_cond); @@ -3096,29 +3877,55 @@ pthread_handler_t handle_slave_io(void *arg) /* This must be called before run any binlog_relay_io hooks */ my_pthread_setspecific_ptr(RPL_MASTER_INFO, mi); + /* Load the set of seen GTIDs, if we did not already. */ + if (rpl_load_gtid_slave_state(thd)) + { + mi->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(), NULL, + "Unable to load replication GTID slave state from mysql.%s: %s", + rpl_gtid_slave_state_table_name.str, + thd->get_stmt_da()->message()); + /* + If we are using old-style replication, we can continue, even though we + then will not be able to record the GTIDs we receive. But if using GTID, + we must give up. + */ + if (mi->using_gtid != Master_info::USE_GTID_NO || opt_gtid_strict_mode) + goto err; + } + + if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi))) { - mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, + mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, ER(ER_SLAVE_FATAL_ERROR), "Failed to run 'thread_start' hook"); goto err; } if (!(mi->mysql = mysql = mysql_init(NULL))) { - mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, + mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, ER(ER_SLAVE_FATAL_ERROR), "error in mysql_init()"); goto err; } - thd_proc_info(thd, "Connecting to master"); + THD_STAGE_INFO(thd, stage_connecting_to_master); // we can get killed during safe_connect if (!safe_connect(thd, mysql, mi)) { - sql_print_information("Slave I/O thread: connected to master '%s@%s:%d'," - "replication started in log '%s' at position %s", - mi->user, mi->host, mi->port, - IO_RPL_LOG_NAME, - llstr(mi->master_log_pos,llbuff)); + if (mi->using_gtid == Master_info::USE_GTID_NO) + sql_print_information("Slave I/O thread: connected to master '%s@%s:%d'," + "replication started in log '%s' at position %s", + mi->user, mi->host, mi->port, + IO_RPL_LOG_NAME, + llstr(mi->master_log_pos,llbuff)); + else + { + String tmp; + mi->gtid_current_pos.to_string(&tmp); + sql_print_information("Slave I/O thread: connected to master '%s@%s:%d'," + "replication starts at GTID position '%s'", + mi->user, mi->host, mi->port, tmp.c_ptr_safe()); + } } else { @@ -3128,6 +3935,25 @@ pthread_handler_t handle_slave_io(void *arg) connected: + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + /* + When the IO thread (re)connects to the master using GTID, it will + connect at the start of an event group. But the IO thread may have + previously logged part of the following event group to the relay + log. + + When the IO and SQL thread are started together, we erase any previous + relay logs, but this is not possible/desirable while the SQL thread is + running. To avoid duplicating partial event groups in the relay logs in + this case, we remember the count of events in any partially logged event + group before the reconnect, and then here at connect we set up a counter + to skip the already-logged part of the group. + */ + mi->gtid_reconnect_event_skip_count= mi->events_queued_since_last_gtid; + mi->gtid_event_seen= false; + } + #ifdef ENABLED_DEBUG_SYNC DBUG_EXECUTE_IF("dbug.before_get_running_status_yes", { @@ -3143,7 +3969,7 @@ connected: // TODO: the assignment below should be under mutex (5.0) mi->slave_running= MYSQL_SLAVE_RUN_CONNECT; thd->slave_net = &mysql->net; - thd_proc_info(thd, "Checking master version"); + THD_STAGE_INFO(thd, stage_checking_master_version); ret= get_master_version_and_clock(mysql, mi); if (ret == 1) /* Fatal error */ @@ -3151,11 +3977,14 @@ connected: if (ret == 2) { - if (check_io_slave_killed(mi->io_thd, mi, "Slave I/O thread killed" + if (check_io_slave_killed(mi, "Slave I/O thread killed" "while calling get_master_version_and_clock(...)")) goto err; suppress_warnings= FALSE; - /* Try to reconnect because the error was caused by a transient network problem */ + /* + Try to reconnect because the error was caused by a transient network + problem + */ if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings, reconnect_messages[SLAVE_RECON_ACT_REG])) goto err; @@ -3167,10 +3996,10 @@ connected: /* Register ourselves with the master. */ - thd_proc_info(thd, "Registering slave on master"); + THD_STAGE_INFO(thd, stage_registering_slave_on_master); if (register_slave_on_master(mysql, mi, &suppress_warnings)) { - if (!check_io_slave_killed(thd, mi, "Slave I/O thread killed " + if (!check_io_slave_killed(mi, "Slave I/O thread killed " "while registering slave on master")) { sql_print_error("Slave I/O thread couldn't register on master"); @@ -3195,13 +4024,13 @@ connected: } DBUG_PRINT("info",("Starting reading binary log from master")); - while (!io_slave_killed(thd,mi)) + while (!io_slave_killed(mi)) { - thd_proc_info(thd, "Requesting binlog dump"); + THD_STAGE_INFO(thd, stage_requesting_binlog_dump); if (request_dump(thd, mysql, mi, &suppress_warnings)) { sql_print_error("Failed on request_dump()"); - if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \ + if (check_io_slave_killed(mi, "Slave I/O thread killed while \ requesting master dump") || try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings, reconnect_messages[SLAVE_RECON_ACT_DUMP])) @@ -3221,7 +4050,7 @@ requesting master dump") || const char *event_buf; DBUG_ASSERT(mi->last_error().number == 0); - while (!io_slave_killed(thd,mi)) + while (!io_slave_killed(mi)) { ulong event_len; /* @@ -3230,9 +4059,9 @@ requesting master dump") || important thing is to not confuse users by saying "reading" whereas we're in fact receiving nothing. */ - thd_proc_info(thd, "Waiting for master to send event"); + THD_STAGE_INFO(thd, stage_waiting_for_master_to_send_event); event_len= read_event(mysql, mi, &suppress_warnings); - if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \ + if (check_io_slave_killed(mi, "Slave I/O thread killed while \ reading event")) goto err; DBUG_EXECUTE_IF("FORCE_SLAVE_TO_RECONNECT_EVENT", @@ -3256,18 +4085,18 @@ Log entry on master is longer than slave_max_allowed_packet (%lu) on \ slave. If the entry is correct, restart the server with a higher value of \ slave_max_allowed_packet", slave_max_allowed_packet); - mi->report(ERROR_LEVEL, ER_NET_PACKET_TOO_LARGE, + mi->report(ERROR_LEVEL, ER_NET_PACKET_TOO_LARGE, NULL, "%s", "Got a packet bigger than 'slave_max_allowed_packet' bytes"); goto err; case ER_MASTER_FATAL_ERROR_READING_BINLOG: - mi->report(ERROR_LEVEL, ER_MASTER_FATAL_ERROR_READING_BINLOG, + mi->report(ERROR_LEVEL, ER_MASTER_FATAL_ERROR_READING_BINLOG, NULL, ER(ER_MASTER_FATAL_ERROR_READING_BINLOG), mysql_error_number, mysql_error(mysql)); goto err; case ER_OUT_OF_RESOURCES: sql_print_error("\ Stopping slave I/O thread due to out-of-memory error from master"); - mi->report(ERROR_LEVEL, ER_OUT_OF_RESOURCES, + mi->report(ERROR_LEVEL, ER_OUT_OF_RESOURCES, NULL, "%s", ER(ER_OUT_OF_RESOURCES)); goto err; } @@ -3278,13 +4107,13 @@ Stopping slave I/O thread due to out-of-memory error from master"); } // if (event_len == packet_error) retry_count=0; // ok event, reset retry counter - thd_proc_info(thd, "Queueing master event to the relay log"); + THD_STAGE_INFO(thd, stage_queueing_master_event_to_the_relay_log); event_buf= (const char*)mysql->net.read_pos + 1; if (RUN_HOOK(binlog_relay_io, after_read_event, (thd, mi,(const char*)mysql->net.read_pos + 1, event_len, &event_buf, &event_len))) { - mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, + mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, ER(ER_SLAVE_FATAL_ERROR), "Failed to run 'after_read_event' hook"); goto err; @@ -3295,7 +4124,7 @@ Stopping slave I/O thread due to out-of-memory error from master"); bool synced= 0; if (queue_event(mi, event_buf, event_len)) { - mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, + mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL, ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE), "could not queue event from master"); goto err; @@ -3304,13 +4133,14 @@ Stopping slave I/O thread due to out-of-memory error from master"); if (RUN_HOOK(binlog_relay_io, after_queue_event, (thd, mi, event_buf, event_len, synced))) { - mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, + mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, ER(ER_SLAVE_FATAL_ERROR), "Failed to run 'after_queue_event' hook"); goto err; } - if (flush_master_info(mi, TRUE, TRUE)) + if (mi->using_gtid == Master_info::USE_GTID_NO && + flush_master_info(mi, TRUE, TRUE)) { sql_print_error("Failed to flush master info file"); goto err; @@ -3322,10 +4152,11 @@ Stopping slave I/O thread due to out-of-memory error from master"); - if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so the clean value is 0), then we are reading only one more event as we should, and we'll block only at the next event. No big deal. - - if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so - the clean value is 1), then we are going into wait_for_relay_log_space() - for no reason, but this function will do a clean read, notice the clean - value and exit immediately. + - if mi->rli.ignore_log_space_limit is 0 but becomes 1 just + after (so the clean value is 1), then we are going into + wait_for_relay_log_space() for no reason, but this function + will do a clean read, notice the clean value and exit + immediately. */ #ifndef DBUG_OFF { @@ -3353,8 +4184,19 @@ log space"); // error = 0; err: // print the current replication position - sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s", - IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff)); + if (mi->using_gtid == Master_info::USE_GTID_NO) + sql_print_information("Slave I/O thread exiting, read up to log '%s', " + "position %s", + IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff)); + else + { + String tmp; + mi->gtid_current_pos.to_string(&tmp); + sql_print_information("Slave I/O thread exiting, read up to log '%s', " + "position %s; GTID position %s", + IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff), + tmp.c_ptr_safe()); + } RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi)); thd->reset_query(); thd->reset_db(NULL, 0); @@ -3375,7 +4217,9 @@ err: mi->mysql=0; } write_ignored_events_info_to_relay_log(thd, mi); - thd_proc_info(thd, "Waiting for slave mutex on exit"); + if (mi->using_gtid != Master_info::USE_GTID_NO) + flush_master_info(mi, TRUE, TRUE); + THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit); thd->add_status_to_global(); mysql_mutex_lock(&mi->run_lock); @@ -3385,15 +4229,13 @@ err_during_init: mi->rli.relay_log.description_event_for_queue= 0; // TODO: make rpl_status part of Master_info change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE); - DBUG_ASSERT(thd->net.buff != 0); - net_end(&thd->net); // destructor will not free it, because net.vio is 0 mysql_mutex_lock(&LOCK_thread_count); thd->unlink(); mysql_mutex_unlock(&LOCK_thread_count); THD_CHECK_SENTRY(thd); delete thd; mi->abort_slave= 0; - mi->slave_running= 0; + mi->slave_running= MYSQL_SLAVE_NOT_RUN; mi->io_thd= 0; /* Note: the order of the two following calls (first broadcast, then unlock) @@ -3416,17 +4258,33 @@ err_during_init: /* Check the temporary directory used by commands like LOAD DATA INFILE. + + As the directory never changes during a mysqld run, we only + test this once and cache the result. This also resolve a race condition + when this can be run by multiple threads at the same time. */ + +static bool check_temp_dir_run= 0; +static int check_temp_dir_result= 0; + static int check_temp_dir(char* tmp_file) { - int fd; + File fd; + int result= 1; // Assume failure MY_DIR *dirp; char tmp_dir[FN_REFLEN]; size_t tmp_dir_size; - DBUG_ENTER("check_temp_dir"); + mysql_mutex_lock(&LOCK_thread_count); + if (check_temp_dir_run) + { + result= check_temp_dir_result; + goto end; + } + check_temp_dir_run= 1; + /* Get the directory from the temporary file. */ @@ -3436,27 +4294,122 @@ int check_temp_dir(char* tmp_file) Check if the directory exists. */ if (!(dirp=my_dir(tmp_dir,MYF(MY_WME)))) - DBUG_RETURN(1); + goto end; my_dirend(dirp); /* - Check permissions to create a file. + Check permissions to create a file. We use O_TRUNC to ensure that + things works even if we happen to have and old file laying around. */ if ((fd= mysql_file_create(key_file_misc, tmp_file, CREATE_MODE, - O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW, + O_WRONLY | O_BINARY | O_TRUNC | O_NOFOLLOW, MYF(MY_WME))) < 0) - DBUG_RETURN(1); + goto end; + result= 0; // Directory name ok /* Clean up. */ mysql_file_close(fd, MYF(0)); mysql_file_delete(key_file_misc, tmp_file, MYF(0)); - DBUG_RETURN(0); +end: + check_temp_dir_result= result; + mysql_mutex_unlock(&LOCK_thread_count); + DBUG_RETURN(result); +} + + +void +slave_output_error_info(rpl_group_info *rgi, THD *thd) +{ + /* + retrieve as much info as possible from the thd and, error + codes and warnings and print this to the error log as to + allow the user to locate the error + */ + Relay_log_info *rli= rgi->rli; + uint32 const last_errno= rli->last_error().number; + char llbuff[22]; + + if (thd->is_error()) + { + char const *const errmsg= thd->get_stmt_da()->message(); + + DBUG_PRINT("info", + ("thd->get_stmt_da()->sql_errno()=%d; rli->last_error.number=%d", + thd->get_stmt_da()->sql_errno(), last_errno)); + if (last_errno == 0) + { + /* + This function is reporting an error which was not reported + while executing exec_relay_log_event(). + */ + rli->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(), + rgi->gtid_info(), "%s", errmsg); + } + else if (last_errno != thd->get_stmt_da()->sql_errno()) + { + /* + * An error was reported while executing exec_relay_log_event() + * however the error code differs from what is in the thread. + * This function prints out more information to help finding + * what caused the problem. + */ + sql_print_error("Slave (additional info): %s Error_code: %d", + errmsg, thd->get_stmt_da()->sql_errno()); + } + } + + /* Print any warnings issued */ + Diagnostics_area::Sql_condition_iterator it= + thd->get_stmt_da()->sql_conditions(); + const Sql_condition *err; + /* + Added controlled slave thread cancel for replication + of user-defined variables. + */ + bool udf_error = false; + while ((err= it++)) + { + if (err->get_sql_errno() == ER_CANT_OPEN_LIBRARY) + udf_error = true; + sql_print_warning("Slave: %s Error_code: %d", err->get_message_text(), err->get_sql_errno()); + } + if (udf_error) + { + String tmp; + if (rli->mi->using_gtid != Master_info::USE_GTID_NO) + { + tmp.append(STRING_WITH_LEN("; GTID position '")); + rpl_append_gtid_state(&tmp, false); + tmp.append(STRING_WITH_LEN("'")); + } + sql_print_error("Error loading user-defined library, slave SQL " + "thread aborted. Install the missing library, and restart the " + "slave SQL thread with \"SLAVE START\". We stopped at log '%s' " + "position %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, + llbuff), tmp.c_ptr_safe()); + } + else + { + String tmp; + if (rli->mi->using_gtid != Master_info::USE_GTID_NO) + { + tmp.append(STRING_WITH_LEN("; GTID position '")); + rpl_append_gtid_state(&tmp, false); + tmp.append(STRING_WITH_LEN("'")); + } + sql_print_error("\ +Error running query, slave SQL thread aborted. Fix the problem, and restart \ +the slave SQL thread with \"SLAVE START\". We stopped at log \ +'%s' position %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff), + tmp.c_ptr_safe()); + } } + /** Slave SQL thread entry point. @@ -3473,9 +4426,13 @@ pthread_handler_t handle_slave_sql(void *arg) char saved_master_log_name[FN_REFLEN]; my_off_t UNINIT_VAR(saved_log_pos); my_off_t UNINIT_VAR(saved_master_log_pos); + String saved_skip_gtid_pos; my_off_t saved_skip= 0; - Relay_log_info* rli = &((Master_info*)arg)->rli; + Master_info *mi= ((Master_info*)arg); + Relay_log_info* rli = &mi->rli; const char *errmsg; + rpl_group_info *serial_rgi; + rpl_sql_thread_info sql_info(mi->rpl_filter); // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff my_thread_init(); @@ -3484,10 +4441,13 @@ pthread_handler_t handle_slave_sql(void *arg) LINT_INIT(saved_master_log_pos); LINT_INIT(saved_log_pos); + serial_rgi= new rpl_group_info(rli); thd = new THD; // note that contructor of THD uses DBUG_ ! thd->thread_stack = (char*)&thd; // remember where our stack is + thd->system_thread_info.rpl_sql_info= &sql_info; DBUG_ASSERT(rli->inited); + DBUG_ASSERT(rli->mi == mi); mysql_mutex_lock(&rli->run_lock); DBUG_ASSERT(!rli->slave_running); errmsg= 0; @@ -3495,36 +4455,42 @@ pthread_handler_t handle_slave_sql(void *arg) rli->events_till_abort = abort_slave_event_count; #endif - rli->sql_thd= thd; + /* + THD for the sql driver thd. In parallel replication this is the thread + that reads things from the relay log and calls rpl_parallel::do_event() + to execute queries. + + In single thread replication this is the THD for the thread that is + executing SQL queries too. + */ + serial_rgi->thd= rli->sql_driver_thd= thd; /* Inform waiting threads that slave has started */ rli->slave_run_id++; - rli->slave_running = 1; + rli->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT; pthread_detach_this_thread(); - if (init_slave_thread(thd, SLAVE_THD_SQL)) + if (init_slave_thread(thd, mi, SLAVE_THD_SQL)) { /* TODO: this is currently broken - slave start and change master will be stuck if we fail here */ mysql_cond_broadcast(&rli->start_cond); - rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, + rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, "Failed during slave thread initialization"); goto err_during_init; } thd->init_for_queries(); - thd->rli_slave= rli; - if ((rli->deferred_events_collecting= rpl_filter->is_on())) + thd->rgi_slave= serial_rgi; + if ((serial_rgi->deferred_events_collecting= mi->rpl_filter->is_on())) { - rli->deferred_events= new Deferred_log_events(rli); + serial_rgi->deferred_events= new Deferred_log_events(rli); } - thd->temporary_tables = rli->save_temporary_tables; // restore temp tables - set_thd_in_use_temporary_tables(rli); // (re)set sql_thd in use for saved temp tables /* binlog_annotate_row_events must be TRUE only after an Annotate_rows event - has been recieved and only till the last corresponding rbr event has been + has been received and only till the last corresponding rbr event has been applied. In all other cases it must be FALSE. */ thd->variables.binlog_annotate_row_events= 0; @@ -3540,6 +4506,7 @@ pthread_handler_t handle_slave_sql(void *arg) Seconds_Behind_Master grows. No big deal. */ rli->abort_slave = 0; + rli->stop_for_until= false; mysql_mutex_unlock(&rli->run_lock); mysql_cond_broadcast(&rli->start_cond); @@ -3554,24 +4521,40 @@ pthread_handler_t handle_slave_sql(void *arg) But the master timestamp is reset by RESET SLAVE & CHANGE MASTER. */ rli->clear_error(); + rli->parallel.reset(); //tell the I/O thread to take relay_log_space_limit into account from now on mysql_mutex_lock(&rli->log_space_lock); rli->ignore_log_space_limit= 0; mysql_mutex_unlock(&rli->log_space_lock); - rli->trans_retries= 0; // start from "no error" - DBUG_PRINT("info", ("rli->trans_retries: %lu", rli->trans_retries)); + serial_rgi->gtid_sub_id= 0; + serial_rgi->gtid_pending= false; + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + /* + We initialize the relay log state from the know starting position. + It will then be updated as required by GTID and GTID_LIST events found + while applying events read from relay logs. + */ + rli->relay_log_state.load(&rpl_global_gtid_slave_state); + } + rli->gtid_skip_flag = GTID_SKIP_NOT; if (init_relay_log_pos(rli, rli->group_relay_log_name, rli->group_relay_log_pos, 1 /*need data lock*/, &errmsg, 1 /*look for a description_event*/)) { - rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, + rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, "Error initializing relay log position: %s", errmsg); goto err; } + rli->reset_inuse_relaylog(); + if (rli->alloc_inuse_relaylog(rli->group_relay_log_name)) + goto err; + + strcpy(rli->future_event_master_log_name, rli->group_master_log_name); THD_CHECK_SENTRY(thd); #ifndef DBUG_OFF { @@ -3597,32 +4580,57 @@ pthread_handler_t handle_slave_sql(void *arg) #endif } #endif - DBUG_ASSERT(rli->sql_thd == thd); DBUG_PRINT("master_info",("log_file_name: %s position: %s", rli->group_master_log_name, llstr(rli->group_master_log_pos,llbuff))); if (global_system_variables.log_warnings) + { + String tmp; + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + tmp.append(STRING_WITH_LEN("; GTID position '")); + rpl_append_gtid_state(&tmp, + mi->using_gtid==Master_info::USE_GTID_CURRENT_POS); + tmp.append(STRING_WITH_LEN("'")); + } sql_print_information("Slave SQL thread initialized, starting replication in \ -log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, +log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff),rli->group_relay_log_name, - llstr(rli->group_relay_log_pos,llbuff1)); + llstr(rli->group_relay_log_pos,llbuff1), tmp.c_ptr_safe()); + } if (check_temp_dir(rli->slave_patternload_file)) { - rli->report(ERROR_LEVEL, thd->stmt_da->sql_errno(), + rli->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(), NULL, "Unable to use slave's temporary directory %s - %s", - slave_load_tmpdir, thd->stmt_da->message()); + slave_load_tmpdir, thd->get_stmt_da()->message()); goto err; } + /* Load the set of seen GTIDs, if we did not already. */ + if (rpl_load_gtid_slave_state(thd)) + { + rli->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(), NULL, + "Unable to load replication GTID slave state from mysql.%s: %s", + rpl_gtid_slave_state_table_name.str, + thd->get_stmt_da()->message()); + /* + If we are using old-style replication, we can continue, even though we + then will not be able to record the GTIDs we receive. But if using GTID, + we must give up. + */ + if (mi->using_gtid != Master_info::USE_GTID_NO || opt_gtid_strict_mode) + goto err; + } + /* execute init_slave variable */ if (opt_init_slave.length) { execute_init_command(thd, &opt_init_slave, &LOCK_sys_init_slave); if (thd->is_slave_error) { - rli->report(ERROR_LEVEL, thd->stmt_da->sql_errno(), + rli->report(ERROR_LEVEL, thd->get_stmt_da()->sql_errno(), NULL, "Slave SQL thread aborted. Can't execute init_slave query"); goto err; } @@ -3639,9 +4647,16 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, strmake_buf(saved_master_log_name, rli->group_master_log_name); saved_log_pos= rli->group_relay_log_pos; saved_master_log_pos= rli->group_master_log_pos; + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + saved_skip_gtid_pos.append(STRING_WITH_LEN(", GTID '")); + rpl_append_gtid_state(&saved_skip_gtid_pos, false); + saved_skip_gtid_pos.append(STRING_WITH_LEN("'; ")); + } saved_skip= rli->slave_skip_counter; } - if (rli->until_condition != Relay_log_info::UNTIL_NONE && + if ((rli->until_condition == Relay_log_info::UNTIL_MASTER_POS || + rli->until_condition == Relay_log_info::UNTIL_RELAY_POS) && rli->is_until_satisfied(thd, NULL)) { char buf[22]; @@ -3654,112 +4669,83 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, /* Read queries from the IO/THREAD until this thread is killed */ - while (!sql_slave_killed(thd,rli)) + while (!sql_slave_killed(serial_rgi)) { - thd_proc_info(thd, "Reading event from the relay log"); - DBUG_ASSERT(rli->sql_thd == thd); + THD_STAGE_INFO(thd, stage_reading_event_from_the_relay_log); THD_CHECK_SENTRY(thd); if (saved_skip && rli->slave_skip_counter == 0) { + String tmp; + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + tmp.append(STRING_WITH_LEN(", GTID '")); + rpl_append_gtid_state(&tmp, false); + tmp.append(STRING_WITH_LEN("'; ")); + } + sql_print_information("'SQL_SLAVE_SKIP_COUNTER=%ld' executed at " "relay_log_file='%s', relay_log_pos='%ld', master_log_name='%s', " - "master_log_pos='%ld' and new position at " + "master_log_pos='%ld'%s and new position at " "relay_log_file='%s', relay_log_pos='%ld', master_log_name='%s', " - "master_log_pos='%ld' ", + "master_log_pos='%ld'%s ", (ulong) saved_skip, saved_log_name, (ulong) saved_log_pos, saved_master_log_name, (ulong) saved_master_log_pos, + saved_skip_gtid_pos.c_ptr_safe(), rli->group_relay_log_name, (ulong) rli->group_relay_log_pos, - rli->group_master_log_name, (ulong) rli->group_master_log_pos); + rli->group_master_log_name, (ulong) rli->group_master_log_pos, + tmp.c_ptr_safe()); saved_skip= 0; + saved_skip_gtid_pos.free(); } - if (exec_relay_log_event(thd,rli)) + if (exec_relay_log_event(thd, rli, serial_rgi)) { DBUG_PRINT("info", ("exec_relay_log_event() failed")); // do not scare the user if SQL thread was simply killed or stopped - if (!sql_slave_killed(thd,rli)) - { - /* - retrieve as much info as possible from the thd and, error - codes and warnings and print this to the error log as to - allow the user to locate the error - */ - uint32 const last_errno= rli->last_error().number; - - if (thd->is_error()) - { - char const *const errmsg= thd->stmt_da->message(); - - DBUG_PRINT("info", - ("thd->stmt_da->sql_errno()=%d; rli->last_error.number=%d", - thd->stmt_da->sql_errno(), last_errno)); - if (last_errno == 0) - { - /* - This function is reporting an error which was not reported - while executing exec_relay_log_event(). - */ - rli->report(ERROR_LEVEL, thd->stmt_da->sql_errno(), "%s", errmsg); - } - else if (last_errno != thd->stmt_da->sql_errno()) - { - /* - * An error was reported while executing exec_relay_log_event() - * however the error code differs from what is in the thread. - * This function prints out more information to help finding - * what caused the problem. - */ - sql_print_error("Slave (additional info): %s Error_code: %d", - errmsg, thd->stmt_da->sql_errno()); - } - } - - /* Print any warnings issued */ - List_iterator_fast<MYSQL_ERROR> it(thd->warning_info->warn_list()); - MYSQL_ERROR *err; - /* - Added controlled slave thread cancel for replication - of user-defined variables. - */ - bool udf_error = false; - while ((err= it++)) - { - if (err->get_sql_errno() == ER_CANT_OPEN_LIBRARY) - udf_error = true; - sql_print_warning("Slave: %s Error_code: %d", err->get_message_text(), err->get_sql_errno()); - } - if (udf_error) - sql_print_error("Error loading user-defined library, slave SQL " - "thread aborted. Install the missing library, and restart the " - "slave SQL thread with \"SLAVE START\". We stopped at log '%s' " - "position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, - llbuff)); - else - sql_print_error("\ -Error running query, slave SQL thread aborted. Fix the problem, and restart \ -the slave SQL thread with \"SLAVE START\". We stopped at log \ -'%s' position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff)); - } + if (!sql_slave_killed(serial_rgi)) + slave_output_error_info(serial_rgi, thd); goto err; } } + if (opt_slave_parallel_threads > 0) + rli->parallel.wait_for_done(thd, rli); + /* Thread stopped. Print the current replication position to the log */ - sql_print_information("Slave SQL thread exiting, replication stopped in log " - "'%s' at position %s", - RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff)); + { + String tmp; + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + tmp.append(STRING_WITH_LEN("; GTID position '")); + rpl_append_gtid_state(&tmp, false); + tmp.append(STRING_WITH_LEN("'")); + } + sql_print_information("Slave SQL thread exiting, replication stopped in " + "log '%s' at position %s%s", + RPL_LOG_NAME, + llstr(rli->group_master_log_pos,llbuff), + tmp.c_ptr_safe()); + } err: /* + Once again, in case we aborted with an error and skipped the first one. + (We want the first one to be before the printout of stop position to + get the correct position printed.) + */ + if (opt_slave_parallel_threads > 0) + rli->parallel.wait_for_done(thd, rli); + + /* Some events set some playgrounds, which won't be cleared because thread stops. Stopping of this thread may not be known to these events ("stop" request is detected only by the present function, not by events), so we must "proactively" clear playgrounds: */ thd->clear_error(); - rli->cleanup_context(thd, 1); + serial_rgi->cleanup_context(thd, 1); /* Some extra safety, which should not been needed (normally, event deletion should already have done these assignments (each event which sets these @@ -3768,40 +4754,82 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \ thd->catalog= 0; thd->reset_query(); thd->reset_db(NULL, 0); + if (rli->mi->using_gtid != Master_info::USE_GTID_NO) + { + ulong domain_count; + + flush_relay_log_info(rli); + if (opt_slave_parallel_threads > 0) + { + /* + In parallel replication GTID mode, we may stop with different domains + at different positions in the relay log. + + To handle this when we restart the SQL thread, mark the current + per-domain position in the Relay_log_info. + */ + mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state); + domain_count= rpl_global_gtid_slave_state.count(); + mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state); + if (domain_count > 1) + { + inuse_relaylog *ir; + + /* + Load the starting GTID position, so that we can skip already applied + GTIDs when we restart the SQL thread. And set the start position in + the relay log back to a known safe place to start (prior to any not + yet applied transaction in any domain). + */ + rli->restart_gtid_pos.load(&rpl_global_gtid_slave_state, NULL, 0); + if ((ir= rli->inuse_relaylog_list)) + { + rpl_gtid *gtid= ir->relay_log_state; + uint32 count= ir->relay_log_state_count; + while (count > 0) + { + process_gtid_for_restart_pos(rli, gtid); + ++gtid; + --count; + } + strmake_buf(rli->group_relay_log_name, ir->name); + rli->group_relay_log_pos= BIN_LOG_HEADER_SIZE; + } + } + } + } + THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit); thd->add_status_to_global(); - thd_proc_info(thd, "Waiting for slave mutex on exit"); mysql_mutex_lock(&rli->run_lock); err_during_init: /* We need data_lock, at least to wake up any waiting master_pos_wait() */ mysql_mutex_lock(&rli->data_lock); - DBUG_ASSERT(rli->slave_running == 1); // tracking buffer overrun + DBUG_ASSERT(rli->slave_running == MYSQL_SLAVE_RUN_NOT_CONNECT); // tracking buffer overrun /* When master_pos_wait() wakes up it will check this and terminate */ - rli->slave_running= 0; + rli->slave_running= MYSQL_SLAVE_NOT_RUN; /* Forget the relay log's format */ delete rli->relay_log.description_event_for_exec; rli->relay_log.description_event_for_exec= 0; + rli->reset_inuse_relaylog(); /* Wake up master_pos_wait() */ mysql_mutex_unlock(&rli->data_lock); DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions")); mysql_cond_broadcast(&rli->data_cond); rli->ignore_log_space_limit= 0; /* don't need any lock */ /* we die so won't remember charset - re-update them on next thread start */ - rli->cached_charset_invalidate(); - rli->save_temporary_tables = thd->temporary_tables; + thd->system_thread_info.rpl_sql_info->cached_charset_invalidate(); /* TODO: see if we can do this conditionally in next_event() instead to avoid unneeded position re-init */ thd->temporary_tables = 0; // remove tempation from destructor to close them - DBUG_ASSERT(thd->net.buff != 0); - net_end(&thd->net); // destructor will not free it, because we are weird - DBUG_ASSERT(rli->sql_thd == thd); THD_CHECK_SENTRY(thd); - rli->sql_thd= 0; - set_thd_in_use_temporary_tables(rli); // (re)set sql_thd in use for saved temp tables + rli->sql_driver_thd= 0; mysql_mutex_lock(&LOCK_thread_count); THD_CHECK_SENTRY(thd); + thd->rgi_fake= thd->rgi_slave= NULL; + delete serial_rgi; delete thd; mysql_mutex_unlock(&LOCK_thread_count); /* @@ -3839,14 +4867,14 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev) if (unlikely(!cev->is_valid())) DBUG_RETURN(1); - if (!rpl_filter->db_ok(cev->db)) + if (!mi->rpl_filter->db_ok(cev->db)) { skip_load_data_infile(net); DBUG_RETURN(0); } DBUG_ASSERT(cev->inited_from_old); thd->file_id = cev->file_id = mi->file_id++; - thd->server_id = cev->server_id; + thd->variables.server_id = cev->server_id; cev_not_written = 1; if (unlikely(net_request_file(net,cev->fname))) @@ -3888,7 +4916,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev) xev.log_pos = cev->log_pos; if (unlikely(mi->rli.relay_log.append(&xev))) { - mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, + mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL, ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE), "error writing Exec_load event to relay log"); goto err; @@ -3902,7 +4930,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev) cev->block_len = num_bytes; if (unlikely(mi->rli.relay_log.append(cev))) { - mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, + mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL, ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE), "error writing Create_file event to relay log"); goto err; @@ -3917,7 +4945,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev) aev.log_pos = cev->log_pos; if (unlikely(mi->rli.relay_log.append(&aev))) { - mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, + mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL, ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE), "error writing Append_block event to relay log"); goto err; @@ -4024,7 +5052,7 @@ static int queue_binlog_ver_1_event(Master_info *mi, const char *buf, { if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME))))) { - mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, + mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, ER(ER_SLAVE_FATAL_ERROR), "Memory allocation failed"); DBUG_RETURN(1); } @@ -4229,6 +5257,10 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) mysql_mutex_t *log_lock= rli->relay_log.get_log_lock(); ulong s_id; bool unlock_data_lock= TRUE; + bool gtid_skip_enqueue= false; + bool got_gtid_event= false; + rpl_gtid event_gtid; + /* FD_q must have been prepared for the first R_a event inside get_master_version_and_clock() @@ -4291,8 +5323,6 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) goto err; } - LINT_INIT(inc_pos); - if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 && (uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */) DBUG_RETURN(queue_old_event(mi,buf,event_len)); @@ -4320,6 +5350,86 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) event_len - BINLOG_CHECKSUM_LEN : event_len, mi->rli.relay_log.description_event_for_queue); + if (unlikely(mi->gtid_reconnect_event_skip_count) && + unlikely(!mi->gtid_event_seen) && + rev.is_artificial_event() && + (mi->prev_master_id != mi->master_id || + strcmp(rev.new_log_ident, mi->master_log_name) != 0)) + { + /* + Artificial Rotate_log_event is the first event we receive at the start + of each master binlog file. It gives the name of the new binlog file. + + Normally, we already have this name from the real rotate event at the + end of the previous binlog file (unless we are making a new connection + using GTID). But if the master server restarted/crashed, there is no + rotate event at the end of the prior binlog file, so the name is new. + + We use this fact to handle a special case of master crashing. If the + master crashed while writing the binlog, it might end with a partial + event group lacking the COMMIT/XID event, which must be rolled + back. If the slave IO thread happens to get a disconnect in the middle + of exactly this event group, it will try to reconnect at the same GTID + and skip already fetched events. However, that GTID did not commit on + the master before the crash, so it does not really exist, and the + master will connect the slave at the next following GTID starting in + the next binlog. This could confuse the slave and make it mix the + start of one event group with the end of another. + + But we detect this case here, by noticing the change of binlog name + which detects the missing rotate event at the end of the previous + binlog file. In this case, we reset the counters to make us not skip + the next event group, and queue an artificial Format Description + event. The previously fetched incomplete event group will then be + rolled back when the Format Description event is executed by the SQL + thread. + + A similar case is if the reconnect somehow connects to a different + master server (like due to a network proxy or IP address takeover). + We detect this case by noticing a change of server_id and in this + case likewise rollback the partially received event group. + */ + Format_description_log_event fdle(4); + + if (mi->prev_master_id != mi->master_id) + sql_print_warning("The server_id of master server changed in the " + "middle of GTID %u-%u-%llu. Assuming a change of " + "master server, so rolling back the previously " + "received partial transaction. Expected: %lu, " + "received: %lu", mi->last_queued_gtid.domain_id, + mi->last_queued_gtid.server_id, + mi->last_queued_gtid.seq_no, + mi->prev_master_id, mi->master_id); + else if (strcmp(rev.new_log_ident, mi->master_log_name) != 0) + sql_print_warning("Unexpected change of master binlog file name in the " + "middle of GTID %u-%u-%llu, assuming that master has " + "crashed and rolling back the transaction. Expected: " + "'%s', received: '%s'", + mi->last_queued_gtid.domain_id, + mi->last_queued_gtid.server_id, + mi->last_queued_gtid.seq_no, + mi->master_log_name, rev.new_log_ident); + + mysql_mutex_lock(log_lock); + if (likely(!fdle.write(rli->relay_log.get_log_file()) && + !rli->relay_log.flush_and_sync(NULL))) + { + rli->relay_log.harvest_bytes_written(&rli->log_space_total); + } + else + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + mysql_mutex_unlock(log_lock); + goto err; + } + rli->relay_log.signal_update(); + mysql_mutex_unlock(log_lock); + + mi->gtid_reconnect_event_skip_count= 0; + mi->events_queued_since_last_gtid= 0; + } + mi->prev_master_id= mi->master_id; + if (unlikely(process_io_rotate(mi, &rev))) { error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; @@ -4416,6 +5526,19 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) mi->rli.relay_log.relay_log_checksum_alg= tmp->checksum_alg; /* + Do not queue any format description event that we receive after a + reconnect where we are skipping over a partial event group received + before the reconnect. + + (If we queued such an event, and it was the first format_description + event after master restart, the slave SQL thread would think that + the partial event group before it in the relay log was from a + previous master crash and should be rolled back). + */ + if (unlikely(mi->gtid_reconnect_event_skip_count && !mi->gtid_event_seen)) + gtid_skip_enqueue= true; + + /* Though this does some conversion to the slave's format, this will preserve the master's binlog format version, and number of event types. */ @@ -4458,16 +5581,18 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) Heartbeat is sent only after an event corresponding to the corrdinates the heartbeat carries. - Slave can not have a difference in coordinates except in the only + Slave can not have a higher coordinate except in the only special case when mi->master_log_name, master_log_pos have never been updated by Rotate event i.e when slave does not have any history with the master (and thereafter mi->master_log_pos is NULL). + Slave can have lower coordinates, if some event from master was omitted. + TODO: handling `when' for SHOW SLAVE STATUS' snds behind */ if ((memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len()) && mi->master_log_name != NULL) - || mi->master_log_pos != hb.log_pos) + || mi->master_log_pos > hb.log_pos) { /* missed events of heartbeat from the past */ error= ER_SLAVE_HEARTBEAT_FAILURE; @@ -4483,7 +5608,151 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) } break; + case GTID_LIST_EVENT: + { + const char *errmsg; + Gtid_list_log_event *glev; + Log_event *tmp; + uint32 flags; + + if (!(tmp= Log_event::read_log_event(buf, event_len, &errmsg, + mi->rli.relay_log.description_event_for_queue, + opt_slave_sql_verify_checksum))) + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + goto err; + } + glev= static_cast<Gtid_list_log_event *>(tmp); + event_pos= glev->log_pos; + flags= glev->gl_flags; + delete glev; + + /* + We use fake Gtid_list events to update the old-style position (among + other things). + + Early code created fake Gtid_list events with zero log_pos, those should + not modify old-style position. + */ + if (event_pos == 0 || event_pos <= mi->master_log_pos) + inc_pos= 0; + else + inc_pos= event_pos - mi->master_log_pos; + + if (mi->rli.until_condition == Relay_log_info::UNTIL_GTID && + flags & Gtid_list_log_event::FLAG_UNTIL_REACHED) + { + char str_buf[128]; + String str(str_buf, sizeof(str_buf), system_charset_info); + mi->rli.until_gtid_pos.to_string(&str); + sql_print_information("Slave I/O thread stops because it reached its" + " UNTIL master_gtid_pos %s", str.c_ptr_safe()); + mi->abort_slave= true; + } + } + break; + + case GTID_EVENT: + { + uchar gtid_flag; + + if (Gtid_log_event::peek(buf, event_len, checksum_alg, + &event_gtid.domain_id, &event_gtid.server_id, + &event_gtid.seq_no, >id_flag, + rli->relay_log.description_event_for_queue)) + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + goto err; + } + got_gtid_event= true; + if (mi->using_gtid == Master_info::USE_GTID_NO) + goto default_action; + if (unlikely(!mi->gtid_event_seen)) + { + mi->gtid_event_seen= true; + if (mi->gtid_reconnect_event_skip_count) + { + /* + If we are reconnecting, and we need to skip a partial event group + already queued to the relay log before the reconnect, then we check + that we actually get the same event group (same GTID) as before, so + we do not end up with half of one group and half another. + + The only way we should be able to receive a different GTID than what + we expect is if the binlog on the master (or more likely the whole + master server) was replaced with a different one, on the same IP + address, _and_ the new master happens to have domains in a different + order so we get the GTID from a different domain first. Still, it is + best to protect against this case. + */ + if (event_gtid.domain_id != mi->last_queued_gtid.domain_id || + event_gtid.server_id != mi->last_queued_gtid.server_id || + event_gtid.seq_no != mi->last_queued_gtid.seq_no) + { + bool first; + error= ER_SLAVE_UNEXPECTED_MASTER_SWITCH; + error_msg.append(STRING_WITH_LEN("Expected: ")); + first= true; + rpl_slave_state_tostring_helper(&error_msg, &mi->last_queued_gtid, + &first); + error_msg.append(STRING_WITH_LEN(", received: ")); + first= true; + rpl_slave_state_tostring_helper(&error_msg, &event_gtid, &first); + goto err; + } + } + } + + if (unlikely(mi->gtid_reconnect_event_skip_count)) + { + goto default_action; + } + + /* + We have successfully queued to relay log everything before this GTID, so + in case of reconnect we can start from after any previous GTID. + (Normally we would have updated gtid_current_pos earlier at the end of + the previous event group, but better leave an extra check here for + safety). + */ + if (mi->events_queued_since_last_gtid) + { + mi->gtid_current_pos.update(&mi->last_queued_gtid); + mi->events_queued_since_last_gtid= 0; + } + mi->last_queued_gtid= event_gtid; + mi->last_queued_gtid_standalone= + (gtid_flag & Gtid_log_event::FL_STANDALONE) != 0; + ++mi->events_queued_since_last_gtid; + inc_pos= event_len; + } + break; + +#ifndef DBUG_OFF + case XID_EVENT: + DBUG_EXECUTE_IF("slave_discard_xid_for_gtid_0_x_1000", + { + /* Inject an event group that is missing its XID commit event. */ + if (mi->last_queued_gtid.domain_id == 0 && + mi->last_queued_gtid.seq_no == 1000) + goto skip_relay_logging; + }); + /* Fall through to default case ... */ +#endif + default: + default_action: + if (mi->using_gtid != Master_info::USE_GTID_NO && mi->gtid_event_seen) + { + if (unlikely(mi->gtid_reconnect_event_skip_count)) + { + --mi->gtid_reconnect_event_skip_count; + gtid_skip_enqueue= true; + } + else if (mi->events_queued_since_last_gtid) + ++mi->events_queued_since_last_gtid; + } + inc_pos= event_len; break; } @@ -4519,7 +5788,51 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) mysql_mutex_lock(log_lock); s_id= uint4korr(buf + SERVER_ID_OFFSET); - if ((s_id == ::server_id && !mi->rli.replicate_same_server_id) || + /* + Write the event to the relay log, unless we reconnected in the middle + of an event group and now need to skip the initial part of the group that + we already wrote before reconnecting. + */ + if (unlikely(gtid_skip_enqueue)) + { + mi->master_log_pos+= inc_pos; + if ((uchar)buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT && + s_id == mi->master_id) + { + /* + If we write this master's description event in the middle of an event + group due to GTID reconnect, SQL thread will think that master crashed + in the middle of the group and roll back the first half, so we must not. + + But we still have to write an artificial copy of the masters description + event, to override the initial slave-version description event so that + SQL thread has the right information for parsing the events it reads. + */ + rli->relay_log.description_event_for_queue->created= 0; + rli->relay_log.description_event_for_queue->set_artificial_event(); + if (rli->relay_log.append_no_lock + (rli->relay_log.description_event_for_queue)) + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + else + rli->relay_log.harvest_bytes_written(&rli->log_space_total); + } + else if (mi->gtid_reconnect_event_skip_count == 0) + { + /* + Add a fake rotate event so that SQL thread can see the old-style + position where we re-connected in the middle of a GTID event group. + */ + Rotate_log_event fake_rev(mi->master_log_name, 0, mi->master_log_pos, 0); + fake_rev.server_id= mi->master_id; + if (rli->relay_log.append_no_lock(&fake_rev)) + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + else + rli->relay_log.harvest_bytes_written(&rli->log_space_total); + } + } + else + if ((s_id == global_system_variables.server_id && + !mi->rli.replicate_same_server_id) || /* the following conjunction deals with IGNORE_SERVER_IDS, if set If the master is on the ignore list, execution of @@ -4550,7 +5863,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) IGNORE_SERVER_IDS it increments mi->master_log_pos as well as rli->group_relay_log_pos. */ - if (!(s_id == ::server_id && !mi->rli.replicate_same_server_id) || + if (!(s_id == global_system_variables.server_id && + !mi->rli.replicate_same_server_id) || (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT && buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT && buf[EVENT_TYPE_OFFSET] != STOP_EVENT)) @@ -4559,6 +5873,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN); DBUG_ASSERT(rli->ign_master_log_name_end[0]); rli->ign_master_log_pos_end= mi->master_log_pos; + if (got_gtid_event) + rli->ign_gtids.update(&event_gtid); } rli->relay_log.signal_update(); // the slave SQL thread needs to re-check DBUG_PRINT("info", ("master_log_pos: %lu, event originating from %u server, ignored", @@ -4566,7 +5882,6 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) } else { - /* write the event to the relay log */ if (likely(!(rli->relay_log.appendv(buf,event_len,0)))) { mi->master_log_pos+= inc_pos; @@ -4578,11 +5893,33 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; } rli->ign_master_log_name_end[0]= 0; // last event is not ignored + if (got_gtid_event) + rli->ign_gtids.remove_if_present(&event_gtid); if (save_buf != NULL) buf= save_buf; } mysql_mutex_unlock(log_lock); + if (!error && + mi->using_gtid != Master_info::USE_GTID_NO && + mi->events_queued_since_last_gtid > 0 && + ( (mi->last_queued_gtid_standalone && + !Log_event::is_part_of_group((Log_event_type)(uchar) + buf[EVENT_TYPE_OFFSET])) || + (!mi->last_queued_gtid_standalone && + ((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT || + ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && + Query_log_event::peek_is_commit_rollback(buf, event_len, + checksum_alg)))))) + { + /* + The whole of the current event group is queued. So in case of + reconnect we can start from after the current GTID. + */ + mi->gtid_current_pos.update(&mi->last_queued_gtid); + mi->events_queued_since_last_gtid= 0; + } + skip_relay_logging: err: @@ -4590,7 +5927,7 @@ err: mysql_mutex_unlock(&mi->data_lock); DBUG_PRINT("info", ("error: %d", error)); if (error) - mi->report(ERROR_LEVEL, error, ER(error), + mi->report(ERROR_LEVEL, error, NULL, ER(error), (error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)? "could not queue event from master" : error_msg.ptr()); @@ -4691,17 +6028,20 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi, int last_errno= -2; // impossible error ulong err_count=0; char llbuff[22]; + my_bool my_true= 1; DBUG_ENTER("connect_to_master"); set_slave_max_allowed_packet(thd, mysql); #ifndef DBUG_OFF mi->events_till_disconnect = disconnect_slave_event_count; #endif - ulong client_flag= CLIENT_REMEMBER_OPTIONS; + ulong client_flag= 0; if (opt_slave_compressed_protocol) client_flag=CLIENT_COMPRESS; /* We will use compression */ mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout); mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout); + mysql_options(mysql, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY, + (char*) &my_true); #ifdef HAVE_OPENSSL if (mi->ssl) @@ -4714,6 +6054,10 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi, mi->ssl_cipher[0]?mi->ssl_cipher:0); mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT, &mi->ssl_verify_server_cert); + mysql_options(mysql, MYSQL_OPT_SSL_CRLPATH, + mi->ssl_crlpath[0] ? mi->ssl_crlpath : 0); + mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT, + &mi->ssl_verify_server_cert); } #endif @@ -4738,20 +6082,20 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi, mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir); /* Set MYSQL_PLUGIN_DIR in case master asks for an external authentication plugin */ - if (opt_plugin_dir_ptr && *opt_plugin_dir_ptr)
- mysql_options(mysql, MYSQL_PLUGIN_DIR, opt_plugin_dir_ptr);
+ if (opt_plugin_dir_ptr && *opt_plugin_dir_ptr) + mysql_options(mysql, MYSQL_PLUGIN_DIR, opt_plugin_dir_ptr); /* we disallow empty users */ if (mi->user == NULL || mi->user[0] == 0) { - mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, + mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, ER(ER_SLAVE_FATAL_ERROR), "Invalid (empty) username when attempting to " "connect to the master server. Connection attempt " "terminated."); DBUG_RETURN(1); } - while (!(slave_was_killed = io_slave_killed(thd,mi)) && + while (!(slave_was_killed = io_slave_killed(mi)) && (reconnect ? mysql_reconnect(mysql) != 0 : mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0, mi->port, 0, client_flag) == 0)) @@ -4761,7 +6105,7 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi, { last_errno=mysql_errno(mysql); suppress_warnings= 0; - mi->report(ERROR_LEVEL, last_errno, + mi->report(ERROR_LEVEL, last_errno, NULL, "error %s to master '%s@%s:%d'" " - retry-time: %d retries: %lu message: %s", (reconnect ? "reconnecting" : "connecting"), @@ -4829,18 +6173,20 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi, } +#ifdef NOT_USED MYSQL *rpl_connect_master(MYSQL *mysql) { - THD *thd= current_thd; Master_info *mi= my_pthread_getspecific_ptr(Master_info*, RPL_MASTER_INFO); + bool allocated= false; + my_bool my_true= 1; + THD *thd; + if (!mi) { sql_print_error("'rpl_connect_master' must be called in slave I/O thread context."); return NULL; } - - bool allocated= false; - + thd= mi->io_thd; if (!mysql) { if(!(mysql= mysql_init(NULL))) @@ -4860,6 +6206,8 @@ MYSQL *rpl_connect_master(MYSQL *mysql) */ mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout); mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout); + mysql_options(mysql, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY, + (char*) &my_true); #ifdef HAVE_OPENSSL if (mi->ssl) @@ -4881,11 +6229,11 @@ MYSQL *rpl_connect_master(MYSQL *mysql) if (mi->user == NULL || mi->user[0] == 0 - || io_slave_killed(thd, mi) + || io_slave_killed( mi) || !mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0, mi->port, 0, 0)) { - if (!io_slave_killed(thd, mi)) + if (!io_slave_killed( mi)) sql_print_error("rpl_connect_master: error connecting to master: %s (server_error: %d)", mysql_error(mysql), mysql_errno(mysql)); @@ -4895,6 +6243,7 @@ MYSQL *rpl_connect_master(MYSQL *mysql) } return mysql; } +#endif /* Store the file and position where the execute-slave thread are in the @@ -4985,7 +6334,7 @@ static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg) relay_log_pos Current log pos pending Number of bytes already processed from the event */ - rli->event_relay_log_pos= max(rli->event_relay_log_pos, BIN_LOG_HEADER_SIZE); + rli->event_relay_log_pos= MY_MAX(rli->event_relay_log_pos, BIN_LOG_HEADER_SIZE); my_b_seek(cur_log,rli->event_relay_log_pos); DBUG_RETURN(cur_log); } @@ -5000,17 +6349,20 @@ static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg) @return The event read, or NULL on error. If an error occurs, the error is reported through the sql_print_information() or sql_print_error() functions. + + The size of the read event (in bytes) is returned in *event_size. */ -static Log_event* next_event(Relay_log_info* rli) +static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size) { Log_event* ev; + Relay_log_info *rli= rgi->rli; IO_CACHE* cur_log = rli->cur_log; mysql_mutex_t *log_lock = rli->relay_log.get_log_lock(); const char* errmsg=0; - THD* thd = rli->sql_thd; DBUG_ENTER("next_event"); - DBUG_ASSERT(thd != 0); + DBUG_ASSERT(rgi->thd != 0 && rgi->thd == rli->sql_driver_thd); + *event_size= 0; #ifndef DBUG_OFF if (abort_slave_event_count && !rli->events_till_abort--) @@ -5026,7 +6378,7 @@ static Log_event* next_event(Relay_log_info* rli) */ mysql_mutex_assert_owner(&rli->data_lock); - while (!sql_slave_killed(thd,rli)) + while (!sql_slave_killed(rgi)) { /* We can have two kinds of log reading: @@ -5039,6 +6391,7 @@ static Log_event* next_event(Relay_log_info* rli) The other case is much simpler: We just have a read only log that nobody else will be updating. */ + ulonglong old_pos; bool hot_log; if ((hot_log = (cur_log != &rli->cache_buf))) { @@ -5074,7 +6427,8 @@ static Log_event* next_event(Relay_log_info* rli) llstr(my_b_tell(cur_log),llbuf1), llstr(rli->event_relay_log_pos,llbuf2))); DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE); - DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos); + DBUG_ASSERT(opt_slave_parallel_threads > 0 || + my_b_tell(cur_log) == rli->event_relay_log_pos); } #endif /* @@ -5089,22 +6443,24 @@ static Log_event* next_event(Relay_log_info* rli) But if the relay log is created by new_file(): then the solution is: MYSQL_BIN_LOG::open() will write the buffered description event. */ + old_pos= rli->event_relay_log_pos; if ((ev= Log_event::read_log_event(cur_log,0, rli->relay_log.description_event_for_exec, opt_slave_sql_verify_checksum))) { - DBUG_ASSERT(thd==rli->sql_thd); /* read it while we have a lock, to avoid a mutex lock in inc_event_relay_log_pos() */ rli->future_event_relay_log_pos= my_b_tell(cur_log); + *event_size= rli->future_event_relay_log_pos - old_pos; + if (hot_log) mysql_mutex_unlock(log_lock); + rli->sql_thread_caught_up= false; DBUG_RETURN(ev); } - DBUG_ASSERT(thd==rli->sql_thd); if (opt_reckless_slave) // For mysql-test cur_log->error = 0; if (cur_log->error < 0) @@ -5140,12 +6496,10 @@ static Log_event* next_event(Relay_log_info* rli) Seconds_Behind_Master would be zero only when master has no more updates in binlog for slave. The heartbeat can be sent in a (small) fraction of slave_net_timeout. Until it's done - rli->last_master_timestamp is temporarely (for time of - waiting for the following event) reset whenever EOF is - reached. + rli->sql_thread_caught_up is temporarely (for time of waiting for + the following event) set whenever EOF is reached. */ - time_t save_timestamp= rli->last_master_timestamp; - rli->last_master_timestamp= 0; + rli->sql_thread_caught_up= true; DBUG_ASSERT(rli->relay_log.get_open_count() == rli->cur_log_old_open_count); @@ -5169,6 +6523,36 @@ static Log_event* next_event(Relay_log_info* rli) DBUG_RETURN(ev); } + if (rli->ign_gtids.count()) + { + /* We generate and return a Gtid_list, to update gtid_slave_pos. */ + DBUG_PRINT("info",("seeing ignored end gtids")); + ev= new Gtid_list_log_event(&rli->ign_gtids, + Gtid_list_log_event::FLAG_IGN_GTIDS); + rli->ign_gtids.reset(); + mysql_mutex_unlock(log_lock); + if (unlikely(!ev)) + { + errmsg= "Slave SQL thread failed to create a Gtid_list event " + "(out of memory?), gtid_slave_pos may be inaccurate"; + goto err; + } + ev->server_id= 0; // don't be ignored by slave SQL thread + ev->set_artificial_event(); // Don't mess up Exec_Master_Log_Pos + DBUG_RETURN(ev); + } + + /* + We have to check sql_slave_killed() here an extra time. + Otherwise we may miss a wakeup, since last check was done + without holding LOCK_log. + */ + if (sql_slave_killed(rgi)) + { + mysql_mutex_unlock(log_lock); + break; + } + /* We can, and should release data_lock while we are waiting for update. If we do not, show slave status will block @@ -5192,14 +6576,15 @@ static Log_event* next_event(Relay_log_info* rli) and reads one more event and starts honoring log_space_limit again. If the SQL thread needs more events to be able to rotate the log (it - might need to finish the current group first), then it can ask for one - more at a time. Thus we don't outgrow the relay log indefinitely, + might need to finish the current group first), then it can ask for + one more at a time. Thus we don't outgrow the relay log indefinitely, but rather in a controlled manner, until the next rotate. When the SQL thread starts it sets ignore_log_space_limit to false. We should also reset ignore_log_space_limit to 0 when the user does - RESET SLAVE, but in fact, no need as RESET SLAVE requires that the slave - be stopped, and the SQL thread sets ignore_log_space_limit to 0 when + RESET SLAVE, but in fact, no need as RESET SLAVE requires that the + slave be stopped, and the SQL thread sets ignore_log_space_limit + to 0 when it stops. */ mysql_mutex_lock(&rli->log_space_lock); @@ -5237,10 +6622,10 @@ static Log_event* next_event(Relay_log_info* rli) mysql_mutex_unlock(&rli->log_space_lock); mysql_cond_broadcast(&rli->log_space_cond); // Note that wait_for_update_relay_log unlocks lock_log ! - rli->relay_log.wait_for_update_relay_log(rli->sql_thd); + rli->relay_log.wait_for_update_relay_log(rli->sql_driver_thd); // re-acquire data lock since we released it earlier mysql_mutex_lock(&rli->data_lock); - rli->last_master_timestamp= save_timestamp; + rli->sql_thread_caught_up= false; continue; } /* @@ -5252,6 +6637,7 @@ static Log_event* next_event(Relay_log_info* rli) DBUG_ASSERT(rli->cur_log_fd >= 0); mysql_file_close(rli->cur_log_fd, MYF(MY_WME)); rli->cur_log_fd = -1; + rli->last_inuse_relaylog->completed= true; if (relay_log_purge) { @@ -5309,11 +6695,6 @@ static Log_event* next_event(Relay_log_info* rli) mysql_mutex_lock(log_lock); if (rli->relay_log.is_active(rli->linfo.log_file_name)) { -#ifdef EXTRA_DEBUG - if (global_system_variables.log_warnings) - sql_print_information("next log '%s' is currently active", - rli->linfo.log_file_name); -#endif rli->cur_log= cur_log= rli->relay_log.get_log_file(); rli->cur_log_old_open_count= rli->relay_log.get_open_count(); DBUG_ASSERT(rli->cur_log_fd == -1); @@ -5385,6 +6766,12 @@ static Log_event* next_event(Relay_log_info* rli) mysql_mutex_unlock(log_lock); goto err; } + if (rli->alloc_inuse_relaylog(rli->linfo.log_file_name)) + { + if (!hot_log) + mysql_mutex_unlock(log_lock); + goto err; + } if (!hot_log) mysql_mutex_unlock(log_lock); continue; @@ -5396,15 +6783,12 @@ static Log_event* next_event(Relay_log_info* rli) ourselves. We are sure that the log is still not hot now (a log can get from hot to cold, but not from cold to hot). No need for LOCK_log. */ -#ifdef EXTRA_DEBUG - if (global_system_variables.log_warnings) - sql_print_information("next log '%s' is not active", - rli->linfo.log_file_name); -#endif // open_binlog() will check the magic header if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name, &errmsg)) <0) goto err; + if (rli->alloc_inuse_relaylog(rli->linfo.log_file_name)) + goto err; } else { @@ -5543,7 +6927,7 @@ bool rpl_master_has_bug(const Relay_log_info *rli, uint bug_id, bool report, " so slave stops; check error log on slave" " for more info", MYF(0), bug_id); // a verbose message for the error log - rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR, + rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR, NULL, "According to the master's version ('%s')," " it is probable that master suffers from this bug:" " http://bugs.mysql.com/bug.php?id=%u" @@ -5580,20 +6964,14 @@ bool rpl_master_has_bug(const Relay_log_info *rli, uint bug_id, bool report, */ bool rpl_master_erroneous_autoinc(THD *thd) { - if (active_mi && active_mi->rli.sql_thd == thd) + if (thd->rgi_slave) { - Relay_log_info *rli= &active_mi->rli; DBUG_EXECUTE_IF("simulate_bug33029", return TRUE;); - return rpl_master_has_bug(rli, 33029, FALSE, NULL, NULL); + return rpl_master_has_bug(thd->rgi_slave->rli, 33029, FALSE, NULL, NULL); } return FALSE; } -#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION -template class I_List_iterator<i_string>; -template class I_List_iterator<i_string_pair>; -#endif - /** @} (end of group Replication) */ |