summaryrefslogtreecommitdiff
path: root/sql/sql_repl.cc
diff options
context:
space:
mode:
authorHe Zhenxing <zhenxing.he@sun.com>2009-10-18 11:57:38 +0800
committerHe Zhenxing <zhenxing.he@sun.com>2009-10-18 11:57:38 +0800
commit963051287538dc07c961d8baa18049f19c1977ab (patch)
treea17e4b5853a3e4a0c4dcfcf7083591ba01362f80 /sql/sql_repl.cc
parent66481aebb0300678b0882f0f60b43cb2025a3285 (diff)
parent6ae50d8aedbded8dcdc7504d116f25bcd02535ff (diff)
downloadmariadb-git-963051287538dc07c961d8baa18049f19c1977ab.tar.gz
Manual merge 5.1-rep+2 to 5.1-rep+3
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r--sql/sql_repl.cc483
1 files changed, 399 insertions, 84 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 5c18ff7690c..5c25d789d5c 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -21,6 +21,7 @@
#include "log_event.h"
#include "rpl_filter.h"
#include <my_dir.h>
+#include "rpl_handler.h"
int max_binlog_dump_events = 0; // unlimited
my_bool opt_sporadic_binlog_dump_fail = 0;
@@ -80,6 +81,32 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
DBUG_RETURN(0);
}
+/*
+ Reset thread transmit packet buffer for event sending
+
+ This function allocates header bytes for event transmission, and
+ should be called before store the event data to the packet buffer.
+*/
+static int reset_transmit_packet(THD *thd, ushort flags,
+ ulong *ev_offset, const char **errmsg)
+{
+ int ret= 0;
+ String *packet= &thd->packet;
+
+ /* reserve and set default header */
+ packet->length(0);
+ packet->set("\0", 1, &my_charset_bin);
+
+ if (RUN_HOOK(binlog_transmit, reserve_header, (thd, flags, packet)))
+ {
+ *errmsg= "Failed to run hook 'reserve_header'";
+ my_errno= ER_UNKNOWN_ERROR;
+ ret= 1;
+ }
+ *ev_offset= packet->length();
+ return ret;
+}
+
static int send_file(THD *thd)
{
NET* net = &thd->net;
@@ -336,6 +363,73 @@ Increase max_allowed_packet on master";
}
+/**
+ An auxiliary function for calling in mysql_binlog_send
+ to initialize the heartbeat timeout in waiting for a binlogged event.
+
+ @param[in] thd THD to access a user variable
+
+ @return heartbeat period an ulonglong of nanoseconds
+ or zero if heartbeat was not demanded by slave
+*/
+static ulonglong get_heartbeat_period(THD * thd)
+{
+ my_bool null_value;
+ LEX_STRING name= { C_STRING_WITH_LEN("master_heartbeat_period")};
+ user_var_entry *entry=
+ (user_var_entry*) hash_search(&thd->user_vars, (uchar*) name.str,
+ name.length);
+ return entry? entry->val_int(&null_value) : 0;
+}
+
+/*
+ Function prepares and sends repliation heartbeat event.
+
+ @param net net object of THD
+ @param packet buffer to store the heartbeat instance
+ @param event_coordinates binlog file name and position of the last
+ real event master sent from binlog
+
+ @note
+ Among three essential pieces of heartbeat data Log_event::when
+ is computed locally.
+ The error to send is serious and should force terminating
+ the dump thread.
+*/
+static int send_heartbeat_event(NET* net, String* packet,
+ const struct event_coordinates *coord)
+{
+ DBUG_ENTER("send_heartbeat_event");
+ char header[LOG_EVENT_HEADER_LEN];
+ /*
+ 'when' (the timestamp) is set to 0 so that slave could distinguish between
+ real and fake Rotate events (if necessary)
+ */
+ memset(header, 0, 4); // when
+
+ header[EVENT_TYPE_OFFSET] = HEARTBEAT_LOG_EVENT;
+
+ char* p= coord->file_name + dirname_length(coord->file_name);
+
+ uint ident_len = strlen(p);
+ ulong event_len = ident_len + LOG_EVENT_HEADER_LEN;
+ int4store(header + SERVER_ID_OFFSET, server_id);
+ int4store(header + EVENT_LEN_OFFSET, event_len);
+ int2store(header + FLAGS_OFFSET, 0);
+
+ int4store(header + LOG_POS_OFFSET, coord->pos); // log_pos
+
+ packet->append(header, sizeof(header));
+ packet->append(p, ident_len); // log_file_name
+
+ if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
+ net_flush(net))
+ {
+ DBUG_RETURN(-1);
+ }
+ DBUG_RETURN(0);
+}
+
/*
TODO: Clean up loop to only have one call to send_file()
*/
@@ -346,6 +440,9 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
LOG_INFO linfo;
char *log_file_name = linfo.log_file_name;
char search_file_name[FN_REFLEN], *name;
+
+ ulong ev_offset;
+
IO_CACHE log;
File file = -1;
String* packet = &thd->packet;
@@ -361,6 +458,30 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
bzero((char*) &log,sizeof(log));
+ /*
+ heartbeat_period from @master_heartbeat_period user variable
+ */
+ ulonglong heartbeat_period= get_heartbeat_period(thd);
+ struct timespec heartbeat_buf;
+ struct event_coordinates coord_buf;
+ struct timespec *heartbeat_ts= NULL;
+ struct event_coordinates *coord= NULL;
+ if (heartbeat_period != LL(0))
+ {
+ heartbeat_ts= &heartbeat_buf;
+ set_timespec_nsec(*heartbeat_ts, 0);
+ coord= &coord_buf;
+ coord->file_name= log_file_name; // initialization basing on what slave remembers
+ coord->pos= pos;
+ }
+ sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
+ thd->server_id, log_ident, (ulong)pos);
+ if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
+ {
+ errmsg= "Failed to run hook 'transmit_start'";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
#ifndef DBUG_OFF
if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
@@ -416,11 +537,9 @@ impossible position";
goto err;
}
- /*
- We need to start a packet with something other than 255
- to distinguish it from error
- */
- packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */
+ /* reset transmit packet for the fake rotate event below */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
/*
Tell the client about the log name with a fake Rotate event;
@@ -460,7 +579,7 @@ impossible position";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
}
- packet->set("\0", 1, &my_charset_bin);
+
/*
Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
this larger than the corresponding packet (query) sent
@@ -476,6 +595,11 @@ impossible position";
log_lock = mysql_bin_log.get_log_lock();
if (pos > BIN_LOG_HEADER_SIZE)
{
+ /* reset transmit packet for the event read from binary log
+ file */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
+
/*
Try to find a Format_description_log_event at the beginning of
the binlog
@@ -483,29 +607,30 @@ impossible position";
if (!(error = Log_event::read_log_event(&log, packet, log_lock)))
{
/*
- The packet has offsets equal to the normal offsets in a binlog
- event +1 (the first character is \0).
+ The packet has offsets equal to the normal offsets in a
+ binlog event + ev_offset (the first ev_offset characters are
+ the header (default \0)).
*/
DBUG_PRINT("info",
("Looked for a Format_description_log_event, found event type %d",
- (*packet)[EVENT_TYPE_OFFSET+1]));
- if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
+ (*packet)[EVENT_TYPE_OFFSET+ev_offset]));
+ if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT)
{
- binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
+ binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
LOG_EVENT_BINLOG_IN_USE_F);
- (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+ (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
/*
mark that this event with "log_pos=0", so the slave
should not increment master's binlog position
(rli->group_master_log_pos)
*/
- int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0);
+ int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0);
/*
if reconnect master sends FD event with `created' as 0
to avoid destroying temp tables.
*/
int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
- ST_CREATED_OFFSET+1, (ulong) 0);
+ ST_CREATED_OFFSET+ev_offset, (ulong) 0);
/* send it */
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
{
@@ -531,8 +656,6 @@ impossible position";
Format_description_log_event will be found naturally if it is written.
*/
}
- /* reset the packet as we wrote to it in any case */
- packet->set("\0", 1, &my_charset_bin);
} /* end of if (pos > BIN_LOG_HEADER_SIZE); */
else
{
@@ -544,6 +667,12 @@ impossible position";
while (!net->error && net->vio != 0 && !thd->killed)
{
+ Log_event_type event_type= UNKNOWN_EVENT;
+
+ /* reset the transmit packet for the event read from binary log
+ file */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
{
#ifndef DBUG_OFF
@@ -555,16 +684,31 @@ impossible position";
goto err;
}
#endif
+ /*
+ log's filename does not change while it's active
+ */
+ if (coord)
+ coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
- if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
+ event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
+ if (event_type == FORMAT_DESCRIPTION_EVENT)
{
- binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
+ binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
LOG_EVENT_BINLOG_IN_USE_F);
- (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+ (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
}
- else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
+ else if (event_type == STOP_EVENT)
binlog_can_be_corrupted= FALSE;
+ pos = my_b_tell(&log);
+ if (RUN_HOOK(binlog_transmit, before_send_event,
+ (thd, flags, packet, log_file_name, pos)))
+ {
+ my_errno= ER_UNKNOWN_ERROR;
+ errmsg= "run 'before_send_event' hook failed";
+ goto err;
+ }
+
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
{
errmsg = "Failed on my_net_write()";
@@ -572,9 +716,8 @@ impossible position";
goto err;
}
- DBUG_PRINT("info", ("log event code %d",
- (*packet)[LOG_EVENT_OFFSET+1] ));
- if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+ DBUG_PRINT("info", ("log event code %d", event_type));
+ if (event_type == LOAD_EVENT)
{
if (send_file(thd))
{
@@ -583,7 +726,17 @@ impossible position";
goto err;
}
}
- packet->set("\0", 1, &my_charset_bin);
+
+ if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
+ {
+ errmsg= "Failed to run hook 'after_send_event'";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+
+ /* reset transmit packet for next loop */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
}
/*
@@ -634,6 +787,11 @@ impossible position";
}
#endif
+ /* reset the transmit packet for the event read from binary log
+ file */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
+
/*
No one will update the log while we are reading
now, but we'll be quick and just read one record
@@ -650,34 +808,86 @@ impossible position";
/* we read successfully, so we'll need to send it to the slave */
pthread_mutex_unlock(log_lock);
read_packet = 1;
+ if (coord)
+ coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
+ event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
break;
case LOG_READ_EOF:
+ {
+ int ret;
+ ulong signal_cnt;
DBUG_PRINT("wait",("waiting for data in binary log"));
if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
{
pthread_mutex_unlock(log_lock);
goto end;
}
- if (!thd->killed)
- {
- /* Note that the following call unlocks lock_log */
- mysql_bin_log.wait_for_update(thd, 0);
- }
- else
- pthread_mutex_unlock(log_lock);
- DBUG_PRINT("wait",("binary log received update"));
- break;
- default:
+#ifndef DBUG_OFF
+ ulong hb_info_counter= 0;
+#endif
+ signal_cnt= mysql_bin_log.signal_cnt;
+ do
+ {
+ if (coord)
+ {
+ DBUG_ASSERT(heartbeat_ts && heartbeat_period != LL(0));
+ set_timespec_nsec(*heartbeat_ts, heartbeat_period);
+ }
+ ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
+ DBUG_ASSERT(ret == 0 || heartbeat_period != LL(0) && coord != NULL);
+ if (ret == ETIMEDOUT || ret == ETIME)
+ {
+#ifndef DBUG_OFF
+ if (hb_info_counter < 3)
+ {
+ sql_print_information("master sends heartbeat message");
+ hb_info_counter++;
+ if (hb_info_counter == 3)
+ sql_print_information("the rest of heartbeat info skipped ...");
+ }
+#endif
+ /* reset transmit packet for the heartbeat event */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
+ if (send_heartbeat_event(net, packet, coord))
+ {
+ errmsg = "Failed on my_net_write()";
+ my_errno= ER_UNKNOWN_ERROR;
+ pthread_mutex_unlock(log_lock);
+ goto err;
+ }
+ }
+ else
+ {
+ DBUG_ASSERT(ret == 0 && signal_cnt != mysql_bin_log.signal_cnt ||
+ thd->killed);
+ DBUG_PRINT("wait",("binary log received update"));
+ }
+ } while (signal_cnt == mysql_bin_log.signal_cnt && !thd->killed);
+ pthread_mutex_unlock(log_lock);
+ }
+ break;
+
+ default:
pthread_mutex_unlock(log_lock);
fatal_error = 1;
break;
}
if (read_packet)
- {
- thd_proc_info(thd, "Sending binlog event to slave");
+ {
+ thd_proc_info(thd, "Sending binlog event to slave");
+ pos = my_b_tell(&log);
+ if (RUN_HOOK(binlog_transmit, before_send_event,
+ (thd, flags, packet, log_file_name, pos)))
+ {
+ my_errno= ER_UNKNOWN_ERROR;
+ errmsg= "run 'before_send_event' hook failed";
+ goto err;
+ }
+
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
{
errmsg = "Failed on my_net_write()";
@@ -685,7 +895,7 @@ impossible position";
goto err;
}
- if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+ if (event_type == LOAD_EVENT)
{
if (send_file(thd))
{
@@ -694,11 +904,13 @@ impossible position";
goto err;
}
}
- packet->set("\0", 1, &my_charset_bin);
- /*
- No need to net_flush because we will get to flush later when
- we hit EOF pretty quick
- */
+
+ if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
+ {
+ my_errno= ER_UNKNOWN_ERROR;
+ errmsg= "Failed to run hook 'after_send_event'";
+ goto err;
+ }
}
if (fatal_error)
@@ -734,6 +946,10 @@ impossible position";
end_io_cache(&log);
(void) my_close(file, MYF(MY_WME));
+ /* reset transmit packet for the possible fake rotate event */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
+
/*
Call fake_rotate_event() in case the previous log (the one which
we have just finished reading) did not contain a Rotate event
@@ -751,8 +967,8 @@ impossible position";
goto err;
}
- packet->length(0);
- packet->append('\0');
+ if (coord)
+ coord->file_name= log_file_name; // reset to the next
}
}
@@ -760,6 +976,7 @@ end:
end_io_cache(&log);
(void)my_close(file, MYF(MY_WME));
+ RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
my_eof(thd);
thd_proc_info(thd, "Waiting to finalize termination");
pthread_mutex_lock(&LOCK_thread_count);
@@ -770,6 +987,7 @@ end:
err:
thd_proc_info(thd, "Waiting to finalize termination");
end_io_cache(&log);
+ RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
/*
Exclude iteration through thread list
this is needed for purge_logs() - it will iterate through
@@ -1064,6 +1282,7 @@ int reset_slave(THD *thd, Master_info* mi)
goto err;
}
+ RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi));
err:
unlock_slave_threads(mi);
if (error)
@@ -1137,26 +1356,40 @@ bool change_master(THD* thd, Master_info* mi)
int thread_mask;
const char* errmsg= 0;
bool need_relay_log_purge= 1;
+ bool ret= FALSE;
DBUG_ENTER("change_master");
lock_slave_threads(mi);
init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
+ LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
if (thread_mask) // We refuse if any slave thread is running
{
my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
- unlock_slave_threads(mi);
- DBUG_RETURN(TRUE);
+ ret= TRUE;
+ goto err;
}
thd_proc_info(thd, "Changing master");
- LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
+ /*
+ We need to check if there is an empty master_host. Otherwise
+ change master succeeds, a master.info file is created containing
+ empty master_host string and when issuing: start slave; an error
+ is thrown stating that the server is not configured as slave.
+ (See BUG#28796).
+ */
+ if(lex_mi->host && !*lex_mi->host)
+ {
+ my_error(ER_WRONG_ARGUMENTS, MYF(0), "MASTER_HOST");
+ unlock_slave_threads(mi);
+ DBUG_RETURN(TRUE);
+ }
// TODO: see if needs re-write
if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
thread_mask))
{
my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
- unlock_slave_threads(mi);
- DBUG_RETURN(TRUE);
+ ret= TRUE;
+ goto err;
}
/*
@@ -1195,13 +1428,46 @@ bool change_master(THD* thd, Master_info* mi)
mi->port = lex_mi->port;
if (lex_mi->connect_retry)
mi->connect_retry = lex_mi->connect_retry;
+ if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
+ mi->heartbeat_period = lex_mi->heartbeat_period;
+ else
+ mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD,
+ (slave_net_timeout/2.0));
+ mi->received_heartbeats= LL(0); // counter lives until master is CHANGEd
+ /*
+ reset the last time server_id list if the current CHANGE MASTER
+ is mentioning IGNORE_SERVER_IDS= (...)
+ */
+ if (lex_mi->repl_ignore_server_ids_opt == LEX_MASTER_INFO::LEX_MI_ENABLE)
+ reset_dynamic(&mi->ignore_server_ids);
+ for (uint i= 0; i < lex_mi->repl_ignore_server_ids.elements; i++)
+ {
+ ulong s_id;
+ get_dynamic(&lex_mi->repl_ignore_server_ids, (uchar*) &s_id, i);
+ if (s_id == ::server_id && replicate_same_server_id)
+ {
+ my_error(ER_SLAVE_IGNORE_SERVER_IDS, MYF(0), s_id);
+ ret= TRUE;
+ goto err;
+ }
+ else
+ {
+ if (bsearch((const ulong *) &s_id,
+ mi->ignore_server_ids.buffer,
+ mi->ignore_server_ids.elements, sizeof(ulong),
+ (int (*) (const void*, const void*))
+ change_master_server_id_cmp) == NULL)
+ insert_dynamic(&mi->ignore_server_ids, (uchar*) &s_id);
+ }
+ }
+ sort_dynamic(&mi->ignore_server_ids, (qsort_cmp) change_master_server_id_cmp);
- if (lex_mi->ssl != LEX_MASTER_INFO::SSL_UNCHANGED)
- mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::SSL_ENABLE);
+ if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
+ mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
- if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::SSL_UNCHANGED)
+ if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
mi->ssl_verify_server_cert=
- (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::SSL_ENABLE);
+ (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
if (lex_mi->ssl_ca)
strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
@@ -1224,9 +1490,11 @@ bool change_master(THD* thd, Master_info* mi)
if (lex_mi->relay_log_name)
{
need_relay_log_purge= 0;
- strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
+ char relay_log_name[FN_REFLEN];
+ mi->rli.relay_log.make_log_name(relay_log_name, lex_mi->relay_log_name);
+ strmake(mi->rli.group_relay_log_name, relay_log_name,
sizeof(mi->rli.group_relay_log_name)-1);
- strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
+ strmake(mi->rli.event_relay_log_name, relay_log_name,
sizeof(mi->rli.event_relay_log_name)-1);
}
@@ -1273,8 +1541,8 @@ bool change_master(THD* thd, Master_info* mi)
if (flush_master_info(mi, 0))
{
my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
- unlock_slave_threads(mi);
- DBUG_RETURN(TRUE);
+ ret= TRUE;
+ goto err;
}
if (need_relay_log_purge)
{
@@ -1285,8 +1553,8 @@ bool change_master(THD* thd, Master_info* mi)
&errmsg))
{
my_error(ER_RELAY_LOG_FAIL, MYF(0), errmsg);
- unlock_slave_threads(mi);
- DBUG_RETURN(TRUE);
+ ret= TRUE;
+ goto err;
}
}
else
@@ -1301,8 +1569,8 @@ bool change_master(THD* thd, Master_info* mi)
&msg, 0))
{
my_error(ER_RELAY_LOG_INIT, MYF(0), msg);
- unlock_slave_threads(mi);
- DBUG_RETURN(TRUE);
+ ret= TRUE;
+ goto err;
}
}
/*
@@ -1339,10 +1607,13 @@ bool change_master(THD* thd, Master_info* mi)
pthread_cond_broadcast(&mi->data_cond);
pthread_mutex_unlock(&mi->rli.data_lock);
+err:
unlock_slave_threads(mi);
thd_proc_info(thd, 0);
- my_ok(thd);
- DBUG_RETURN(FALSE);
+ if (ret == FALSE)
+ my_ok(thd);
+ delete_dynamic(&lex_mi->repl_ignore_server_ids); //freeing of parser-time alloc
+ DBUG_RETURN(ret);
}
@@ -1363,7 +1634,11 @@ int reset_master(THD* thd)
ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
return 1;
}
- return mysql_bin_log.reset_logs(thd);
+
+ if (mysql_bin_log.reset_logs(thd))
+ return 1;
+ RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */));
+ return 0;
}
int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
@@ -1401,6 +1676,7 @@ bool mysql_show_binlog_events(THD* thd)
bool ret = TRUE;
IO_CACHE log;
File file = -1;
+ MYSQL_BIN_LOG *binary_log= NULL;
DBUG_ENTER("mysql_show_binlog_events");
Log_event::init_show_field_list(&field_list);
@@ -1411,14 +1687,30 @@ bool mysql_show_binlog_events(THD* thd)
Format_description_log_event *description_event= new
Format_description_log_event(3); /* MySQL 4.0 by default */
- /*
- Wait for handlers to insert any pending information
- into the binlog. For e.g. ndb which updates the binlog asynchronously
- this is needed so that the uses sees all its own commands in the binlog
- */
- ha_binlog_wait(thd);
+ DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS ||
+ thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS);
- if (mysql_bin_log.is_open())
+ /* select wich binary log to use: binlog or relay */
+ if ( thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS )
+ {
+ /*
+ Wait for handlers to insert any pending information
+ into the binlog. For e.g. ndb which updates the binlog asynchronously
+ this is needed so that the uses sees all its own commands in the binlog
+ */
+ ha_binlog_wait(thd);
+
+ binary_log= &mysql_bin_log;
+ }
+ else /* showing relay log contents */
+ {
+ if (!active_mi)
+ DBUG_RETURN(TRUE);
+
+ binary_log= &(active_mi->rli.relay_log);
+ }
+
+ if (binary_log->is_open())
{
LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
SELECT_LEX_UNIT *unit= &thd->lex->unit;
@@ -1426,7 +1718,7 @@ bool mysql_show_binlog_events(THD* thd)
my_off_t pos = max(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
char search_file_name[FN_REFLEN], *name;
const char *log_file_name = lex_mi->log_file_name;
- pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
+ pthread_mutex_t *log_lock = binary_log->get_log_lock();
LOG_INFO linfo;
Log_event* ev;
@@ -1436,13 +1728,13 @@ bool mysql_show_binlog_events(THD* thd)
name= search_file_name;
if (log_file_name)
- mysql_bin_log.make_log_name(search_file_name, log_file_name);
+ binary_log->make_log_name(search_file_name, log_file_name);
else
name=0; // Find first log
linfo.index_file_offset = 0;
- if (mysql_bin_log.find_log_pos(&linfo, name, 1))
+ if (binary_log->find_log_pos(&linfo, name, 1))
{
errmsg = "Could not find target log";
goto err;
@@ -1745,6 +2037,26 @@ public:
bool update(THD *thd, set_var *var);
};
+static void fix_slave_net_timeout(THD *thd, enum_var_type type)
+{
+ DBUG_ENTER("fix_slave_net_timeout");
+#ifdef HAVE_REPLICATION
+ pthread_mutex_lock(&LOCK_active_mi);
+ DBUG_PRINT("info",("slave_net_timeout=%lu mi->heartbeat_period=%.3f",
+ slave_net_timeout,
+ (active_mi? active_mi->heartbeat_period : 0.0)));
+ if (active_mi && slave_net_timeout < active_mi->heartbeat_period)
+ push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+ ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
+ "The currect value for master_heartbeat_period"
+ " exceeds the new value of `slave_net_timeout' sec."
+ " A sensible value for the period should be"
+ " less than the timeout.");
+ pthread_mutex_unlock(&LOCK_active_mi);
+#endif
+ DBUG_VOID_RETURN;
+}
+
static sys_var_chain vars = { NULL, NULL };
static sys_var_const sys_log_slave_updates(&vars, "log_slave_updates",
@@ -1761,6 +2073,16 @@ static sys_var_const sys_relay_log_info_file(&vars, "relay_log_info_file",
(uchar*) &relay_log_info_file);
static sys_var_bool_ptr sys_relay_log_purge(&vars, "relay_log_purge",
&relay_log_purge);
+static sys_var_bool_ptr sys_relay_log_recovery(&vars, "relay_log_recovery",
+ &relay_log_recovery);
+static sys_var_uint_ptr sys_sync_binlog_period(&vars, "sync_binlog",
+ &sync_binlog_period);
+static sys_var_uint_ptr sys_sync_relaylog_period(&vars, "sync_relay_log",
+ &sync_relaylog_period);
+static sys_var_uint_ptr sys_sync_relayloginfo_period(&vars, "sync_relay_log_info",
+ &sync_relayloginfo_period);
+static sys_var_uint_ptr sys_sync_masterinfo_period(&vars, "sync_master_info",
+ &sync_masterinfo_period);
static sys_var_const sys_relay_log_space_limit(&vars,
"relay_log_space_limit",
OPT_GLOBAL, SHOW_LONGLONG,
@@ -1770,13 +2092,13 @@ static sys_var_const sys_slave_load_tmpdir(&vars, "slave_load_tmpdir",
OPT_GLOBAL, SHOW_CHAR_PTR,
(uchar*) &slave_load_tmpdir);
static sys_var_long_ptr sys_slave_net_timeout(&vars, "slave_net_timeout",
- &slave_net_timeout);
+ &slave_net_timeout,
+ fix_slave_net_timeout);
static sys_var_const sys_slave_skip_errors(&vars, "slave_skip_errors",
OPT_GLOBAL, SHOW_CHAR,
(uchar*) slave_skip_error_names);
static sys_var_long_ptr sys_slave_trans_retries(&vars, "slave_transaction_retries",
&slave_trans_retries);
-static sys_var_sync_binlog_period sys_sync_binlog_period(&vars, "sync_binlog", &sync_binlog_period);
static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter");
@@ -1818,12 +2140,6 @@ bool sys_var_slave_skip_counter::update(THD *thd, set_var *var)
}
-bool sys_var_sync_binlog_period::update(THD *thd, set_var *var)
-{
- sync_binlog_period= (ulong) var->save_result.ulonglong_value;
- return 0;
-}
-
int init_replication_sys_vars()
{
if (mysql_add_sys_var_chain(vars.first, my_long_options))
@@ -1835,6 +2151,5 @@ int init_replication_sys_vars()
return 0;
}
-#endif /* HAVE_REPLICATION */
-
+#endif /* HAVE_REPLICATION */