diff options
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 668 |
1 files changed, 581 insertions, 87 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index f6660e5a5c8..8be17860c61 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -40,6 +40,7 @@ #include <errmsg.h> #include <mysqld_error.h> #include <mysys_err.h> +#include "rpl_handler.h" #ifdef HAVE_REPLICATION @@ -48,6 +49,10 @@ #define FLAGSTR(V,F) ((V)&(F)?#F" ":"") #define MAX_SLAVE_RETRY_PAUSE 5 +/* + a parameter of sql_slave_killed() to defer the killed status +*/ +#define SLAVE_WAIT_GROUP_DONE 60 bool use_slave_mask = 0; MY_BITMAP slave_error_mask; char slave_skip_error_names[SHOW_VAR_FUNC_BUFF_SIZE]; @@ -69,6 +74,8 @@ ulonglong relay_log_space_limit = 0; int disconnect_slave_event_count = 0, abort_slave_event_count = 0; int events_till_abort = -1; +static pthread_key(Master_info*, RPL_MASTER_INFO); + enum enum_slave_reconnect_actions { SLAVE_RECON_ACT_REG= 0, @@ -220,6 +227,7 @@ void unlock_slave_threads(Master_info* mi) int init_slave() { DBUG_ENTER("init_slave"); + int error= 0; /* This is called when mysqld starts. Before client connections are @@ -231,7 +239,10 @@ int init_slave() TODO: re-write this to interate through the list of files for multi-master */ - active_mi= new Master_info; + active_mi= new Master_info(relay_log_recovery); + + if (pthread_key_create(&RPL_MASTER_INFO, NULL)) + goto err; /* If --slave-skip-errors=... was not used, the string value for the @@ -250,6 +261,7 @@ int init_slave() if (!active_mi) { sql_print_error("Failed to allocate memory for the master info structure"); + error= 1; goto err; } @@ -257,6 +269,7 @@ int init_slave() !master_host, (SLAVE_IO | SLAVE_SQL))) { sql_print_error("Failed to initialize the master info structure"); + error= 1; goto err; } @@ -275,18 +288,69 @@ int init_slave() SLAVE_IO | SLAVE_SQL)) { sql_print_error("Failed to create slave threads"); + error= 1; goto err; } } - pthread_mutex_unlock(&LOCK_active_mi); - DBUG_RETURN(0); err: pthread_mutex_unlock(&LOCK_active_mi); - DBUG_RETURN(1); + DBUG_RETURN(error); } +/* + Updates the master info based on the information stored in the + relay info and ignores relay logs previously retrieved by the IO + thread, which thus starts fetching again based on to the + group_master_log_pos and group_master_log_name. Eventually, the old + relay logs will be purged by the normal purge mechanism. + + In the feature, we should improve this routine in order to avoid throwing + away logs that are safely stored in the disk. Note also that this recovery + routine relies on the correctness of the relay-log.info and only tolerates + coordinate problems in master.info. + + In this function, there is no need for a mutex as the caller + (i.e. init_slave) already has one acquired. + + Specifically, the following structures are updated: + + 1 - mi->master_log_pos <-- rli->group_master_log_pos + 2 - mi->master_log_name <-- rli->group_master_log_name + 3 - It moves the relay log to the new relay log file, by + rli->group_relay_log_pos <-- BIN_LOG_HEADER_SIZE; + rli->event_relay_log_pos <-- BIN_LOG_HEADER_SIZE; + rli->group_relay_log_name <-- rli->relay_log.get_log_fname(); + rli->event_relay_log_name <-- rli->relay_log.get_log_fname(); + + If there is an error, it returns (1), otherwise returns (0). + */ +int init_recovery(Master_info* mi, const char** errmsg) +{ + DBUG_ENTER("init_recovery"); + + Relay_log_info *rli= &mi->rli; + if (rli->group_master_log_name[0]) + { + mi->master_log_pos= max(BIN_LOG_HEADER_SIZE, + rli->group_master_log_pos); + strmake(mi->master_log_name, rli->group_master_log_name, + sizeof(mi->master_log_name)-1); + + sql_print_warning("Recovery from master pos %ld and file %s.", + (ulong) mi->master_log_pos, mi->master_log_name); + + strmake(rli->group_relay_log_name, rli->relay_log.get_log_fname(), + sizeof(rli->group_relay_log_name)-1); + strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(), + sizeof(mi->rli.event_relay_log_name)-1); + + rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE; + } + DBUG_RETURN(0); +} + /** Convert slave skip errors bitmap into a printable string. */ @@ -519,7 +583,7 @@ terminate_slave_thread(THD *thd, EINVAL: invalid signal number (can't happen) ESRCH: thread already killed (can happen, should be ignored) */ - IF_DBUG(int err= ) pthread_kill(thd->real_id, thr_client_alarm); + int err __attribute__((unused))= pthread_kill(thd->real_id, thr_client_alarm); DBUG_ASSERT(err != EINVAL); #endif thd->awake(THD::NOT_KILLED); @@ -730,44 +794,92 @@ static bool io_slave_killed(THD* thd, Master_info* mi) DBUG_RETURN(mi->abort_slave || abort_loop || thd->killed); } +/** + The function analyzes a possible killed status and makes + a decision whether to accept it or not. + Normally upon accepting the sql thread goes to shutdown. + In the event of deffering decision @rli->last_event_start_time waiting + timer is set to force the killed status be accepted upon its expiration. + + @param thd pointer to a THD instance + @param rli pointer to Relay_log_info instance + @return TRUE the killed status is recognized, FALSE a possible killed + status is deferred. +*/ static bool sql_slave_killed(THD* thd, Relay_log_info* rli) { + bool ret= FALSE; DBUG_ENTER("sql_slave_killed"); DBUG_ASSERT(rli->sql_thd == thd); DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun if (abort_loop || thd->killed || rli->abort_slave) { - if (rli->abort_slave && rli->is_in_group() && - thd->transaction.all.modified_non_trans_table) - DBUG_RETURN(0); - /* - If we are in an unsafe situation (stopping could corrupt replication), - we give one minute to the slave SQL thread of grace before really - terminating, in the hope that it will be able to read more events and - the unsafe situation will soon be left. Note that this one minute starts - from the last time anything happened in the slave SQL thread. So it's - really one minute of idleness, we don't timeout if the slave SQL thread - is actively working. - */ - if (rli->last_event_start_time == 0) - DBUG_RETURN(1); - DBUG_PRINT("info", ("Slave SQL thread is in an unsafe situation, giving " - "it some grace period")); - if (difftime(time(0), rli->last_event_start_time) > 60) + if (thd->transaction.all.modified_non_trans_table && rli->is_in_group()) { - rli->report(ERROR_LEVEL, 0, - "SQL thread had to stop in an unsafe situation, in " - "the middle of applying updates to a " - "non-transactional table without any primary key. " - "There is a risk of duplicate updates when the slave " - "SQL thread is restarted. Please check your tables' " - "contents after restart."); - DBUG_RETURN(1); + char msg_stopped[]= + "... The slave SQL is stopped, leaving the current group " + "of events unfinished with a non-transaction table changed. " + "If the group consists solely of Row-based events, you can try " + "restarting the slave with --slave-exec-mode=IDEMPOTENT, which " + "ignores duplicate key, key not found, and similar errors (see " + "documentation for details)."; + + if (rli->abort_slave) + { + DBUG_PRINT("info", ("Slave SQL thread is being stopped in the middle of" + " a group having updated a non-trans table, giving" + " it some grace period")); + + /* + Slave sql thread shutdown in face of unfinished group modified + Non-trans table is handled via a timer. The slave may eventually + give out to complete the current group and in that case there + might be issues at consequent slave restart, see the error message. + WL#2975 offers a robust solution requiring to store the last exectuted + event's coordinates along with the group's coordianates + instead of waiting with @c last_event_start_time the timer. + */ + + if (rli->last_event_start_time == 0) + rli->last_event_start_time= my_time(0); + ret= difftime(my_time(0), rli->last_event_start_time) <= + SLAVE_WAIT_GROUP_DONE ? FALSE : TRUE; + + DBUG_EXECUTE_IF("stop_slave_middle_group", + DBUG_EXECUTE_IF("incomplete_group_in_relay_log", + ret= TRUE;);); // time is over + + if (ret == 0) + { + rli->report(WARNING_LEVEL, 0, + "slave SQL thread is being stopped in the middle " + "of applying of a group having updated a non-transaction " + "table; waiting for the group completion ... "); + } + else + { + rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, + ER(ER_SLAVE_FATAL_ERROR), msg_stopped); + } + } + else + { + ret= TRUE; + rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, ER(ER_SLAVE_FATAL_ERROR), + msg_stopped); + } + } + else + { + ret= TRUE; } } - DBUG_RETURN(0); + if (ret) + rli->last_event_start_time= 0; + + DBUG_RETURN(ret); } @@ -860,6 +972,126 @@ int init_intvar_from_file(int* var, IO_CACHE* f, int default_val) DBUG_RETURN(1); } +int init_floatvar_from_file(float* var, IO_CACHE* f, float default_val) +{ + char buf[16]; + DBUG_ENTER("init_floatvar_from_file"); + + + if (my_b_gets(f, buf, sizeof(buf))) + { + if (sscanf(buf, "%f", var) != 1) + DBUG_RETURN(1); + else + DBUG_RETURN(0); + } + else if (default_val != 0.0) + { + *var = default_val; + DBUG_RETURN(0); + } + DBUG_RETURN(1); +} + + +/** + A master info read method + + This function is called from @c init_master_info() along with + relatives to restore some of @c active_mi members. + Particularly, this function is responsible for restoring + IGNORE_SERVER_IDS list of servers whose events the slave is + going to ignore (to not log them in the relay log). + Items being read are supposed to be decimal output of values of a + type shorter or equal of @c long and separated by the single space. + + @param arr @c DYNAMIC_ARRAY pointer to storage for servers id + @param f @c IO_CACHE pointer to the source file + + @retval 0 All OK + @retval non-zero An error +*/ + +int init_dynarray_intvar_from_file(DYNAMIC_ARRAY* arr, IO_CACHE* f) +{ + int ret= 0; + char buf[16 * (sizeof(long)*4 + 1)]; // static buffer to use most of times + char *buf_act= buf; // actual buffer can be dynamic if static is short + char *token, *last; + uint num_items; // number of items of `arr' + size_t read_size; + DBUG_ENTER("init_dynarray_intvar_from_file"); + + if ((read_size= my_b_gets(f, buf_act, sizeof(buf))) == 0) + { + return 0; // no line in master.info + } + if (read_size + 1 == sizeof(buf) && buf[sizeof(buf) - 2] != '\n') + { + /* + short read happend; allocate sufficient memory and make the 2nd read + */ + char buf_work[(sizeof(long)*3 + 1)*16]; + memcpy(buf_work, buf, sizeof(buf_work)); + num_items= atoi(strtok_r(buf_work, " ", &last)); + size_t snd_size; + /* + max size lower bound approximate estimation bases on the formula: + (the items number + items themselves) * + (decimal size + space) - 1 + `\n' + '\0' + */ + size_t max_size= (1 + num_items) * (sizeof(long)*3 + 1) + 1; + buf_act= (char*) my_malloc(max_size, MYF(MY_WME)); + memcpy(buf_act, buf, read_size); + snd_size= my_b_gets(f, buf_act + read_size, max_size - read_size); + if (snd_size == 0 || + (snd_size + 1 == max_size - read_size) && buf[max_size - 2] != '\n') + { + /* + failure to make the 2nd read or short read again + */ + ret= 1; + goto err; + } + } + token= strtok_r(buf_act, " ", &last); + if (token == NULL) + { + ret= 1; + goto err; + } + num_items= atoi(token); + for (uint i=0; i < num_items; i++) + { + token= strtok_r(NULL, " ", &last); + if (token == NULL) + { + ret= 1; + goto err; + } + else + { + ulong val= atol(token); + insert_dynamic(arr, (uchar *) &val); + } + } +err: + if (buf_act != buf) + my_free(buf_act, MYF(0)); + DBUG_RETURN(ret); +} + + +static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info) +{ + if (io_slave_killed(thd, mi)) + { + if (info && global_system_variables.log_warnings) + sql_print_information("%s", info); + return TRUE; + } + return FALSE; +} /* Check if the error is caused by network. @@ -1028,7 +1260,7 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi) (master_res= mysql_store_result(mysql)) && (master_row= mysql_fetch_row(master_res))) { - if ((::server_id == strtoul(master_row[1], 0, 10)) && + if ((::server_id == (mi->master_id= strtoul(master_row[1], 0, 10))) && !mi->rli.replicate_same_server_id) { errmsg= "The slave I/O thread stops because master and slave have equal \ @@ -1066,6 +1298,13 @@ maybe it is a *VERY OLD MASTER*."); mysql_free_result(master_res); master_res= NULL; } + if (mi->master_id == 0 && mi->ignore_server_ids.elements > 0) + { + errmsg= "Slave configured with server id filtering could not detect the master server id."; + err_code= ER_SLAVE_FATAL_ERROR; + sprintf(err_buff, ER(err_code), errmsg); + goto err; + } /* Check that the master's global character_set_server and ours are the same. @@ -1189,6 +1428,31 @@ when it try to get the value of TIME_ZONE global variable from master."; } } + if (mi->heartbeat_period != 0.0) + { + char llbuf[22]; + const char query_format[]= "SET @master_heartbeat_period= %s"; + char query[sizeof(query_format) - 2 + sizeof(llbuf)]; + /* + the period is an ulonglong of nano-secs. + */ + llstr((ulonglong) (mi->heartbeat_period*1000000000UL), llbuf); + my_sprintf(query, (query, query_format, llbuf)); + + if (mysql_real_query(mysql, query, strlen(query)) + && !check_io_slave_killed(mi->io_thd, mi, NULL)) + { + errmsg= "The slave I/O thread stops because SET @master_heartbeat_period " + "on master failed."; + err_code= ER_SLAVE_FATAL_ERROR; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + mysql_free_result(mysql_store_result(mysql)); + goto err; + } + mysql_free_result(mysql_store_result(mysql)); + } + + err: if (errmsg) { @@ -1274,7 +1538,7 @@ static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db, else { /* Clear the OK result of mysql_rm_table(). */ - thd->main_da.reset_diagnostics_area(); + thd->stmt_da->reset_diagnostics_area(); } } @@ -1298,7 +1562,7 @@ static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db, goto err; // mysql_parse took care of the error send thd_proc_info(thd, "Opening master dump table"); - thd->main_da.reset_diagnostics_area(); /* cleanup from CREATE_TABLE */ + thd->stmt_da->reset_diagnostics_area(); /* cleanup from CREATE_TABLE */ /* Note: If this function starts to fail for MERGE tables, change the next two lines to these: @@ -1605,8 +1869,12 @@ bool show_master_info(THD* thd, Master_info* mi) field_list.push_back(new Item_empty_string("Last_IO_Error", 20)); field_list.push_back(new Item_return_int("Last_SQL_Errno", 4, MYSQL_TYPE_LONG)); field_list.push_back(new Item_empty_string("Last_SQL_Error", 20)); + field_list.push_back(new Item_empty_string("Replicate_Ignore_Server_Ids", + FN_REFLEN)); + field_list.push_back(new Item_return_int("Master_Server_Id", sizeof(ulong), + MYSQL_TYPE_LONG)); - if (protocol->send_fields(&field_list, + if (protocol->send_result_set_metadata(&field_list, Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(TRUE); @@ -1640,7 +1908,8 @@ bool show_master_info(THD* thd, Master_info* mi) protocol->store((ulonglong) mi->rli.group_relay_log_pos); protocol->store(mi->rli.group_master_log_name, &my_charset_bin); protocol->store(mi->slave_running == MYSQL_SLAVE_RUN_CONNECT ? - "Yes" : "No", &my_charset_bin); + "Yes" : (mi->slave_running == MYSQL_SLAVE_RUN_NOT_CONNECT ? + "Connecting" : "No"), &my_charset_bin); protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin); protocol->store(rpl_filter->get_do_db()); protocol->store(rpl_filter->get_ignore_db()); @@ -1726,6 +1995,32 @@ bool show_master_info(THD* thd, Master_info* mi) protocol->store(mi->rli.last_error().number); // Last_SQL_Error protocol->store(mi->rli.last_error().message, &my_charset_bin); + // Replicate_Ignore_Server_Ids + { + char buff[FN_REFLEN]; + ulong i, cur_len; + for (i= 0, buff[0]= 0, cur_len= 0; + i < mi->ignore_server_ids.elements; i++) + { + ulong s_id, slen; + char sbuff[FN_REFLEN]; + get_dynamic(&mi->ignore_server_ids, (uchar*) &s_id, i); + slen= my_sprintf(sbuff, (sbuff, (i==0? "%lu" : ", %lu"), s_id)); + if (cur_len + slen + 4 > FN_REFLEN) + { + /* + break the loop whenever remained space could not fit + ellipses on the next cycle + */ + my_sprintf(buff + cur_len, (buff + cur_len, "...")); + break; + } + cur_len += my_sprintf(buff + cur_len, (buff + cur_len, "%s", sbuff)); + } + protocol->store(buff, &my_charset_bin); + } + // Master_Server_id + protocol->store((uint32) mi->master_id); pthread_mutex_unlock(&mi->rli.err_lock); pthread_mutex_unlock(&mi->err_lock); @@ -1869,17 +2164,22 @@ static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed, } -static int request_dump(MYSQL* mysql, Master_info* mi, - bool *suppress_warnings) +static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi, + bool *suppress_warnings) { uchar buf[FN_REFLEN + 10]; int len; - int binlog_flags = 0; // for now + ushort binlog_flags = 0; // for now char* logname = mi->master_log_name; DBUG_ENTER("request_dump"); *suppress_warnings= FALSE; + if (RUN_HOOK(binlog_relay_io, + before_request_transmit, + (thd, mi, binlog_flags))) + DBUG_RETURN(1); + // TODO if big log files: Change next to int8store() int4store(buf, (ulong) mi->master_log_pos); int2store(buf + 4, binlog_flags); @@ -2012,7 +2312,7 @@ static int has_temporary_error(THD *thd) DBUG_ENTER("has_temporary_error"); DBUG_EXECUTE_IF("all_errors_are_temporary_errors", - if (thd->main_da.is_error()) + if (thd->stmt_da->is_error()) { thd->clear_error(); my_error(ER_LOCK_DEADLOCK, MYF(0)); @@ -2031,20 +2331,21 @@ static int has_temporary_error(THD *thd) currently, InnoDB deadlock detected by InnoDB or lock wait timeout (innodb_lock_wait_timeout exceeded */ - if (thd->main_da.sql_errno() == ER_LOCK_DEADLOCK || - thd->main_da.sql_errno() == ER_LOCK_WAIT_TIMEOUT) + if (thd->stmt_da->sql_errno() == ER_LOCK_DEADLOCK || + thd->stmt_da->sql_errno() == ER_LOCK_WAIT_TIMEOUT) DBUG_RETURN(1); #ifdef HAVE_NDB_BINLOG /* currently temporary error set in ndbcluster */ - List_iterator_fast<MYSQL_ERROR> it(thd->warn_list); + List_iterator_fast<MYSQL_ERROR> it(thd->warning_info->warn_list()); MYSQL_ERROR *err; while ((err= it++)) { - DBUG_PRINT("info", ("has warning %d %s", err->code, err->msg)); - switch (err->code) + DBUG_PRINT("info", ("has condition %d %s", err->get_sql_errno(), + err->get_message_text())); + switch (err->get_sql_errno()) { case ER_GET_TEMPORARY_ERRMSG: DBUG_RETURN(1); @@ -2274,6 +2575,27 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli) delete ev; DBUG_RETURN(1); } + + { /** + The following failure injecion works in cooperation with tests + setting @@global.debug= 'd,incomplete_group_in_relay_log'. + Xid or Commit events are not executed to force the slave sql + read hanging if the realy log does not have any more events. + */ + DBUG_EXECUTE_IF("incomplete_group_in_relay_log", + if ((ev->get_type_code() == XID_EVENT) || + ((ev->get_type_code() == QUERY_EVENT) && + strcmp("COMMIT", ((Query_log_event *) ev)->query) == 0)) + { + DBUG_ASSERT(thd->transaction.all.modified_non_trans_table); + rli->abort_slave= 1; + pthread_mutex_unlock(&rli->data_lock); + delete ev; + rli->inc_event_relay_log_pos(); + DBUG_RETURN(0); + };); + } + exec_res= apply_event_and_update_pos(ev, thd, rli); /* @@ -2377,18 +2699,6 @@ on this slave.\ } -static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info) -{ - if (io_slave_killed(thd, mi)) - { - if (info && global_system_variables.log_warnings) - sql_print_information("%s", info); - return TRUE; - } - return FALSE; -} - - /** @brief Try to reconnect slave IO thread. @@ -2528,6 +2838,16 @@ pthread_handler_t handle_slave_io(void *arg) mi->master_log_name, llstr(mi->master_log_pos,llbuff))); + /* This must be called before run any binlog_relay_io hooks */ + my_pthread_setspecific_ptr(RPL_MASTER_INFO, mi); + + if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi))) + { + mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, + ER(ER_SLAVE_FATAL_ERROR), "Failed to run 'thread_start' hook"); + goto err; + } + if (!(mi->mysql = mysql = mysql_init(NULL))) { mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, @@ -2617,7 +2937,7 @@ connected: while (!io_slave_killed(thd,mi)) { thd_proc_info(thd, "Requesting binlog dump"); - if (request_dump(mysql, mi, &suppress_warnings)) + if (request_dump(thd, mysql, mi, &suppress_warnings)) { sql_print_error("Failed on request_dump()"); if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \ @@ -2637,6 +2957,7 @@ requesting master dump") || goto err; goto connected; }); + const char *event_buf; DBUG_ASSERT(mi->last_error().number == 0); while (!io_slave_killed(thd,mi)) @@ -2697,14 +3018,37 @@ Stopping slave I/O thread due to out-of-memory error from master"); retry_count=0; // ok event, reset retry counter thd_proc_info(thd, "Queueing master event to the relay log"); - if (queue_event(mi,(const char*)mysql->net.read_pos + 1, - event_len)) + event_buf= (const char*)mysql->net.read_pos + 1; + if (RUN_HOOK(binlog_relay_io, after_read_event, + (thd, mi,(const char*)mysql->net.read_pos + 1, + event_len, &event_buf, &event_len))) + { + mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, + ER(ER_SLAVE_FATAL_ERROR), + "Failed to run 'after_read_event' hook"); + goto err; + } + + /* XXX: 'synced' should be updated by queue_event to indicate + whether event has been synced to disk */ + bool synced= 0; + if (queue_event(mi, event_buf, event_len)) { mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE), "could not queue event from master"); goto err; } + + if (RUN_HOOK(binlog_relay_io, after_queue_event, + (thd, mi, event_buf, event_len, synced))) + { + mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, + ER(ER_SLAVE_FATAL_ERROR), + "Failed to run 'after_queue_event' hook"); + goto err; + } + if (flush_master_info(mi, 1)) { sql_print_error("Failed to flush master info file"); @@ -2750,6 +3094,7 @@ err: // print the current replication position sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s", IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff)); + RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi)); thd->set_query(NULL, 0); thd->reset_db(NULL, 0); if (mysql) @@ -2979,9 +3324,9 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, if (check_temp_dir(rli->slave_patternload_file)) { - rli->report(ERROR_LEVEL, thd->main_da.sql_errno(), + rli->report(ERROR_LEVEL, thd->stmt_da->sql_errno(), "Unable to use slave's temporary directory %s - %s", - slave_load_tmpdir, thd->main_da.message()); + slave_load_tmpdir, thd->stmt_da->message()); goto err; } @@ -2991,7 +3336,7 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, execute_init_command(thd, &sys_init_slave, &LOCK_sys_init_slave); if (thd->is_slave_error) { - rli->report(ERROR_LEVEL, thd->main_da.sql_errno(), + rli->report(ERROR_LEVEL, thd->stmt_da->sql_errno(), "Slave SQL thread aborted. Can't execute init_slave query"); goto err; } @@ -3035,20 +3380,20 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, if (thd->is_error()) { - char const *const errmsg= thd->main_da.message(); + char const *const errmsg= thd->stmt_da->message(); DBUG_PRINT("info", - ("thd->main_da.sql_errno()=%d; rli->last_error.number=%d", - thd->main_da.sql_errno(), last_errno)); + ("thd->stmt_da->sql_errno()=%d; rli->last_error.number=%d", + thd->stmt_da->sql_errno(), last_errno)); if (last_errno == 0) { /* This function is reporting an error which was not reported while executing exec_relay_log_event(). */ - rli->report(ERROR_LEVEL, thd->main_da.sql_errno(), "%s", errmsg); + rli->report(ERROR_LEVEL, thd->stmt_da->sql_errno(), "%s", errmsg); } - else if (last_errno != thd->main_da.sql_errno()) + else if (last_errno != thd->stmt_da->sql_errno()) { /* * An error was reported while executing exec_relay_log_event() @@ -3057,12 +3402,12 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, * what caused the problem. */ sql_print_error("Slave (additional info): %s Error_code: %d", - errmsg, thd->main_da.sql_errno()); + errmsg, thd->stmt_da->sql_errno()); } } /* Print any warnings issued */ - List_iterator_fast<MYSQL_ERROR> it(thd->warn_list); + List_iterator_fast<MYSQL_ERROR> it(thd->warning_info->warn_list()); MYSQL_ERROR *err; /* Added controlled slave thread cancel for replication @@ -3071,9 +3416,9 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, bool udf_error = false; while ((err= it++)) { - if (err->code == ER_CANT_OPEN_LIBRARY) + if (err->get_sql_errno() == ER_CANT_OPEN_LIBRARY) udf_error = true; - sql_print_warning("Slave: %s Error_code: %d",err->msg, err->code); + sql_print_warning("Slave: %s Error_code: %d", err->get_message_text(), err->get_sql_errno()); } if (udf_error) sql_print_error("Error loading user-defined library, slave SQL " @@ -3555,9 +3900,11 @@ static int queue_old_event(Master_info *mi, const char *buf, static int queue_event(Master_info* mi,const char* buf, ulong event_len) { int error= 0; + String error_msg; ulong inc_pos; Relay_log_info *rli= &mi->rli; pthread_mutex_t *log_lock= rli->relay_log.get_log_lock(); + ulong s_id; DBUG_ENTER("queue_event"); LINT_INIT(inc_pos); @@ -3589,7 +3936,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue); if (unlikely(process_io_rotate(mi,&rev))) { - error= 1; + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; goto err; } /* @@ -3616,7 +3963,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) Log_event::read_log_event(buf, event_len, &errmsg, mi->rli.relay_log.description_event_for_queue))) { - error= 2; + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; goto err; } delete mi->rli.relay_log.description_event_for_queue; @@ -3635,6 +3982,56 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) } break; + + case HEARTBEAT_LOG_EVENT: + { + /* + HB (heartbeat) cannot come before RL (Relay) + */ + char llbuf[22]; + Heartbeat_log_event hb(buf, event_len, mi->rli.relay_log.description_event_for_queue); + if (!hb.is_valid()) + { + error= ER_SLAVE_HEARTBEAT_FAILURE; + error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;")); + error_msg.append(STRING_WITH_LEN("the event's data: log_file_name ")); + error_msg.append(hb.get_log_ident(), (uint) strlen(hb.get_log_ident())); + error_msg.append(STRING_WITH_LEN(" log_pos ")); + llstr(hb.log_pos, llbuf); + error_msg.append(llbuf, strlen(llbuf)); + goto err; + } + mi->received_heartbeats++; + /* + compare local and event's versions of log_file, log_pos. + + Heartbeat is sent only after an event corresponding to the corrdinates + the heartbeat carries. + Slave can not have a difference in coordinates except in the only + special case when mi->master_log_name, master_log_pos have never + been updated by Rotate event i.e when slave does not have any history + with the master (and thereafter mi->master_log_pos is NULL). + + TODO: handling `when' for SHOW SLAVE STATUS' snds behind + */ + if ((memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len()) + && mi->master_log_name != NULL) + || mi->master_log_pos != hb.log_pos) + { + /* missed events of heartbeat from the past */ + error= ER_SLAVE_HEARTBEAT_FAILURE; + error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;")); + error_msg.append(STRING_WITH_LEN("the event's data: log_file_name ")); + error_msg.append(hb.get_log_ident(), (uint) strlen(hb.get_log_ident())); + error_msg.append(STRING_WITH_LEN(" log_pos ")); + llstr(hb.log_pos, llbuf); + error_msg.append(llbuf, strlen(llbuf)); + goto err; + } + goto skip_relay_logging; + } + break; + default: inc_pos= event_len; break; @@ -3654,9 +4051,20 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) */ pthread_mutex_lock(log_lock); - - if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) && - !mi->rli.replicate_same_server_id) + s_id= uint4korr(buf + SERVER_ID_OFFSET); + if ((s_id == ::server_id && !mi->rli.replicate_same_server_id) || + /* + the following conjunction deals with IGNORE_SERVER_IDS, if set + If the master is on the ignore list, execution of + format description log events and rotate events is necessary. + */ + (mi->ignore_server_ids.elements > 0 && + mi->shall_ignore_server_id(s_id) && + /* everything is filtered out from non-master */ + (s_id != mi->master_id || + /* for the master meta information is necessary */ + buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT && + buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT))) { /* Do not write it to the relay log. @@ -3671,10 +4079,14 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) But events which were generated by this slave and which do not exist in the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment mi->master_log_pos. + If the event is originated remotely and is being filtered out by + IGNORE_SERVER_IDS it increments mi->master_log_pos + as well as rli->group_relay_log_pos. */ - if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT && - buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT && - buf[EVENT_TYPE_OFFSET]!=STOP_EVENT) + if (!(s_id == ::server_id && !mi->rli.replicate_same_server_id) || + buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT && + buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT && + buf[EVENT_TYPE_OFFSET] != STOP_EVENT) { mi->master_log_pos+= inc_pos; memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN); @@ -3682,8 +4094,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) rli->ign_master_log_pos_end= mi->master_log_pos; } rli->relay_log.signal_update(); // the slave SQL thread needs to re-check - DBUG_PRINT("info", ("master_log_pos: %lu, event originating from the same server, ignored", - (ulong) mi->master_log_pos)); + DBUG_PRINT("info", ("master_log_pos: %lu, event originating from %u server, ignored", + (ulong) mi->master_log_pos, uint4korr(buf + SERVER_ID_OFFSET))); } else { @@ -3695,15 +4107,23 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) rli->relay_log.harvest_bytes_written(&rli->log_space_total); } else - error= 3; + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + } rli->ign_master_log_name_end[0]= 0; // last event is not ignored } pthread_mutex_unlock(log_lock); - +skip_relay_logging: + err: pthread_mutex_unlock(&mi->data_lock); DBUG_PRINT("info", ("error: %d", error)); + if (error) + mi->report(ERROR_LEVEL, error, ER(error), + (error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)? + "could not queue event from master" : + error_msg.ptr()); DBUG_RETURN(error); } @@ -3909,6 +4329,71 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi, } +MYSQL *rpl_connect_master(MYSQL *mysql) +{ + THD *thd= current_thd; + Master_info *mi= my_pthread_getspecific_ptr(Master_info*, RPL_MASTER_INFO); + if (!mi) + { + sql_print_error("'rpl_connect_master' must be called in slave I/O thread context."); + return NULL; + } + + bool allocated= false; + + if (!mysql) + { + if(!(mysql= mysql_init(NULL))) + { + sql_print_error("rpl_connect_master: failed in mysql_init()"); + return NULL; + } + allocated= true; + } + + /* + XXX: copied from connect_to_master, this function should not + change the slave status, so we cannot use connect_to_master + directly + + TODO: make this part a seperate function to eliminate duplication + */ + mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout); + mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout); + +#ifdef HAVE_OPENSSL + if (mi->ssl) + { + mysql_ssl_set(mysql, + mi->ssl_key[0]?mi->ssl_key:0, + mi->ssl_cert[0]?mi->ssl_cert:0, + mi->ssl_ca[0]?mi->ssl_ca:0, + mi->ssl_capath[0]?mi->ssl_capath:0, + mi->ssl_cipher[0]?mi->ssl_cipher:0); + mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT, + &mi->ssl_verify_server_cert); + } +#endif + + mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname); + /* This one is not strictly needed but we have it here for completeness */ + mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir); + + if (io_slave_killed(thd, mi) + || !mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0, + mi->port, 0, 0)) + { + if (!io_slave_killed(thd, mi)) + sql_print_error("rpl_connect_master: error connecting to master: %s (server_error: %d)", + mysql_error(mysql), mysql_errno(mysql)); + + if (allocated) + mysql_close(mysql); // this will free the object + return NULL; + } + return mysql; +} + /* Store the file and position where the execute-slave thread are in the relay log. @@ -3962,7 +4447,14 @@ bool flush_relay_log_info(Relay_log_info* rli) error=1; if (flush_io_cache(file)) error=1; - + if (sync_relayloginfo_period && + !error && + ++(rli->sync_counter) >= sync_relayloginfo_period) + { + if (my_sync(rli->info_fd, MYF(MY_WME))) + error=1; + rli->sync_counter= 0; + } /* Flushing the relay log is done by the slave I/O thread */ DBUG_RETURN(error); } @@ -4211,8 +4703,8 @@ static Log_event* next_event(Relay_log_info* rli) */ pthread_mutex_unlock(&rli->log_space_lock); pthread_cond_broadcast(&rli->log_space_cond); - // Note that wait_for_update unlocks lock_log ! - rli->relay_log.wait_for_update(rli->sql_thd, 1); + // Note that wait_for_update_relay_log unlocks lock_log ! + rli->relay_log.wait_for_update_relay_log(rli->sql_thd); // re-acquire data lock since we released it earlier pthread_mutex_lock(&rli->data_lock); rli->last_master_timestamp= save_timestamp; @@ -4369,6 +4861,8 @@ void rotate_relay_log(Master_info* mi) DBUG_ENTER("rotate_relay_log"); Relay_log_info* rli= &mi->rli; + DBUG_EXECUTE_IF("crash_before_rotate_relaylog", abort();); + /* We don't lock rli->run_lock. This would lead to deadlocks. */ pthread_mutex_lock(&mi->run_lock); |