summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorAndrei Elkin <aelkin@mysql.com>2009-09-29 14:16:23 +0300
committerAndrei Elkin <aelkin@mysql.com>2009-09-29 14:16:23 +0300
commit5983785ef4b04b865ea1d78c8d452642913a83f3 (patch)
treef79e672485606b6da384195da6a4c1c6729ed362 /sql
parent5d2f79def9b67dd5400411070d47f8180ceb072b (diff)
downloadmariadb-git-5983785ef4b04b865ea1d78c8d452642913a83f3.tar.gz
WL#342 heartbeat
backporting from 6.0 code base to 5.1.
Diffstat (limited to 'sql')
-rw-r--r--sql/lex.h1
-rw-r--r--sql/log.cc53
-rw-r--r--sql/log.h5
-rw-r--r--sql/log_event.cc14
-rw-r--r--sql/log_event.h57
-rw-r--r--sql/mysqld.cc36
-rw-r--r--sql/rpl_mi.cc39
-rw-r--r--sql/rpl_mi.h2
-rw-r--r--sql/slave.cc141
-rw-r--r--sql/slave.h12
-rw-r--r--sql/sql_lex.h5
-rw-r--r--sql/sql_repl.cc190
-rw-r--r--sql/sql_yacc.yy49
13 files changed, 541 insertions, 63 deletions
diff --git a/sql/lex.h b/sql/lex.h
index b199a79350b..790808a8c14 100644
--- a/sql/lex.h
+++ b/sql/lex.h
@@ -323,6 +323,7 @@ static SYMBOL symbols[] = {
{ "MASTER_SSL_KEY", SYM(MASTER_SSL_KEY_SYM)},
{ "MASTER_SSL_VERIFY_SERVER_CERT", SYM(MASTER_SSL_VERIFY_SERVER_CERT_SYM)},
{ "MASTER_USER", SYM(MASTER_USER_SYM)},
+ { "MASTER_HEARTBEAT_PERIOD", SYM(MASTER_HEARTBEAT_PERIOD_SYM)},
{ "MATCH", SYM(MATCH)},
{ "MAX_CONNECTIONS_PER_HOUR", SYM(MAX_CONNECTIONS_PER_HOUR)},
{ "MAX_QUERIES_PER_HOUR", SYM(MAX_QUERIES_PER_HOUR)},
diff --git a/sql/log.cc b/sql/log.cc
index 1af2f3a4ddc..362df871ba9 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -2413,7 +2413,7 @@ const char *MYSQL_LOG::generate_name(const char *log_name,
MYSQL_BIN_LOG::MYSQL_BIN_LOG()
:bytes_written(0), prepared_xids(0), file_id(1), open_count(1),
need_start_event(TRUE), m_table_map_version(0),
- is_relay_log(0),
+ is_relay_log(0), signal_cnt(0),
description_event_for_exec(0), description_event_for_queue(0)
{
/*
@@ -4605,12 +4605,9 @@ err:
/**
- Wait until we get a signal that the binary log has been updated.
+ Wait until we get a signal that the relay log has been updated.
@param thd Thread variable
- @param is_slave If 0, the caller is the Binlog_dump thread from master;
- if 1, the caller is the SQL thread from the slave. This
- influences only thd->proc_info.
@note
One must have a lock on LOCK_log before calling this function.
@@ -4618,22 +4615,53 @@ err:
THD::enter_cond() (see NOTES in sql_class.h).
*/
-void MYSQL_BIN_LOG::wait_for_update(THD* thd, bool is_slave)
+void MYSQL_BIN_LOG::wait_for_update_relay_log(THD* thd)
{
const char *old_msg;
- DBUG_ENTER("wait_for_update");
+ DBUG_ENTER("wait_for_update_relay_log");
old_msg= thd->enter_cond(&update_cond, &LOCK_log,
- is_slave ?
- "Has read all relay log; waiting for the slave I/O "
- "thread to update it" :
- "Has sent all binlog to slave; waiting for binlog "
- "to be updated");
+ "Slave has read all relay log; "
+ "waiting for the slave I/O "
+ "thread to update it" );
pthread_cond_wait(&update_cond, &LOCK_log);
thd->exit_cond(old_msg);
DBUG_VOID_RETURN;
}
+/**
+  Wait until we get a signal that the binary log has been updated.
+  Applies to master only.
+
+  @param[in] thd      a THD struct
+  @param[in] timeout  a pointer to a timespec;
+                      NULL means to wait w/o timeout.
+
+  @retval 0      if got signalled on update
+  @retval non-0  the pthread_cond_timedwait() result, e.g. when the wait
+                 timeout elapsed
+
+  @note
+    LOCK_log must be taken before calling this function.
+    LOCK_log is being released while the thread is waiting.
+    LOCK_log is released by the caller: this function enters the condition
+    through THD::enter_cond() but does not call THD::exit_cond() itself.
+*/
+
+int MYSQL_BIN_LOG::wait_for_update_bin_log(THD* thd,
+                                           const struct timespec *timeout)
+{
+  int ret= 0;
+  DBUG_ENTER("wait_for_update_bin_log");
+  /*
+    The message returned by enter_cond() is deliberately not stored:
+    nothing in this function restores it.
+  */
+  thd->enter_cond(&update_cond, &LOCK_log,
+                  "Master has sent all binlog to slave; "
+                  "waiting for binlog to be updated");
+  if (!timeout)
+    pthread_cond_wait(&update_cond, &LOCK_log);
+  else
+    ret= pthread_cond_timedwait(&update_cond, &LOCK_log,
+                                const_cast<struct timespec *>(timeout));
+  DBUG_RETURN(ret);
+}
+
/**
Close the log file.
@@ -4846,6 +4874,7 @@ bool flush_error_log()
void MYSQL_BIN_LOG::signal_update()
{
DBUG_ENTER("MYSQL_BIN_LOG::signal_update");
+ signal_cnt++;
pthread_cond_broadcast(&update_cond);
DBUG_VOID_RETURN;
}
diff --git a/sql/log.h b/sql/log.h
index d306d6f7182..8d6a90d8a35 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -284,7 +284,7 @@ public:
/* This is relay log */
bool is_relay_log;
-
+ ulong signal_cnt; // update of the counter is checked by heartbeat
/*
These describe the log's format. This is used only for relay logs.
_for_exec is used by the SQL thread, _for_queue by the I/O thread. It's
@@ -339,7 +339,8 @@ public:
}
void set_max_size(ulong max_size_arg);
void signal_update();
- void wait_for_update(THD* thd, bool master_or_slave);
+ void wait_for_update_relay_log(THD* thd);
+ int wait_for_update_bin_log(THD* thd, const struct timespec * timeout);
void set_need_start_event() { need_start_event = 1; }
void init(bool no_auto_events_arg, ulong max_size);
void init_pthread_objects();
diff --git a/sql/log_event.cc b/sql/log_event.cc
index fb6a5230fda..6c240735d23 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -3643,6 +3643,7 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver)
post_header_len[UPDATE_ROWS_EVENT-1]=
post_header_len[DELETE_ROWS_EVENT-1]= 6;);
post_header_len[INCIDENT_EVENT-1]= INCIDENT_HEADER_LEN;
+ post_header_len[HEARTBEAT_LOG_EVENT-1]= 0;
// Sanity-check that all post header lengths are initialized.
IF_DBUG({
@@ -9427,3 +9428,16 @@ st_print_event_info::st_print_event_info()
open_cached_file(&body_cache, NULL, NULL, 0, flags);
}
#endif
+
+
+#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
+/**
+  Builds a heartbeat event from a buffer received from the master.
+
+  @param buf                raw event: common header followed by the
+                            binlog file name
+  @param event_len          total length of the event in buf
+  @param description_event  format description supplying the common
+                            header length
+
+  log_ident is left pointing into buf (no copy is made) and ident_len is
+  capped at FN_REFLEN-1.
+*/
+Heartbeat_log_event::Heartbeat_log_event(const char* buf, uint event_len,
+                    const Format_description_log_event* description_event)
+  :Log_event(buf, description_event)
+{
+  uint8 header_size= description_event->common_header_len;
+  /*
+    Both operands are unsigned: an event shorter than the common header
+    would wrap to a huge length, so treat it as carrying an empty name.
+  */
+  ident_len= (event_len > header_size) ? event_len - header_size : 0;
+  set_if_smaller(ident_len,FN_REFLEN-1);
+  log_ident= buf + header_size;
+}
+#endif
diff --git a/sql/log_event.h b/sql/log_event.h
index 8202dddcc76..b481ae59502 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -250,6 +250,7 @@ struct sql_ex_info
#define EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN (4 + 4 + 4 + 1)
#define EXECUTE_LOAD_QUERY_HEADER_LEN (QUERY_HEADER_LEN + EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN)
#define INCIDENT_HEADER_LEN 2
+#define HEARTBEAT_HEADER_LEN 0
/*
Max number of possible extra bytes in a replication event compared to a
packet (i.e. a query) sent from client to master;
@@ -575,6 +576,12 @@ enum Log_event_type
INCIDENT_EVENT= 26,
/*
+ Heartbeat event to be sent by master at its idle time
+ to assure the slave of master's online status
+ */
+ HEARTBEAT_LOG_EVENT= 27,
+
+ /*
Add new events here - right above this comment!
Existing events (except ENUM_END_EVENT) should never change their numbers
*/
@@ -689,6 +696,20 @@ typedef struct st_print_event_info
} PRINT_EVENT_INFO;
#endif
+/**
+ The struct aggregates two parameters that identify an event
+ uniquely in the scope of communication of a particular master and slave
+ couple. I.e. there can not be 2 events from the same master, while it stays
+ connected, which have the same coordinates.
+ @note
+ Such an identifier is not yet unique generally as the event originating
+ master is resettable. Also the crashed master can be replaced with some
+ other.
+*/
+struct event_coordinates
+{
+ char * file_name; // binlog file name; send_heartbeat_event() skips any directory part when sending
+ my_off_t pos; // event's position in the binlog file
+};
/**
@class Log_event
@@ -3916,6 +3937,42 @@ static inline bool copy_event_cache_to_file_and_reinit(IO_CACHE *cache,
reinit_io_cache(cache, WRITE_CACHE, 0, FALSE, TRUE);
}
+#ifndef MYSQL_CLIENT
+/*****************************************************************************
+
+ Heartbeat Log Event class
+
+ Replication event that assures the slave that the master is alive.
+ The event is originated by the master's dump thread and sent straight to
+ the slave without being logged. The slave does not store it in the relay
+ log either: it uses the data for immediate checks and throws the event away.
+
+ The two members log_ident and Log_event::log_pos together make up the
+ event's coordinates (@see event_coordinates). The coordinates that a
+ heartbeat instance carries correspond to the last event the master has
+ sent from its binlog.
+
+ ****************************************************************************/
+class Heartbeat_log_event: public Log_event
+{
+public:
+ Heartbeat_log_event(const char* buf, uint event_len,
+ const Format_description_log_event* description_event);
+ Log_event_type get_type_code() { return HEARTBEAT_LOG_EVENT; }
+ bool is_valid() const // valid when a name pointer is set and log_pos >= BIN_LOG_HEADER_SIZE
+ {
+ return (log_ident != NULL &&
+ log_pos >= BIN_LOG_HEADER_SIZE);
+ }
+ const char * get_log_ident() { return log_ident; }
+ uint get_ident_len() { return ident_len; }
+
+private:
+ const char* log_ident; // binlog file name; the constructor points it into the received buffer
+ uint ident_len; // length of log_ident; the constructor caps it at FN_REFLEN-1
+};
+#endif
+
/**
@} (end of group Replication)
*/
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 7e9eb6e7291..4bbb49f47ff 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -7068,6 +7068,40 @@ static int show_slave_retried_trans(THD *thd, SHOW_VAR *var, char *buff)
pthread_mutex_unlock(&LOCK_active_mi);
return 0;
}
+
+/**
+  Status-variable callback listed as Slave_received_heartbeats.
+
+  @param thd   unused
+  @param var   receives the value type and a pointer to buff
+  @param buff  storage for the longlong counter value
+
+  @return always 0; var->type is SHOW_UNDEF when there is no active_mi.
+*/
+static int show_slave_received_heartbeats(THD *thd, SHOW_VAR *var, char *buff)
+{
+  pthread_mutex_lock(&LOCK_active_mi);
+  if (active_mi)
+  {
+    var->type= SHOW_LONGLONG;
+    var->value= buff;
+    /*
+      received_heartbeats is a Master_info member that queue_event()
+      increments while holding mi->data_lock, so read it under that same
+      mutex rather than under rli.data_lock.
+    */
+    pthread_mutex_lock(&active_mi->data_lock);
+    *((longlong *)buff)= active_mi->received_heartbeats;
+    pthread_mutex_unlock(&active_mi->data_lock);
+  }
+  else
+    var->type= SHOW_UNDEF;
+  pthread_mutex_unlock(&LOCK_active_mi);
+  return 0;
+}
+
+/**
+  Status-variable callback listed as Slave_heartbeat_period.
+  Formats active_mi->heartbeat_period with three decimals into buff;
+  reports SHOW_UNDEF when there is no active_mi. Always returns 0.
+*/
+static int show_heartbeat_period(THD *thd, SHOW_VAR *var, char *buff)
+{
+  pthread_mutex_lock(&LOCK_active_mi);
+  if (!active_mi)
+    var->type= SHOW_UNDEF;
+  else
+  {
+    my_sprintf(buff, (buff, "%.3f",active_mi->heartbeat_period));
+    var->value= buff;
+    var->type= SHOW_CHAR;
+  }
+  pthread_mutex_unlock(&LOCK_active_mi);
+  return 0;
+}
+
+
#endif /* HAVE_REPLICATION */
static int show_open_tables(THD *thd, SHOW_VAR *var, char *buff)
@@ -7432,6 +7466,8 @@ SHOW_VAR status_vars[]= {
{"Slave_open_temp_tables", (char*) &slave_open_temp_tables, SHOW_LONG},
#ifdef HAVE_REPLICATION
{"Slave_retried_transactions",(char*) &show_slave_retried_trans, SHOW_FUNC},
+ {"Slave_heartbeat_period", (char*) &show_heartbeat_period, SHOW_FUNC},
+ {"Slave_received_heartbeats",(char*) &show_slave_received_heartbeats, SHOW_FUNC},
{"Slave_running", (char*) &show_slave_running, SHOW_FUNC},
#endif
{"Slow_launch_threads", (char*) &slow_launch_threads, SHOW_LONG},
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index 5e46837e948..77f7b7e1929 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -26,12 +26,13 @@
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
const char *default_val);
+int init_floatvar_from_file(float* var, IO_CACHE* f, float default_val);
Master_info::Master_info()
:Slave_reporting_capability("I/O"),
ssl(0), ssl_verify_server_cert(0), fd(-1), io_thd(0), inited(0),
- abort_slave(0),slave_running(0),
- slave_run_id(0)
+ abort_slave(0),slave_running(0), slave_run_id(0),
+ heartbeat_period(0), received_heartbeats(0)
{
host[0] = 0; user[0] = 0; password[0] = 0;
ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0;
@@ -84,6 +85,17 @@ void init_master_info_with_options(Master_info* mi)
strmake(mi->ssl_key, master_ssl_key, sizeof(mi->ssl_key)-1);
/* Intentionally init ssl_verify_server_cert to 0, no option available */
mi->ssl_verify_server_cert= 0;
+ /*
+ always request heartbeat unless master_heartbeat_period is set
+ explicitly zero. Here is the default value for heartbeat period
+ if CHANGE MASTER did not specify it. (no data loss in conversion
+ as hb period has a max)
+ */
+ mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD,
+ (slave_net_timeout/2.0));
+ DBUG_ASSERT(mi->heartbeat_period > (float) 0.001
+ || mi->heartbeat_period == 0);
+
DBUG_VOID_RETURN;
}
@@ -94,8 +106,11 @@ enum {
/* 5.1.16 added value of master_ssl_verify_server_cert */
LINE_FOR_MASTER_SSL_VERIFY_SERVER_CERT= 15,
+ /* 6.0 added value of master_heartbeat_period */
+ LINE_FOR_MASTER_HEARTBEAT_PERIOD= 16,
+
/* Number of lines currently used when saving master info file */
- LINES_IN_MASTER_INFO= LINE_FOR_MASTER_SSL_VERIFY_SERVER_CERT
+ LINES_IN_MASTER_INFO= LINE_FOR_MASTER_HEARTBEAT_PERIOD
};
int init_master_info(Master_info* mi, const char* master_info_fname,
@@ -197,6 +212,7 @@ file '%s')", fname);
mi->fd = fd;
int port, connect_retry, master_log_pos, lines;
int ssl= 0, ssl_verify_server_cert= 0;
+ float master_heartbeat_period= 0.0;
char *first_non_digit;
/*
@@ -281,7 +297,13 @@ file '%s')", fname);
if (lines >= LINE_FOR_MASTER_SSL_VERIFY_SERVER_CERT &&
init_intvar_from_file(&ssl_verify_server_cert, &mi->file, 0))
goto errwithmsg;
-
+ /*
+ Starting from 6.0 master_heartbeat_period might be
+ in the file
+ */
+ if (lines >= LINE_FOR_MASTER_HEARTBEAT_PERIOD &&
+ init_floatvar_from_file(&master_heartbeat_period, &mi->file, 0.0))
+ goto errwithmsg;
}
#ifndef HAVE_OPENSSL
@@ -300,6 +322,7 @@ file '%s')", fname);
mi->connect_retry= (uint) connect_retry;
mi->ssl= (my_bool) ssl;
mi->ssl_verify_server_cert= ssl_verify_server_cert;
+ mi->heartbeat_period= master_heartbeat_period;
}
DBUG_PRINT("master_info",("log_file_name: %s position: %ld",
mi->master_log_name,
@@ -378,16 +401,18 @@ int flush_master_info(Master_info* mi, bool flush_relay_log_cache)
contents of file). But because of number of lines in the first line
of file we don't care about this garbage.
*/
-
+ char heartbeat_buf[sizeof(mi->heartbeat_period) * 4]; // buffer to suffice always
+ my_sprintf(heartbeat_buf, (heartbeat_buf, "%.3f", mi->heartbeat_period));
my_b_seek(file, 0L);
my_b_printf(file,
- "%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n%d\n",
+ "%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n%d\n%s\n",
LINES_IN_MASTER_INFO,
mi->master_log_name, llstr(mi->master_log_pos, lbuf),
mi->host, mi->user,
mi->password, mi->port, mi->connect_retry,
(int)(mi->ssl), mi->ssl_ca, mi->ssl_capath, mi->ssl_cert,
- mi->ssl_cipher, mi->ssl_key, mi->ssl_verify_server_cert);
+ mi->ssl_cipher, mi->ssl_key, mi->ssl_verify_server_cert,
+ heartbeat_buf);
DBUG_RETURN(-flush_io_cache(file));
}
diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h
index 93fb0a98198..35e18414932 100644
--- a/sql/rpl_mi.h
+++ b/sql/rpl_mi.h
@@ -83,6 +83,8 @@ class Master_info : public Slave_reporting_capability
Relay_log_info rli;
uint port;
uint connect_retry;
+ float heartbeat_period; // interface with CHANGE MASTER or master.info
+ ulonglong received_heartbeats; // counter of received heartbeat events
#ifndef DBUG_OFF
int events_till_disconnect;
#endif
diff --git a/sql/slave.cc b/sql/slave.cc
index fac9ee214c5..4a161a345eb 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -770,7 +770,6 @@ static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
DBUG_RETURN(0);
}
-
/*
skip_load_data_infile()
@@ -860,6 +859,37 @@ int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
DBUG_RETURN(1);
}
+/**
+  Reads one line from an IO_CACHE and parses it as a float.
+
+  @param[out] var          receives the parsed value, or default_val
+  @param      f            cache to read the line from
+  @param      default_val  used when no line can be read; a zero default
+                           means "no default"
+
+  @retval 0  var was set (parsed, or taken from a non-zero default)
+  @retval 1  the line did not parse, or nothing was read and the
+             default is zero
+*/
+int init_floatvar_from_file(float* var, IO_CACHE* f, float default_val)
+{
+  char line[16];
+  DBUG_ENTER("init_floatvar_from_file");
+
+  if (my_b_gets(f, line, sizeof(line)))
+  {
+    const int parsed= sscanf(line, "%f", var);
+    DBUG_RETURN(parsed == 1 ? 0 : 1);
+  }
+
+  /* Nothing could be read: fall back to the default only if it is non-zero. */
+  if (default_val == 0.0)
+    DBUG_RETURN(1);
+  *var = default_val;
+  DBUG_RETURN(0);
+}
+
+/**
+  Tells whether io_slave_killed() reports the I/O thread as killed,
+  optionally logging a note about it.
+
+  @param thd   I/O thread's THD
+  @param mi    master info of the connection
+  @param info  message to log when killed, or NULL for no message; it is
+               only logged when log_warnings is set
+
+  @return TRUE when killed, FALSE otherwise.
+*/
+static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info)
+{
+  if (io_slave_killed(thd, mi))
+  {
+    /* info is a plain message: never let it be interpreted as a format. */
+    if (info && global_system_variables.log_warnings)
+      sql_print_information("%s", info);
+    return TRUE;
+  }
+  return FALSE;
+}
/*
Check if the error is caused by network.
@@ -1189,6 +1219,32 @@ when it try to get the value of TIME_ZONE global variable from master.";
}
}
+  /*
+    Ask the master for heartbeats by setting the @master_heartbeat_period
+    user variable on the connection; skipped when the period is zero.
+  */
+  if (mi->heartbeat_period != 0.0)
+  {
+    char llbuf[22];
+    const char query_format[]= "SET @master_heartbeat_period= %s";
+    char query[sizeof(query_format) - 2 + sizeof(llbuf)];
+    /*
+      the period is an ulonglong of nano-secs.
+    */
+    llstr((ulonglong) (mi->heartbeat_period*1000000000UL), llbuf);
+    my_sprintf(query, (query, query_format, llbuf));
+
+    if (mysql_real_query(mysql, query, strlen(query))
+        && !check_io_slave_killed(mi->io_thd, mi, NULL))
+    {
+      /*
+        errmsg is kept free of conversion specifiers: it is inserted into
+        err_buff as a plain string, followed by the error text reported
+        for the connection.
+      */
+      errmsg= "The slave I/O thread stops because querying master with "
+              "SET @master_heartbeat_period failed.";
+      err_code= ER_SLAVE_FATAL_ERROR;
+      sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
+      mysql_free_result(mysql_store_result(mysql));
+      goto err;
+    }
+    mysql_free_result(mysql_store_result(mysql));
+  }
+
+
err:
if (errmsg)
{
@@ -2381,18 +2437,6 @@ on this slave.\
}
-static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info)
-{
- if (io_slave_killed(thd, mi))
- {
- if (info && global_system_variables.log_warnings)
- sql_print_information(info);
- return TRUE;
- }
- return FALSE;
-}
-
-
/**
@brief Try to reconnect slave IO thread.
@@ -3552,6 +3596,7 @@ static int queue_old_event(Master_info *mi, const char *buf,
static int queue_event(Master_info* mi,const char* buf, ulong event_len)
{
int error= 0;
+ String error_msg;
ulong inc_pos;
Relay_log_info *rli= &mi->rli;
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
@@ -3586,7 +3631,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
if (unlikely(process_io_rotate(mi,&rev)))
{
- error= 1;
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
goto err;
}
/*
@@ -3613,7 +3658,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
Log_event::read_log_event(buf, event_len, &errmsg,
mi->rli.relay_log.description_event_for_queue)))
{
- error= 2;
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
goto err;
}
delete mi->rli.relay_log.description_event_for_queue;
@@ -3632,6 +3677,56 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
}
break;
+
+ case HEARTBEAT_LOG_EVENT:
+ {
+ /*
+ HB (heartbeat) cannot come before RL (Relay)
+ */
+ char llbuf[22];
+ Heartbeat_log_event hb(buf, event_len, mi->rli.relay_log.description_event_for_queue);
+ if (!hb.is_valid())
+ {
+ error= ER_SLAVE_HEARTBEAT_FAILURE;
+ error_msg.append(STRING_WITH_LEN("inconsistent heartbeat event content;"));
+ error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
+ error_msg.append(hb.get_log_ident(), (uint) strlen(hb.get_log_ident()));
+ error_msg.append(STRING_WITH_LEN(" log_pos "));
+ llstr(hb.log_pos, llbuf);
+ error_msg.append(llbuf, strlen(llbuf));
+ goto err;
+ }
+ mi->received_heartbeats++;
+ /*
+ compare local and event's versions of log_file, log_pos.
+
+ Heartbeat is sent only after an event corresponding to the coordinates
+ the heartbeat carries.
+ Slave can not have a difference in coordinates except in the only
+ special case when mi->master_log_name, master_log_pos have never
+ been updated by Rotate event i.e when slave does not have any history
+ with the master (and thereafter mi->master_log_pos is NULL).
+
+ TODO: handling `when' for SHOW SLAVE STATUS' seconds behind
+ */
+ if ((memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len())
+ && mi->master_log_name != NULL)
+ || mi->master_log_pos != hb.log_pos)
+ {
+ /* missed events of heartbeat from the past */
+ error= ER_SLAVE_HEARTBEAT_FAILURE;
+ error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
+ error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
+ error_msg.append(hb.get_log_ident(), (uint) strlen(hb.get_log_ident()));
+ error_msg.append(STRING_WITH_LEN(" log_pos "));
+ llstr(hb.log_pos, llbuf);
+ error_msg.append(llbuf, strlen(llbuf));
+ goto err;
+ }
+ goto skip_relay_logging;
+ }
+ break;
+
default:
inc_pos= event_len;
break;
@@ -3692,15 +3787,23 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
}
else
- error= 3;
+ {
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ }
rli->ign_master_log_name_end[0]= 0; // last event is not ignored
}
pthread_mutex_unlock(log_lock);
-
+skip_relay_logging:
+
err:
pthread_mutex_unlock(&mi->data_lock);
DBUG_PRINT("info", ("error: %d", error));
+ if (error)
+ mi->report(ERROR_LEVEL, error, ER(error),
+ (error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
+ "could not queue event from master" :
+ error_msg.ptr());
DBUG_RETURN(error);
}
@@ -4208,8 +4311,8 @@ static Log_event* next_event(Relay_log_info* rli)
*/
pthread_mutex_unlock(&rli->log_space_lock);
pthread_cond_broadcast(&rli->log_space_cond);
- // Note that wait_for_update unlocks lock_log !
- rli->relay_log.wait_for_update(rli->sql_thd, 1);
+ // Note that wait_for_update_relay_log unlocks lock_log !
+ rli->relay_log.wait_for_update_relay_log(rli->sql_thd);
// re-acquire data lock since we released it earlier
pthread_mutex_lock(&rli->data_lock);
rli->last_master_timestamp= save_timestamp;
diff --git a/sql/slave.h b/sql/slave.h
index a44a7eed83e..e8364090eb4 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -22,6 +22,17 @@
@file
*/
+
+/**
+ Some of the defines are needed in the parser even though replication is not
+ compiled in (embedded).
+*/
+
+/**
+ The maximum is defined as (ULONG_MAX/1000) with 4 bytes ulong
+*/
+#define SLAVE_MAX_HEARTBEAT_PERIOD 4294967
+
#ifdef HAVE_REPLICATION
#include "log.h"
@@ -33,7 +44,6 @@
#define MAX_SLAVE_ERROR 2000
-
// Forward declarations
class Relay_log_info;
class Master_info;
diff --git a/sql/sql_lex.h b/sql/sql_lex.h
index 6f9f667a75a..f6effab93a4 100644
--- a/sql/sql_lex.h
+++ b/sql/sql_lex.h
@@ -206,14 +206,15 @@ typedef struct st_lex_master_info
{
char *host, *user, *password, *log_file_name;
uint port, connect_retry;
+ float heartbeat_period;
ulonglong pos;
ulong server_id;
/*
Enum is used for making it possible to detect if the user
changed variable or if it should be left at old value
*/
- enum {SSL_UNCHANGED, SSL_DISABLE, SSL_ENABLE}
- ssl, ssl_verify_server_cert;
+ enum {LEX_MI_UNCHANGED, LEX_MI_DISABLE, LEX_MI_ENABLE}
+ ssl, ssl_verify_server_cert, heartbeat_opt;
char *ssl_key, *ssl_cert, *ssl_ca, *ssl_capath, *ssl_cipher;
char *relay_log_name;
ulong relay_log_pos;
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 0ec8d91214c..cde713b1b40 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -336,6 +336,74 @@ Increase max_allowed_packet on master";
}
+/**
+  Helper for mysql_binlog_send: looks up the connection's
+  master_heartbeat_period user variable to obtain the heartbeat timeout.
+
+  @param[in] thd  THD whose user variables are searched
+
+  @return the variable's integer value (the heartbeat period, an
+          ulonglong of nanoseconds), or zero if the variable is not set,
+          i.e. heartbeat was not demanded by slave
+*/
+static ulonglong get_heartbeat_period(THD * thd)
+{
+  LEX_STRING var_name= { C_STRING_WITH_LEN("master_heartbeat_period")};
+  user_var_entry *hb_var=
+    (user_var_entry*) hash_search(&thd->user_vars, (uchar*) var_name.str,
+                                  var_name.length);
+  if (!hb_var)
+    return 0;
+
+  my_bool is_null;
+  return hb_var->val_int(&is_null);
+}
+
+/*
+  Builds a replication heartbeat event and writes it to the slave.
+
+  @param net     net object of THD
+  @param packet  buffer the event is appended to before sending; reset to
+                 a single zero byte after a successful send
+  @param coord   binlog file name and position of the last real event
+                 master sent from binlog
+
+  @return 0 on success, -1 if writing or flushing the net failed
+
+  @note
+    The timestamp ('when') field of the header is zeroed. Only the part
+    of the file name after its directory is put into the event.
+    The error to send is serious and should force terminating
+    the dump thread.
+*/
+static int send_heartbeat_event(NET* net, String* packet,
+                                const struct event_coordinates *coord)
+{
+  DBUG_ENTER("send_heartbeat_event");
+
+  /* Event body: the binlog file name without its directory part. */
+  char* base_name= coord->file_name + dirname_length(coord->file_name);
+  uint name_len = strlen(base_name);
+  ulong total_len = name_len + LOG_EVENT_HEADER_LEN;
+
+  /* Common header, filled field by field. */
+  char hdr[LOG_EVENT_HEADER_LEN];
+  memset(hdr, 0, 4);                               // when
+  hdr[EVENT_TYPE_OFFSET] = HEARTBEAT_LOG_EVENT;
+  int4store(hdr + SERVER_ID_OFFSET, server_id);
+  int4store(hdr + EVENT_LEN_OFFSET, total_len);
+  int4store(hdr + LOG_POS_OFFSET, coord->pos);     // log_pos
+  int2store(hdr + FLAGS_OFFSET, 0);
+
+  packet->append(hdr, sizeof(hdr));
+  packet->append(base_name, name_len);             // log_file_name
+
+  const bool send_failed=
+    my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
+    net_flush(net);
+  if (send_failed)
+    DBUG_RETURN(-1);
+
+  packet->set("\0", 1, &my_charset_bin);
+  DBUG_RETURN(0);
+}
+
/*
TODO: Clean up loop to only have one call to send_file()
*/
@@ -361,7 +429,22 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
bzero((char*) &log,sizeof(log));
-
+ /*
+ heartbeat_period from @master_heartbeat_period user variable
+ */
+ ulonglong heartbeat_period= get_heartbeat_period(thd);
+ struct timespec heartbeat_buf;
+ struct event_coordinates coord_buf;
+ struct timespec *heartbeat_ts= NULL;
+ struct event_coordinates *coord= NULL;
+ if (heartbeat_period != LL(0))
+ {
+ heartbeat_ts= &heartbeat_buf;
+ set_timespec_nsec(*heartbeat_ts, 0);
+ coord= &coord_buf;
+ coord->file_name= log_file_name; // initialization basing on what slave remembers
+ coord->pos= pos;
+ }
#ifndef DBUG_OFF
if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
{
@@ -555,6 +638,11 @@ impossible position";
goto err;
}
#endif
+ /*
+ log's filename does not change while it's active
+ */
+ if (coord)
+ coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
{
@@ -650,26 +738,65 @@ impossible position";
/* we read successfully, so we'll need to send it to the slave */
pthread_mutex_unlock(log_lock);
read_packet = 1;
+ if (coord)
+ coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
break;
case LOG_READ_EOF:
+ {
+ int ret;
+ ulong signal_cnt;
DBUG_PRINT("wait",("waiting for data in binary log"));
if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
{
pthread_mutex_unlock(log_lock);
goto end;
}
- if (!thd->killed)
- {
- /* Note that the following call unlocks lock_log */
- mysql_bin_log.wait_for_update(thd, 0);
- }
- else
- pthread_mutex_unlock(log_lock);
- DBUG_PRINT("wait",("binary log received update"));
- break;
- default:
+#ifndef DBUG_OFF
+ ulong hb_info_counter= 0;
+#endif
+ signal_cnt= mysql_bin_log.signal_cnt;
+ do
+ {
+ if (coord)
+ {
+ DBUG_ASSERT(heartbeat_ts && heartbeat_period != LL(0));
+ set_timespec_nsec(*heartbeat_ts, heartbeat_period);
+ }
+ ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
+ DBUG_ASSERT(ret == 0 || heartbeat_period != LL(0) && coord != NULL);
+ if (ret == ETIMEDOUT || ret == ETIME)
+ {
+#ifndef DBUG_OFF
+ if (hb_info_counter < 3)
+ {
+ sql_print_information("master sends heartbeat message");
+ hb_info_counter++;
+ if (hb_info_counter == 3)
+ sql_print_information("the rest of heartbeat info skipped ...");
+ }
+#endif
+ if (send_heartbeat_event(net, packet, coord))
+ {
+ errmsg = "Failed on my_net_write()";
+ my_errno= ER_UNKNOWN_ERROR;
+ pthread_mutex_unlock(log_lock);
+ goto err;
+ }
+ }
+ else
+ {
+ DBUG_ASSERT(ret == 0 && signal_cnt != mysql_bin_log.signal_cnt ||
+ thd->killed);
+ DBUG_PRINT("wait",("binary log received update"));
+ }
+ } while (signal_cnt == mysql_bin_log.signal_cnt && !thd->killed);
+ pthread_mutex_unlock(log_lock);
+ }
+ break;
+
+ default:
pthread_mutex_unlock(log_lock);
fatal_error = 1;
break;
@@ -753,6 +880,8 @@ impossible position";
packet->length(0);
packet->append('\0');
+ if (coord)
+ coord->file_name= log_file_name; // reset to the next
}
}
@@ -1195,13 +1324,18 @@ bool change_master(THD* thd, Master_info* mi)
mi->port = lex_mi->port;
if (lex_mi->connect_retry)
mi->connect_retry = lex_mi->connect_retry;
+ if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
+ mi->heartbeat_period = lex_mi->heartbeat_period;
+ else
+ mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD,
+ (slave_net_timeout/2.0));
+ mi->received_heartbeats= LL(0); // counter lives until master is CHANGEd
+ if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
+ mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
- if (lex_mi->ssl != LEX_MASTER_INFO::SSL_UNCHANGED)
- mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::SSL_ENABLE);
-
- if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::SSL_UNCHANGED)
+ if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
mi->ssl_verify_server_cert=
- (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::SSL_ENABLE);
+ (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
if (lex_mi->ssl_ca)
strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
@@ -1745,6 +1879,26 @@ public:
bool update(THD *thd, set_var *var);
};
+/**
+  Callback registered with the slave_net_timeout system variable.
+  Pushes a warning to the session when the heartbeat period of the active
+  master info exceeds slave_net_timeout. Without HAVE_REPLICATION the body
+  is empty.
+
+  @param thd   session that receives the warning
+  @param type  unused
+*/
+static void fix_slave_net_timeout(THD *thd, enum_var_type type)
+{
+  DBUG_ENTER("fix_slave_net_timeout");
+#ifdef HAVE_REPLICATION
+  pthread_mutex_lock(&LOCK_active_mi);
+  DBUG_PRINT("info",("slave_net_timeout=%lu mi->heartbeat_period=%.3f",
+                     slave_net_timeout,
+                     (active_mi? active_mi->heartbeat_period : 0.0)));
+  if (active_mi && slave_net_timeout < active_mi->heartbeat_period)
+    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+                        ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
+                        "The current value for master_heartbeat_period"
+                        " exceeds the new value of `slave_net_timeout' sec."
+                        " A sensible value for the period should be"
+                        " less than the timeout.");
+  pthread_mutex_unlock(&LOCK_active_mi);
+#endif
+  DBUG_VOID_RETURN;
+}
+
static sys_var_chain vars = { NULL, NULL };
static sys_var_const sys_log_slave_updates(&vars, "log_slave_updates",
@@ -1770,7 +1924,8 @@ static sys_var_const sys_slave_load_tmpdir(&vars, "slave_load_tmpdir",
OPT_GLOBAL, SHOW_CHAR_PTR,
(uchar*) &slave_load_tmpdir);
static sys_var_long_ptr sys_slave_net_timeout(&vars, "slave_net_timeout",
- &slave_net_timeout);
+ &slave_net_timeout,
+ fix_slave_net_timeout);
static sys_var_const sys_slave_skip_errors(&vars, "slave_skip_errors",
OPT_GLOBAL, SHOW_CHAR,
(uchar*) slave_skip_error_names);
@@ -1835,6 +1990,7 @@ int init_replication_sys_vars()
return 0;
}
+
#endif /* HAVE_REPLICATION */
diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy
index a18f57bf9cf..50395d386e8 100644
--- a/sql/sql_yacc.yy
+++ b/sql/sql_yacc.yy
@@ -814,6 +814,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize);
%token MASTER_SSL_VERIFY_SERVER_CERT_SYM
%token MASTER_SYM
%token MASTER_USER_SYM
+%token MASTER_HEARTBEAT_PERIOD_SYM
%token MATCH /* SQL-2003-R */
%token MAX_CONNECTIONS_PER_HOUR
%token MAX_QUERIES_PER_HOUR
@@ -1592,7 +1593,7 @@ master_def:
| MASTER_SSL_SYM EQ ulong_num
{
Lex->mi.ssl= $3 ?
- LEX_MASTER_INFO::SSL_ENABLE : LEX_MASTER_INFO::SSL_DISABLE;
+ LEX_MASTER_INFO::LEX_MI_ENABLE : LEX_MASTER_INFO::LEX_MI_DISABLE;
}
| MASTER_SSL_CA_SYM EQ TEXT_STRING_sys
{
@@ -1617,9 +1618,51 @@ master_def:
| MASTER_SSL_VERIFY_SERVER_CERT_SYM EQ ulong_num
{
Lex->mi.ssl_verify_server_cert= $3 ?
- LEX_MASTER_INFO::SSL_ENABLE : LEX_MASTER_INFO::SSL_DISABLE;
+ LEX_MASTER_INFO::LEX_MI_ENABLE : LEX_MASTER_INFO::LEX_MI_DISABLE;
}
- | master_file_def
+
+ | MASTER_HEARTBEAT_PERIOD_SYM EQ NUM_literal
+ {
+ Lex->mi.heartbeat_period= (float) $3->val_real();
+ if (Lex->mi.heartbeat_period > SLAVE_MAX_HEARTBEAT_PERIOD ||
+ Lex->mi.heartbeat_period < 0.0)
+ {
+ const char format[]= "%d seconds";
+ char buf[4*sizeof(SLAVE_MAX_HEARTBEAT_PERIOD) + sizeof(format)];
+ my_sprintf(buf, (buf, format, SLAVE_MAX_HEARTBEAT_PERIOD));
+ my_error(ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
+ MYF(0),
+ " is negative or exceeds the maximum ",
+ buf);
+ MYSQL_YYABORT;
+ }
+ if (Lex->mi.heartbeat_period > slave_net_timeout)
+ {
+ push_warning_printf(YYTHD, MYSQL_ERROR::WARN_LEVEL_WARN,
+ ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
+ ER(ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE),
+ " exceeds the value of `slave_net_timeout' sec.",
+ " A sensible value for the period should be"
+ " less than the timeout.");
+ }
+ if (Lex->mi.heartbeat_period < 0.001)
+ {
+ if (Lex->mi.heartbeat_period != 0.0)
+ {
+ push_warning_printf(YYTHD, MYSQL_ERROR::WARN_LEVEL_WARN,
+ ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
+ ER(ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE),
+ " is less than 1 msec.",
+ " The period is reset to zero which means"
+ " no heartbeats will be sending");
+ Lex->mi.heartbeat_period= 0.0;
+ }
+ Lex->mi.heartbeat_opt= LEX_MASTER_INFO::LEX_MI_DISABLE;
+ }
+ Lex->mi.heartbeat_opt= LEX_MASTER_INFO::LEX_MI_ENABLE;
+ }
+ |
+ master_file_def
;
master_file_def: