diff options
author | Andrei Elkin <aelkin@mysql.com> | 2009-09-29 14:16:23 +0300 |
---|---|---|
committer | Andrei Elkin <aelkin@mysql.com> | 2009-09-29 14:16:23 +0300 |
commit | 5983785ef4b04b865ea1d78c8d452642913a83f3 (patch) | |
tree | f79e672485606b6da384195da6a4c1c6729ed362 /sql | |
parent | 5d2f79def9b67dd5400411070d47f8180ceb072b (diff) | |
download | mariadb-git-5983785ef4b04b865ea1d78c8d452642913a83f3.tar.gz |
WL#342 heartbeat
backporting from 6.0 code base to 5.1.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/lex.h | 1 | ||||
-rw-r--r-- | sql/log.cc | 53 | ||||
-rw-r--r-- | sql/log.h | 5 | ||||
-rw-r--r-- | sql/log_event.cc | 14 | ||||
-rw-r--r-- | sql/log_event.h | 57 | ||||
-rw-r--r-- | sql/mysqld.cc | 36 | ||||
-rw-r--r-- | sql/rpl_mi.cc | 39 | ||||
-rw-r--r-- | sql/rpl_mi.h | 2 | ||||
-rw-r--r-- | sql/slave.cc | 141 | ||||
-rw-r--r-- | sql/slave.h | 12 | ||||
-rw-r--r-- | sql/sql_lex.h | 5 | ||||
-rw-r--r-- | sql/sql_repl.cc | 190 | ||||
-rw-r--r-- | sql/sql_yacc.yy | 49 |
13 files changed, 541 insertions, 63 deletions
diff --git a/sql/lex.h b/sql/lex.h index b199a79350b..790808a8c14 100644 --- a/sql/lex.h +++ b/sql/lex.h @@ -323,6 +323,7 @@ static SYMBOL symbols[] = { { "MASTER_SSL_KEY", SYM(MASTER_SSL_KEY_SYM)}, { "MASTER_SSL_VERIFY_SERVER_CERT", SYM(MASTER_SSL_VERIFY_SERVER_CERT_SYM)}, { "MASTER_USER", SYM(MASTER_USER_SYM)}, + { "MASTER_HEARTBEAT_PERIOD", SYM(MASTER_HEARTBEAT_PERIOD_SYM)}, { "MATCH", SYM(MATCH)}, { "MAX_CONNECTIONS_PER_HOUR", SYM(MAX_CONNECTIONS_PER_HOUR)}, { "MAX_QUERIES_PER_HOUR", SYM(MAX_QUERIES_PER_HOUR)}, diff --git a/sql/log.cc b/sql/log.cc index 1af2f3a4ddc..362df871ba9 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -2413,7 +2413,7 @@ const char *MYSQL_LOG::generate_name(const char *log_name, MYSQL_BIN_LOG::MYSQL_BIN_LOG() :bytes_written(0), prepared_xids(0), file_id(1), open_count(1), need_start_event(TRUE), m_table_map_version(0), - is_relay_log(0), + is_relay_log(0), signal_cnt(0), description_event_for_exec(0), description_event_for_queue(0) { /* @@ -4605,12 +4605,9 @@ err: /** - Wait until we get a signal that the binary log has been updated. + Wait until we get a signal that the relay log has been updated. @param thd Thread variable - @param is_slave If 0, the caller is the Binlog_dump thread from master; - if 1, the caller is the SQL thread from the slave. This - influences only thd->proc_info. @note One must have a lock on LOCK_log before calling this function. @@ -4618,22 +4615,53 @@ err: THD::enter_cond() (see NOTES in sql_class.h). */ -void MYSQL_BIN_LOG::wait_for_update(THD* thd, bool is_slave) +void MYSQL_BIN_LOG::wait_for_update_relay_log(THD* thd) { const char *old_msg; - DBUG_ENTER("wait_for_update"); + DBUG_ENTER("wait_for_update_relay_log"); old_msg= thd->enter_cond(&update_cond, &LOCK_log, - is_slave ? - "Has read all relay log; waiting for the slave I/O " - "thread to update it" : - "Has sent all binlog to slave; waiting for binlog " - "to be updated"); + "Slave has read all relay log; " + "waiting for the slave I/O " + "thread to update it" ); pthread_cond_wait(&update_cond, &LOCK_log); thd->exit_cond(old_msg); DBUG_VOID_RETURN; } +/** + Wait until we get a signal that the binary log has been updated. + Applies to master only. + + NOTES + @param[in] thd a THD struct + @param[in] timeout a pointer to a timespec; + NULL means to wait w/o timeout. + @retval 0 if got signalled on update + @retval non-0 if wait timeout elapsed + @note + LOCK_log must be taken before calling this function. + LOCK_log is being released while the thread is waiting. + LOCK_log is released by the caller. +*/ + +int MYSQL_BIN_LOG::wait_for_update_bin_log(THD* thd, + const struct timespec *timeout) +{ + int ret= 0; + const char* old_msg = thd->proc_info; + DBUG_ENTER("wait_for_update_bin_log"); + old_msg= thd->enter_cond(&update_cond, &LOCK_log, + "Master has sent all binlog to slave; " + "waiting for binlog to be updated"); + if (!timeout) + pthread_cond_wait(&update_cond, &LOCK_log); + else + ret= pthread_cond_timedwait(&update_cond, &LOCK_log, + const_cast<struct timespec *>(timeout)); + DBUG_RETURN(ret); +} + /** Close the log file. @@ -4846,6 +4874,7 @@ bool flush_error_log() void MYSQL_BIN_LOG::signal_update() { DBUG_ENTER("MYSQL_BIN_LOG::signal_update"); + signal_cnt++; pthread_cond_broadcast(&update_cond); DBUG_VOID_RETURN; } diff --git a/sql/log.h b/sql/log.h index d306d6f7182..8d6a90d8a35 100644 --- a/sql/log.h +++ b/sql/log.h @@ -284,7 +284,7 @@ public: /* This is relay log */ bool is_relay_log; - + ulong signal_cnt; // update of the counter is checked by heartbeat /* These describe the log's format. This is used only for relay logs. _for_exec is used by the SQL thread, _for_queue by the I/O thread. It's @@ -339,7 +339,8 @@ public: } void set_max_size(ulong max_size_arg); void signal_update(); - void wait_for_update(THD* thd, bool master_or_slave); + void wait_for_update_relay_log(THD* thd); + int wait_for_update_bin_log(THD* thd, const struct timespec * timeout); void set_need_start_event() { need_start_event = 1; } void init(bool no_auto_events_arg, ulong max_size); void init_pthread_objects(); diff --git a/sql/log_event.cc b/sql/log_event.cc index fb6a5230fda..6c240735d23 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -3643,6 +3643,7 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver) post_header_len[UPDATE_ROWS_EVENT-1]= post_header_len[DELETE_ROWS_EVENT-1]= 6;); post_header_len[INCIDENT_EVENT-1]= INCIDENT_HEADER_LEN; + post_header_len[HEARTBEAT_LOG_EVENT-1]= 0; // Sanity-check that all post header lengths are initialized. IF_DBUG({ @@ -9427,3 +9428,16 @@ st_print_event_info::st_print_event_info() open_cached_file(&body_cache, NULL, NULL, 0, flags); } #endif + + +#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) +Heartbeat_log_event::Heartbeat_log_event(const char* buf, uint event_len, + const Format_description_log_event* description_event) + :Log_event(buf, description_event) +{ + uint8 header_size= description_event->common_header_len; + ident_len = event_len - header_size; + set_if_smaller(ident_len,FN_REFLEN-1); + log_ident= buf + header_size; +} +#endif diff --git a/sql/log_event.h b/sql/log_event.h index 8202dddcc76..b481ae59502 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -250,6 +250,7 @@ struct sql_ex_info #define EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN (4 + 4 + 4 + 1) #define EXECUTE_LOAD_QUERY_HEADER_LEN (QUERY_HEADER_LEN + EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN) #define INCIDENT_HEADER_LEN 2 +#define HEARTBEAT_HEADER_LEN 0 /* Max number of possible extra bytes in a replication event compared to a packet (i.e. a query) sent from client to master; @@ -575,6 +576,12 @@ enum Log_event_type INCIDENT_EVENT= 26, /* + Heartbeat event to be send by master at its idle time + to ensure master's online status to slave + */ + HEARTBEAT_LOG_EVENT= 27, + + /* Add new events here - right above this comment! Existing events (except ENUM_END_EVENT) should never change their numbers */ @@ -689,6 +696,20 @@ typedef struct st_print_event_info } PRINT_EVENT_INFO; #endif +/** + the struct aggregates two paramenters that identify an event + uniquely in scope of communication of a particular master and slave couple. + I.e there can not be 2 events from the same staying connected master which + have the same coordinates. + @note + Such identifier is not yet unique generally as the event originating master + is resetable. Also the crashed master can be replaced with some other. +*/ +struct event_coordinates +{ + char * file_name; // binlog file name (directories stripped) + my_off_t pos; // event's position in the binlog file +}; /** @class Log_event @@ -3916,6 +3937,42 @@ static inline bool copy_event_cache_to_file_and_reinit(IO_CACHE *cache, reinit_io_cache(cache, WRITE_CACHE, 0, FALSE, TRUE); } +#ifndef MYSQL_CLIENT +/***************************************************************************** + + Heartbeat Log Event class + + Replication event to ensure to slave that master is alive. + The event is originated by master's dump thread and sent straight to + slave without being logged. Slave itself does not store it in relay log + but rather uses a data for immediate checks and throws away the event. + + Two members of the class log_ident and Log_event::log_pos comprise + @see the event_coordinates instance. The coordinates that a heartbeat + instance carries correspond to the last event master has sent from + its binlog. + + ****************************************************************************/ +class Heartbeat_log_event: public Log_event +{ +public: + Heartbeat_log_event(const char* buf, uint event_len, + const Format_description_log_event* description_event); + Log_event_type get_type_code() { return HEARTBEAT_LOG_EVENT; } + bool is_valid() const + { + return (log_ident != NULL && + log_pos >= BIN_LOG_HEADER_SIZE); + } + const char * get_log_ident() { return log_ident; } + uint get_ident_len() { return ident_len; } + +private: + const char* log_ident; + uint ident_len; +}; +#endif + /** @} (end of group Replication) */ diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 7e9eb6e7291..4bbb49f47ff 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -7068,6 +7068,40 @@ static int show_slave_retried_trans(THD *thd, SHOW_VAR *var, char *buff) pthread_mutex_unlock(&LOCK_active_mi); return 0; } + +static int show_slave_received_heartbeats(THD *thd, SHOW_VAR *var, char *buff) +{ + pthread_mutex_lock(&LOCK_active_mi); + if (active_mi) + { + var->type= SHOW_LONGLONG; + var->value= buff; + pthread_mutex_lock(&active_mi->rli.data_lock); + *((longlong *)buff)= active_mi->received_heartbeats; + pthread_mutex_unlock(&active_mi->rli.data_lock); + } + else + var->type= SHOW_UNDEF; + pthread_mutex_unlock(&LOCK_active_mi); + return 0; +} + +static int show_heartbeat_period(THD *thd, SHOW_VAR *var, char *buff) +{ + pthread_mutex_lock(&LOCK_active_mi); + if (active_mi) + { + var->type= SHOW_CHAR; + var->value= buff; + my_sprintf(buff, (buff, "%.3f",active_mi->heartbeat_period)); + } + else + var->type= SHOW_UNDEF; + pthread_mutex_unlock(&LOCK_active_mi); + return 0; +} + + #endif /* HAVE_REPLICATION */ static int show_open_tables(THD *thd, SHOW_VAR *var, char *buff) @@ -7432,6 +7466,8 @@ SHOW_VAR status_vars[]= { {"Slave_open_temp_tables", (char*) &slave_open_temp_tables, SHOW_LONG}, #ifdef HAVE_REPLICATION {"Slave_retried_transactions",(char*) &show_slave_retried_trans, SHOW_FUNC}, + {"Slave_heartbeat_period", (char*) &show_heartbeat_period, SHOW_FUNC}, + {"Slave_received_heartbeats",(char*) &show_slave_received_heartbeats, SHOW_FUNC}, {"Slave_running", (char*) &show_slave_running, SHOW_FUNC}, #endif {"Slow_launch_threads", (char*) &slow_launch_threads, SHOW_LONG}, diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc index 5e46837e948..77f7b7e1929 100644 --- a/sql/rpl_mi.cc +++ b/sql/rpl_mi.cc @@ -26,12 +26,13 @@ int init_intvar_from_file(int* var, IO_CACHE* f, int default_val); int init_strvar_from_file(char *var, int max_size, IO_CACHE *f, const char *default_val); +int init_floatvar_from_file(float* var, IO_CACHE* f, float default_val); Master_info::Master_info() :Slave_reporting_capability("I/O"), ssl(0), ssl_verify_server_cert(0), fd(-1), io_thd(0), inited(0), - abort_slave(0),slave_running(0), - slave_run_id(0) + abort_slave(0),slave_running(0), slave_run_id(0), + heartbeat_period(0), received_heartbeats(0) { host[0] = 0; user[0] = 0; password[0] = 0; ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0; @@ -84,6 +85,17 @@ void init_master_info_with_options(Master_info* mi) strmake(mi->ssl_key, master_ssl_key, sizeof(mi->ssl_key)-1); /* Intentionally init ssl_verify_server_cert to 0, no option available */ mi->ssl_verify_server_cert= 0; + /* + always request heartbeat unless master_heartbeat_period is set + explicitly zero. Here is the default value for heartbeat period + if CHANGE MASTER did not specify it. (no data loss in conversion + as hb period has a max) + */ + mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD, + (slave_net_timeout/2.0)); + DBUG_ASSERT(mi->heartbeat_period > (float) 0.001 + || mi->heartbeat_period == 0); + DBUG_VOID_RETURN; } @@ -94,8 +106,11 @@ enum { /* 5.1.16 added value of master_ssl_verify_server_cert */ LINE_FOR_MASTER_SSL_VERIFY_SERVER_CERT= 15, + /* 6.0 added value of master_heartbeat_period */ + LINE_FOR_MASTER_HEARTBEAT_PERIOD= 16, + /* Number of lines currently used when saving master info file */ - LINES_IN_MASTER_INFO= LINE_FOR_MASTER_SSL_VERIFY_SERVER_CERT + LINES_IN_MASTER_INFO= LINE_FOR_MASTER_HEARTBEAT_PERIOD }; int init_master_info(Master_info* mi, const char* master_info_fname, @@ -197,6 +212,7 @@ file '%s')", fname); mi->fd = fd; int port, connect_retry, master_log_pos, lines; int ssl= 0, ssl_verify_server_cert= 0; + float master_heartbeat_period= 0.0; char *first_non_digit; /* @@ -281,7 +297,13 @@ file '%s')", fname); if (lines >= LINE_FOR_MASTER_SSL_VERIFY_SERVER_CERT && init_intvar_from_file(&ssl_verify_server_cert, &mi->file, 0)) goto errwithmsg; - + /* + Starting from 6.0 master_heartbeat_period might be + in the file + */ + if (lines >= LINE_FOR_MASTER_HEARTBEAT_PERIOD && + init_floatvar_from_file(&master_heartbeat_period, &mi->file, 0.0)) + goto errwithmsg; } #ifndef HAVE_OPENSSL @@ -300,6 +322,7 @@ file '%s')", fname); mi->connect_retry= (uint) connect_retry; mi->ssl= (my_bool) ssl; mi->ssl_verify_server_cert= ssl_verify_server_cert; + mi->heartbeat_period= master_heartbeat_period; } DBUG_PRINT("master_info",("log_file_name: %s position: %ld", mi->master_log_name, @@ -378,16 +401,18 @@ int flush_master_info(Master_info* mi, bool flush_relay_log_cache) contents of file). But because of number of lines in the first line of file we don't care about this garbage. */ - + char heartbeat_buf[sizeof(mi->heartbeat_period) * 4]; // buffer to suffice always + my_sprintf(heartbeat_buf, (heartbeat_buf, "%.3f", mi->heartbeat_period)); my_b_seek(file, 0L); my_b_printf(file, - "%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n%d\n", + "%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n%d\n%s\n", LINES_IN_MASTER_INFO, mi->master_log_name, llstr(mi->master_log_pos, lbuf), mi->host, mi->user, mi->password, mi->port, mi->connect_retry, (int)(mi->ssl), mi->ssl_ca, mi->ssl_capath, mi->ssl_cert, - mi->ssl_cipher, mi->ssl_key, mi->ssl_verify_server_cert); + mi->ssl_cipher, mi->ssl_key, mi->ssl_verify_server_cert, + heartbeat_buf); DBUG_RETURN(-flush_io_cache(file)); } diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h index 93fb0a98198..35e18414932 100644 --- a/sql/rpl_mi.h +++ b/sql/rpl_mi.h @@ -83,6 +83,8 @@ class Master_info : public Slave_reporting_capability Relay_log_info rli; uint port; uint connect_retry; + float heartbeat_period; // interface with CHANGE MASTER or master.info + ulonglong received_heartbeats; // counter of received heartbeat events #ifndef DBUG_OFF int events_till_disconnect; #endif diff --git a/sql/slave.cc b/sql/slave.cc index fac9ee214c5..4a161a345eb 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -770,7 +770,6 @@ static bool sql_slave_killed(THD* thd, Relay_log_info* rli) DBUG_RETURN(0); } - /* skip_load_data_infile() @@ -860,6 +859,37 @@ int init_intvar_from_file(int* var, IO_CACHE* f, int default_val) DBUG_RETURN(1); } +int init_floatvar_from_file(float* var, IO_CACHE* f, float default_val) +{ + char buf[16]; + DBUG_ENTER("init_floatvar_from_file"); + + + if (my_b_gets(f, buf, sizeof(buf))) + { + if (sscanf(buf, "%f", var) != 1) + DBUG_RETURN(1); + else + DBUG_RETURN(0); + } + else if (default_val != 0.0) + { + *var = default_val; + DBUG_RETURN(0); + } + DBUG_RETURN(1); +} + +static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info) +{ + if (io_slave_killed(thd, mi)) + { + if (info && global_system_variables.log_warnings) + sql_print_information(info); + return TRUE; + } + return FALSE; +} /* Check if the error is caused by network. @@ -1189,6 +1219,32 @@ when it try to get the value of TIME_ZONE global variable from master."; } } + if (mi->heartbeat_period != 0.0) + { + char llbuf[22]; + const char query_format[]= "SET @master_heartbeat_period= %s"; + char query[sizeof(query_format) - 2 + sizeof(llbuf)]; + /* + the period is an ulonglong of nano-secs. + */ + llstr((ulonglong) (mi->heartbeat_period*1000000000UL), llbuf); + my_sprintf(query, (query, query_format, llbuf)); + + if (mysql_real_query(mysql, query, strlen(query)) + && !check_io_slave_killed(mi->io_thd, mi, NULL)) + { + errmsg= "The slave I/O thread stops because querying master with '%s' " + "failed; error: '%s' "; + err_code= ER_SLAVE_FATAL_ERROR; + sprintf(err_buff, "%s Error: %s", errmsg, + query, mysql_error(mysql)); + mysql_free_result(mysql_store_result(mysql)); + goto err; + } + mysql_free_result(mysql_store_result(mysql)); + } + + err: if (errmsg) { @@ -2381,18 +2437,6 @@ on this slave.\ } -static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info) -{ - if (io_slave_killed(thd, mi)) - { - if (info && global_system_variables.log_warnings) - sql_print_information(info); - return TRUE; - } - return FALSE; -} - - /** @brief Try to reconnect slave IO thread. @@ -3552,6 +3596,7 @@ static int queue_old_event(Master_info *mi, const char *buf, static int queue_event(Master_info* mi,const char* buf, ulong event_len) { int error= 0; + String error_msg; ulong inc_pos; Relay_log_info *rli= &mi->rli; pthread_mutex_t *log_lock= rli->relay_log.get_log_lock(); @@ -3586,7 +3631,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue); if (unlikely(process_io_rotate(mi,&rev))) { - error= 1; + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; goto err; } /* @@ -3613,7 +3658,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) Log_event::read_log_event(buf, event_len, &errmsg, mi->rli.relay_log.description_event_for_queue))) { - error= 2; + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; goto err; } delete mi->rli.relay_log.description_event_for_queue; @@ -3632,6 +3677,56 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) } break; + + case HEARTBEAT_LOG_EVENT: + { + /* + HB (heartbeat) cannot come before RL (Relay) + */ + char llbuf[22]; + Heartbeat_log_event hb(buf, event_len, mi->rli.relay_log.description_event_for_queue); + if (!hb.is_valid()) + { + error= ER_SLAVE_HEARTBEAT_FAILURE; + error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;")); + error_msg.append(STRING_WITH_LEN("the event's data: log_file_name ")); + error_msg.append(hb.get_log_ident(), (uint) strlen(hb.get_log_ident())); + error_msg.append(STRING_WITH_LEN(" log_pos ")); + llstr(hb.log_pos, llbuf); + error_msg.append(llbuf, strlen(llbuf)); + goto err; + } + mi->received_heartbeats++; + /* + compare local and event's versions of log_file, log_pos. + + Heartbeat is sent only after an event corresponding to the corrdinates + the heartbeat carries. + Slave can not have a difference in coordinates except in the only + special case when mi->master_log_name, master_log_pos have never + been updated by Rotate event i.e when slave does not have any history + with the master (and thereafter mi->master_log_pos is NULL). + + TODO: handling `when' for SHOW SLAVE STATUS' snds behind + */ + if ((memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len()) + && mi->master_log_name != NULL) + || mi->master_log_pos != hb.log_pos) + { + /* missed events of heartbeat from the past */ + error= ER_SLAVE_HEARTBEAT_FAILURE; + error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;")); + error_msg.append(STRING_WITH_LEN("the event's data: log_file_name ")); + error_msg.append(hb.get_log_ident(), (uint) strlen(hb.get_log_ident())); + error_msg.append(STRING_WITH_LEN(" log_pos ")); + llstr(hb.log_pos, llbuf); + error_msg.append(llbuf, strlen(llbuf)); + goto err; + } + goto skip_relay_logging; + } + break; + default: inc_pos= event_len; break; @@ -3692,15 +3787,23 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) rli->relay_log.harvest_bytes_written(&rli->log_space_total); } else - error= 3; + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + } rli->ign_master_log_name_end[0]= 0; // last event is not ignored } pthread_mutex_unlock(log_lock); - +skip_relay_logging: + err: pthread_mutex_unlock(&mi->data_lock); DBUG_PRINT("info", ("error: %d", error)); + if (error) + mi->report(ERROR_LEVEL, error, ER(error), + (error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)? + "could not queue event from master" : + error_msg.ptr()); DBUG_RETURN(error); } @@ -4208,8 +4311,8 @@ static Log_event* next_event(Relay_log_info* rli) */ pthread_mutex_unlock(&rli->log_space_lock); pthread_cond_broadcast(&rli->log_space_cond); - // Note that wait_for_update unlocks lock_log ! - rli->relay_log.wait_for_update(rli->sql_thd, 1); + // Note that wait_for_update_relay_log unlocks lock_log ! + rli->relay_log.wait_for_update_relay_log(rli->sql_thd); // re-acquire data lock since we released it earlier pthread_mutex_lock(&rli->data_lock); rli->last_master_timestamp= save_timestamp; diff --git a/sql/slave.h b/sql/slave.h index a44a7eed83e..e8364090eb4 100644 --- a/sql/slave.h +++ b/sql/slave.h @@ -22,6 +22,17 @@ @file */ + +/** + Some of defines are need in parser even though replication is not + compiled in (embedded). +*/ + +/** + The maximum is defined as (ULONG_MAX/1000) with 4 bytes ulong +*/ +#define SLAVE_MAX_HEARTBEAT_PERIOD 4294967 + #ifdef HAVE_REPLICATION #include "log.h" @@ -33,7 +44,6 @@ #define MAX_SLAVE_ERROR 2000 - // Forward declarations class Relay_log_info; class Master_info; diff --git a/sql/sql_lex.h b/sql/sql_lex.h index 6f9f667a75a..f6effab93a4 100644 --- a/sql/sql_lex.h +++ b/sql/sql_lex.h @@ -206,14 +206,15 @@ typedef struct st_lex_master_info { char *host, *user, *password, *log_file_name; uint port, connect_retry; + float heartbeat_period; ulonglong pos; ulong server_id; /* Enum is used for making it possible to detect if the user changed variable or if it should be left at old value */ - enum {SSL_UNCHANGED, SSL_DISABLE, SSL_ENABLE} - ssl, ssl_verify_server_cert; + enum {LEX_MI_UNCHANGED, LEX_MI_DISABLE, LEX_MI_ENABLE} + ssl, ssl_verify_server_cert, heartbeat_opt; char *ssl_key, *ssl_cert, *ssl_ca, *ssl_capath, *ssl_cipher; char *relay_log_name; ulong relay_log_pos; diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 0ec8d91214c..cde713b1b40 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -336,6 +336,74 @@ Increase max_allowed_packet on master"; } +/** + An auxiliary function for calling in mysql_binlog_send + to initialize the heartbeat timeout in waiting for a binlogged event. + + @param[in] thd THD to access a user variable + + @return heartbeat period an ulonglong of nanoseconds + or zero if heartbeat was not demanded by slave +*/ +static ulonglong get_heartbeat_period(THD * thd) +{ + my_bool null_value; + LEX_STRING name= { C_STRING_WITH_LEN("master_heartbeat_period")}; + user_var_entry *entry= + (user_var_entry*) hash_search(&thd->user_vars, (uchar*) name.str, + name.length); + return entry? entry->val_int(&null_value) : 0; +} + +/* + Function prepares and sends repliation heartbeat event. + + @param net net object of THD + @param packet buffer to store the heartbeat instance + @param event_coordinates binlog file name and position of the last + real event master sent from binlog + + @note + Among three essential pieces of heartbeat data Log_event::when + is computed locally. + The error to send is serious and should force terminating + the dump thread. +*/ +static int send_heartbeat_event(NET* net, String* packet, + const struct event_coordinates *coord) +{ + DBUG_ENTER("send_heartbeat_event"); + char header[LOG_EVENT_HEADER_LEN]; + /* + 'when' (the timestamp) is set to 0 so that slave could distinguish between + real and fake Rotate events (if necessary) + */ + memset(header, 0, 4); // when + + header[EVENT_TYPE_OFFSET] = HEARTBEAT_LOG_EVENT; + + char* p= coord->file_name + dirname_length(coord->file_name); + + uint ident_len = strlen(p); + ulong event_len = ident_len + LOG_EVENT_HEADER_LEN; + int4store(header + SERVER_ID_OFFSET, server_id); + int4store(header + EVENT_LEN_OFFSET, event_len); + int2store(header + FLAGS_OFFSET, 0); + + int4store(header + LOG_POS_OFFSET, coord->pos); // log_pos + + packet->append(header, sizeof(header)); + packet->append(p, ident_len); // log_file_name + + if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) || + net_flush(net)) + { + DBUG_RETURN(-1); + } + packet->set("\0", 1, &my_charset_bin); + DBUG_RETURN(0); +} + /* TODO: Clean up loop to only have one call to send_file() */ @@ -361,7 +429,22 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos)); bzero((char*) &log,sizeof(log)); - + /* + heartbeat_period from @master_heartbeat_period user variable + */ + ulonglong heartbeat_period= get_heartbeat_period(thd); + struct timespec heartbeat_buf; + struct event_coordinates coord_buf; + struct timespec *heartbeat_ts= NULL; + struct event_coordinates *coord= NULL; + if (heartbeat_period != LL(0)) + { + heartbeat_ts= &heartbeat_buf; + set_timespec_nsec(*heartbeat_ts, 0); + coord= &coord_buf; + coord->file_name= log_file_name; // initialization basing on what slave remembers + coord->pos= pos; + } #ifndef DBUG_OFF if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2)) { @@ -555,6 +638,11 @@ impossible position"; goto err; } #endif + /* + log's filename does not change while it's active + */ + if (coord) + coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET); if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT) { @@ -650,26 +738,65 @@ impossible position"; /* we read successfully, so we'll need to send it to the slave */ pthread_mutex_unlock(log_lock); read_packet = 1; + if (coord) + coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET); break; case LOG_READ_EOF: + { + int ret; + ulong signal_cnt; DBUG_PRINT("wait",("waiting for data in binary log")); if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0) { pthread_mutex_unlock(log_lock); goto end; } - if (!thd->killed) - { - /* Note that the following call unlocks lock_log */ - mysql_bin_log.wait_for_update(thd, 0); - } - else - pthread_mutex_unlock(log_lock); - DBUG_PRINT("wait",("binary log received update")); - break; - default: +#ifndef DBUG_OFF + ulong hb_info_counter= 0; +#endif + signal_cnt= mysql_bin_log.signal_cnt; + do + { + if (coord) + { + DBUG_ASSERT(heartbeat_ts && heartbeat_period != LL(0)); + set_timespec_nsec(*heartbeat_ts, heartbeat_period); + } + ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts); + DBUG_ASSERT(ret == 0 || heartbeat_period != LL(0) && coord != NULL); + if (ret == ETIMEDOUT || ret == ETIME) + { +#ifndef DBUG_OFF + if (hb_info_counter < 3) + { + sql_print_information("master sends heartbeat message"); + hb_info_counter++; + if (hb_info_counter == 3) + sql_print_information("the rest of heartbeat info skipped ..."); + } +#endif + if (send_heartbeat_event(net, packet, coord)) + { + errmsg = "Failed on my_net_write()"; + my_errno= ER_UNKNOWN_ERROR; + pthread_mutex_unlock(log_lock); + goto err; + } + } + else + { + DBUG_ASSERT(ret == 0 && signal_cnt != mysql_bin_log.signal_cnt || + thd->killed); + DBUG_PRINT("wait",("binary log received update")); + } + } while (signal_cnt == mysql_bin_log.signal_cnt && !thd->killed); + pthread_mutex_unlock(log_lock); + } + break; + + default: pthread_mutex_unlock(log_lock); fatal_error = 1; break; @@ -753,6 +880,8 @@ impossible position"; packet->length(0); packet->append('\0'); + if (coord) + coord->file_name= log_file_name; // reset to the next } } @@ -1195,13 +1324,18 @@ bool change_master(THD* thd, Master_info* mi) mi->port = lex_mi->port; if (lex_mi->connect_retry) mi->connect_retry = lex_mi->connect_retry; + if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED) + mi->heartbeat_period = lex_mi->heartbeat_period; + else + mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD, + (slave_net_timeout/2.0)); + mi->received_heartbeats= LL(0); // counter lives until master is CHANGEd + if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED) + mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE); - if (lex_mi->ssl != LEX_MASTER_INFO::SSL_UNCHANGED) - mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::SSL_ENABLE); - - if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::SSL_UNCHANGED) + if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED) mi->ssl_verify_server_cert= - (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::SSL_ENABLE); + (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE); if (lex_mi->ssl_ca) strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1); @@ -1745,6 +1879,26 @@ public: bool update(THD *thd, set_var *var); }; +static void fix_slave_net_timeout(THD *thd, enum_var_type type) +{ + DBUG_ENTER("fix_slave_net_timeout"); +#ifdef HAVE_REPLICATION + pthread_mutex_lock(&LOCK_active_mi); + DBUG_PRINT("info",("slave_net_timeout=%lu mi->heartbeat_period=%.3f", + slave_net_timeout, + (active_mi? active_mi->heartbeat_period : 0.0))); + if (active_mi && slave_net_timeout < active_mi->heartbeat_period) + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN, + ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE, + "The currect value for master_heartbeat_period" + " exceeds the new value of `slave_net_timeout' sec." + " A sensible value for the period should be" + " less than the timeout."); + pthread_mutex_unlock(&LOCK_active_mi); +#endif + DBUG_VOID_RETURN; +} + static sys_var_chain vars = { NULL, NULL }; static sys_var_const sys_log_slave_updates(&vars, "log_slave_updates", @@ -1770,7 +1924,8 @@ static sys_var_const sys_slave_load_tmpdir(&vars, "slave_load_tmpdir", OPT_GLOBAL, SHOW_CHAR_PTR, (uchar*) &slave_load_tmpdir); static sys_var_long_ptr sys_slave_net_timeout(&vars, "slave_net_timeout", - &slave_net_timeout); + &slave_net_timeout, + fix_slave_net_timeout); static sys_var_const sys_slave_skip_errors(&vars, "slave_skip_errors", OPT_GLOBAL, SHOW_CHAR, (uchar*) slave_skip_error_names); @@ -1835,6 +1990,7 @@ int init_replication_sys_vars() return 0; } + #endif /* HAVE_REPLICATION */ diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy index a18f57bf9cf..50395d386e8 100644 --- a/sql/sql_yacc.yy +++ b/sql/sql_yacc.yy @@ -814,6 +814,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize); %token MASTER_SSL_VERIFY_SERVER_CERT_SYM %token MASTER_SYM %token MASTER_USER_SYM +%token MASTER_HEARTBEAT_PERIOD_SYM %token MATCH /* SQL-2003-R */ %token MAX_CONNECTIONS_PER_HOUR %token MAX_QUERIES_PER_HOUR @@ -1592,7 +1593,7 @@ master_def: | MASTER_SSL_SYM EQ ulong_num { Lex->mi.ssl= $3 ? - LEX_MASTER_INFO::SSL_ENABLE : LEX_MASTER_INFO::SSL_DISABLE; + LEX_MASTER_INFO::LEX_MI_ENABLE : LEX_MASTER_INFO::LEX_MI_DISABLE; } | MASTER_SSL_CA_SYM EQ TEXT_STRING_sys { @@ -1617,9 +1618,51 @@ master_def: | MASTER_SSL_VERIFY_SERVER_CERT_SYM EQ ulong_num { Lex->mi.ssl_verify_server_cert= $3 ? - LEX_MASTER_INFO::SSL_ENABLE : LEX_MASTER_INFO::SSL_DISABLE; + LEX_MASTER_INFO::LEX_MI_ENABLE : LEX_MASTER_INFO::LEX_MI_DISABLE; } - | master_file_def + + | MASTER_HEARTBEAT_PERIOD_SYM EQ NUM_literal + { + Lex->mi.heartbeat_period= (float) $3->val_real(); + if (Lex->mi.heartbeat_period > SLAVE_MAX_HEARTBEAT_PERIOD || + Lex->mi.heartbeat_period < 0.0) + { + const char format[]= "%d seconds"; + char buf[4*sizeof(SLAVE_MAX_HEARTBEAT_PERIOD) + sizeof(format)]; + my_sprintf(buf, (buf, format, SLAVE_MAX_HEARTBEAT_PERIOD)); + my_error(ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE, + MYF(0), + " is negative or exceeds the maximum ", + buf); + MYSQL_YYABORT; + } + if (Lex->mi.heartbeat_period > slave_net_timeout) + { + push_warning_printf(YYTHD, MYSQL_ERROR::WARN_LEVEL_WARN, + ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE, + ER(ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE), + " exceeds the value of `slave_net_timeout' sec.", + " A sensible value for the period should be" + " less than the timeout."); + } + if (Lex->mi.heartbeat_period < 0.001) + { + if (Lex->mi.heartbeat_period != 0.0) + { + push_warning_printf(YYTHD, MYSQL_ERROR::WARN_LEVEL_WARN, + ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE, + ER(ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE), + " is less than 1 msec.", + " The period is reset to zero which means" + " no heartbeats will be sending"); + Lex->mi.heartbeat_period= 0.0; + } + Lex->mi.heartbeat_opt= LEX_MASTER_INFO::LEX_MI_DISABLE; + } + Lex->mi.heartbeat_opt= LEX_MASTER_INFO::LEX_MI_ENABLE; + } + | + master_file_def ; master_file_def: |