summaryrefslogtreecommitdiff
path: root/sql/sql_repl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r--sql/sql_repl.cc274
1 files changed, 214 insertions, 60 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 039dd046086..db8dc694502 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -1,5 +1,5 @@
/* Copyright (c) 2000, 2012, Oracle and/or its affiliates.
- Copyright (c) 2008, 2011, Monty Program Ab
+ Copyright (c) 2008, 2012, Monty Program Ab
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -34,14 +34,6 @@ my_bool opt_sporadic_binlog_dump_fail = 0;
static int binlog_dump_count = 0;
#endif
-/**
- a copy of active_mi->rli->slave_skip_counter, for showing in SHOW VARIABLES,
- INFORMATION_SCHEMA.GLOBAL_VARIABLES and @@sql_slave_skip_counter without
- taking all the mutexes needed to access active_mi->rli->slave_skip_counter
- properly.
-*/
-uint sql_slave_skip_counter;
-
extern TYPELIB binlog_checksum_typelib;
/*
@@ -492,6 +484,27 @@ static ulonglong get_heartbeat_period(THD * thd)
}
/*
+ Lookup the capabilities of the slave, which it announces by setting a value
+ MARIA_SLAVE_CAPABILITY_XXX in @mariadb_slave_capability.
+
+ Older MariaDB slaves, and other MySQL slaves, do not set
+ @mariadb_slave_capability, corresponding to a capability of
+ MARIA_SLAVE_CAPABILITY_UNKNOWN (0).
+*/
+static int
+get_mariadb_slave_capability(THD *thd)
+{
+ bool null_value;
+ const LEX_STRING name= { C_STRING_WITH_LEN("mariadb_slave_capability") };
+ const user_var_entry *entry=
+ (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
+ name.length);
+ return entry ?
+ (int)(entry->val_int(&null_value)) : MARIA_SLAVE_CAPABILITY_UNKNOWN;
+}
+
+
+/*
Function prepares and sends repliation heartbeat event.
@param net net object of THD
@@ -563,14 +576,68 @@ static int send_heartbeat_event(NET* net, String* packet,
static const char *
send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
Log_event_type event_type, char *log_file_name,
- IO_CACHE *log)
+ IO_CACHE *log, int mariadb_slave_capability,
+ ulong ev_offset, uint8 current_checksum_alg)
{
my_off_t pos;
/* Do not send annotate_rows events unless slave requested it. */
- if (event_type == ANNOTATE_ROWS_EVENT &&
- !(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
- return NULL;
+ if (event_type == ANNOTATE_ROWS_EVENT && !(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
+ {
+ if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES)
+ {
+ /* This slave can tolerate events omitted from the binlog stream. */
+ return NULL;
+ }
+ else if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_ANNOTATE)
+ {
+ /*
+ The slave did not request ANNOTATE_ROWS_EVENT (it does not need them as
+ it will not log them in its own binary log). However, it understands the
+ event and will just ignore it, and it would break if we omitted it,
+ leaving a hole in the binlog stream. So just send the event as-is.
+ */
+ }
+ else
+ {
+ /*
+ The slave does not understand ANNOTATE_ROWS_EVENT.
+
+ Older MariaDB slaves (and MySQL slaves) will break replication if there
+ are holes in the binlog stream (they will miscompute the binlog offset
+ and request the wrong position when reconnecting).
+
+ So replace the event with a dummy event of the same size that will be
+ a no-operation on the slave.
+ */
+ if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg))
+ return "Failed to replace row annotate event with dummy: too small event.";
+ }
+ }
+
+ /*
+ Do not send binlog checkpoint events to a slave that does not understand it.
+ */
+ if (unlikely(event_type == BINLOG_CHECKPOINT_EVENT) &&
+ mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT)
+ {
+ if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES)
+ {
+ /* This slave can tolerate events omitted from the binlog stream. */
+ return NULL;
+ }
+ else
+ {
+ /*
+ The slave does not understand BINLOG_CHECKPOINT_EVENT. Send a dummy
+ event instead, with same length so slave does not get confused about
+ binlog positions.
+ */
+ if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg))
+ return "Failed to replace binlog checkpoint event with dummy: "
+ "too small event.";
+ }
+ }
/*
Skip events with the @@skip_replication flag set, if slave requested
@@ -628,6 +695,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
NET* net = &thd->net;
mysql_mutex_t *log_lock;
mysql_cond_t *log_cond;
+ int mariadb_slave_capability;
uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
int old_max_allowed_packet= thd->variables.max_allowed_packet;
@@ -653,7 +721,9 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
heartbeat_ts= &heartbeat_buf;
set_timespec_nsec(*heartbeat_ts, 0);
}
- sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
+ mariadb_slave_capability= get_mariadb_slave_capability(thd);
+ if (global_system_variables.log_warnings > 1)
+ sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
thd->server_id, log_ident, (ulong)pos);
if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
{
@@ -765,7 +835,7 @@ impossible position";
this larger than the corresponding packet (query) sent
from client to master.
*/
- thd->variables.max_allowed_packet+= MAX_LOG_EVENT_HEADER;
+ thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET;
/*
We can set log_lock now, it does not move (it's a member of
@@ -938,7 +1008,9 @@ impossible position";
}
if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
- log_file_name, &log)))
+ log_file_name, &log,
+ mariadb_slave_capability, ev_offset,
+ current_checksum_alg)))
{
errmsg= tmp_msg;
my_errno= ER_UNKNOWN_ERROR;
@@ -1096,7 +1168,9 @@ impossible position";
if (read_packet &&
(tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
- log_file_name, &log)))
+ log_file_name, &log,
+ mariadb_slave_capability, ev_offset,
+ current_checksum_alg)))
{
errmsg= tmp_msg;
my_errno= ER_UNKNOWN_ERROR;
@@ -1223,15 +1297,28 @@ err:
@retval 0 success
@retval 1 error
+ @retval -1 fatal error
*/
+
int start_slave(THD* thd , Master_info* mi, bool net_report)
{
int slave_errno= 0;
int thread_mask;
+ char master_info_file_tmp[FN_REFLEN];
+ char relay_log_info_file_tmp[FN_REFLEN];
DBUG_ENTER("start_slave");
if (check_access(thd, SUPER_ACL, any_db, NULL, NULL, 0, 0))
- DBUG_RETURN(1);
+ DBUG_RETURN(-1);
+
+ create_logfile_name_with_suffix(master_info_file_tmp,
+ sizeof(master_info_file_tmp),
+ master_info_file, 0, &mi->connection_name);
+ create_logfile_name_with_suffix(relay_log_info_file_tmp,
+ sizeof(relay_log_info_file_tmp),
+ relay_log_info_file, 0,
+ &mi->connection_name);
+
lock_slave_threads(mi); // this allows us to cleanly read slave_running
// Get a mask of _stopped_ threads
init_thread_mask(&thread_mask,mi,1 /* inverse */);
@@ -1245,7 +1332,7 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
thread_mask&= thd->lex->slave_thd_opt;
if (thread_mask) //some threads are stopped, start them
{
- if (init_master_info(mi,master_info_file,relay_log_info_file, 0,
+ if (init_master_info(mi,master_info_file_tmp,relay_log_info_file_tmp, 0,
thread_mask))
slave_errno=ER_MASTER_INFO;
else if (server_id_supplied && *mi->host)
@@ -1319,10 +1406,11 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
if (!slave_errno)
slave_errno = start_slave_threads(0 /*no mutex */,
- 1 /* wait for start */,
- mi,
- master_info_file,relay_log_info_file,
- thread_mask);
+ 1 /* wait for start */,
+ mi,
+ master_info_file_tmp,
+ relay_log_info_file_tmp,
+ thread_mask);
}
else
slave_errno = ER_BAD_SLAVE;
@@ -1339,11 +1427,11 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
if (slave_errno)
{
if (net_report)
- my_message(slave_errno, ER(slave_errno), MYF(0));
- DBUG_RETURN(1);
+ my_error(slave_errno, MYF(0),
+ (int) mi->connection_name.length,
+ mi->connection_name.str);
+ DBUG_RETURN(slave_errno == ER_BAD_SLAVE ? -1 : 1);
}
- else if (net_report)
- my_ok(thd);
DBUG_RETURN(0);
}
@@ -1361,17 +1449,17 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
@retval 0 success
@retval 1 error
+ @retval -1 error
*/
+
int stop_slave(THD* thd, Master_info* mi, bool net_report )
{
- DBUG_ENTER("stop_slave");
-
int slave_errno;
- if (!thd)
- thd = current_thd;
+ DBUG_ENTER("stop_slave");
+ DBUG_PRINT("enter",("Connection: %s", mi->connection_name.str));
if (check_access(thd, SUPER_ACL, any_db, NULL, NULL, 0, 0))
- DBUG_RETURN(1);
+ DBUG_RETURN(-1);
THD_STAGE_INFO(thd, stage_killing_slave);
int thread_mask;
lock_slave_threads(mi);
@@ -1406,8 +1494,6 @@ int stop_slave(THD* thd, Master_info* mi, bool net_report )
my_message(slave_errno, ER(slave_errno), MYF(0));
DBUG_RETURN(1);
}
- else if (net_report)
- my_ok(thd);
DBUG_RETURN(0);
}
@@ -1431,15 +1517,18 @@ int reset_slave(THD *thd, Master_info* mi)
int thread_mask= 0, error= 0;
uint sql_errno=ER_UNKNOWN_ERROR;
const char* errmsg= "Unknown error occured while reseting slave";
+ char master_info_file_tmp[FN_REFLEN];
+ char relay_log_info_file_tmp[FN_REFLEN];
DBUG_ENTER("reset_slave");
lock_slave_threads(mi);
init_thread_mask(&thread_mask,mi,0 /* not inverse */);
if (thread_mask) // We refuse if any slave thread is running
{
- sql_errno= ER_SLAVE_MUST_STOP;
- error=1;
- goto err;
+ unlock_slave_threads(mi);
+ my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length,
+ mi->connection_name.str);
+ DBUG_RETURN(ER_SLAVE_MUST_STOP);
}
ha_reset_slave(thd);
@@ -1466,22 +1555,35 @@ int reset_slave(THD *thd, Master_info* mi)
// close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
end_master_info(mi);
+
// and delete these two files
- fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
+ create_logfile_name_with_suffix(master_info_file_tmp,
+ sizeof(master_info_file_tmp),
+ master_info_file, 0, &mi->connection_name);
+ create_logfile_name_with_suffix(relay_log_info_file_tmp,
+ sizeof(relay_log_info_file_tmp),
+ relay_log_info_file, 0, &mi->connection_name);
+
+ fn_format(fname, master_info_file_tmp, mysql_data_home, "", 4+32);
if (mysql_file_stat(key_file_master_info, fname, &stat_area, MYF(0)) &&
mysql_file_delete(key_file_master_info, fname, MYF(MY_WME)))
{
error=1;
goto err;
}
+ else if (global_system_variables.log_warnings > 1)
+ sql_print_information("Deleted Master_info file '%s'.", fname);
+
// delete relay_log_info_file
- fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
+ fn_format(fname, relay_log_info_file_tmp, mysql_data_home, "", 4+32);
if (mysql_file_stat(key_file_relay_log_info, fname, &stat_area, MYF(0)) &&
mysql_file_delete(key_file_relay_log_info, fname, MYF(MY_WME)))
{
error=1;
goto err;
}
+ else if (global_system_variables.log_warnings > 1)
+ sql_print_information("Deleted Master_info file '%s'.", fname);
RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi));
err:
@@ -1561,20 +1663,12 @@ bool change_master(THD* thd, Master_info* mi)
char saved_host[HOSTNAME_LENGTH + 1];
uint saved_port;
char saved_log_name[FN_REFLEN];
+ char master_info_file_tmp[FN_REFLEN];
+ char relay_log_info_file_tmp[FN_REFLEN];
my_off_t saved_log_pos;
- DBUG_ENTER("change_master");
-
- lock_slave_threads(mi);
- init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
- if (thread_mask) // We refuse if any slave thread is running
- {
- my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
- ret= TRUE;
- goto err;
- }
+ DBUG_ENTER("change_master");
- THD_STAGE_INFO(thd, stage_changing_master);
/*
We need to check if there is an empty master_host. Otherwise
change master succeeds, a master.info file is created containing
@@ -1582,17 +1676,61 @@ bool change_master(THD* thd, Master_info* mi)
is thrown stating that the server is not configured as slave.
(See BUG#28796).
*/
- if(lex_mi->host && !*lex_mi->host)
+ if (lex_mi->host && !*lex_mi->host)
{
my_error(ER_WRONG_ARGUMENTS, MYF(0), "MASTER_HOST");
- unlock_slave_threads(mi);
DBUG_RETURN(TRUE);
}
- // TODO: see if needs re-write
- if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
+ if (master_info_index->check_duplicate_master_info(&lex_mi->connection_name,
+ lex_mi->host,
+ lex_mi->port))
+ DBUG_RETURN(TRUE);
+
+ lock_slave_threads(mi);
+ init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
+ if (thread_mask) // We refuse if any slave thread is running
+ {
+ my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length,
+ mi->connection_name.str);
+ ret= TRUE;
+ goto err;
+ }
+
+ THD_STAGE_INFO(thd, stage_changing_master);
+
+ create_logfile_name_with_suffix(master_info_file_tmp,
+ sizeof(master_info_file_tmp),
+ master_info_file, 0, &mi->connection_name);
+ create_logfile_name_with_suffix(relay_log_info_file_tmp,
+ sizeof(relay_log_info_file_tmp),
+ relay_log_info_file, 0, &mi->connection_name);
+
+ /* if new Master_info doesn't exists, add it */
+ if (!master_info_index->get_master_info(&mi->connection_name,
+ MYSQL_ERROR::WARN_LEVEL_NOTE))
+ {
+ if (master_info_index->add_master_info(mi, TRUE))
+ {
+ my_error(ER_MASTER_INFO, MYF(0),
+ (int) lex_mi->connection_name.length,
+ lex_mi->connection_name.str);
+ ret= TRUE;
+ goto err;
+ }
+ }
+ if (global_system_variables.log_warnings > 1)
+ sql_print_information("Master: '%.*s' Master_info_file: '%s' "
+ "Relay_info_file: '%s'",
+ (int) mi->connection_name.length,
+ mi->connection_name.str,
+ master_info_file_tmp, relay_log_info_file_tmp);
+
+ if (init_master_info(mi, master_info_file_tmp, relay_log_info_file_tmp, 0,
thread_mask))
{
- my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
+ my_error(ER_MASTER_INFO, MYF(0),
+ (int) lex_mi->connection_name.length,
+ lex_mi->connection_name.str);
ret= TRUE;
goto err;
}
@@ -1860,7 +1998,7 @@ int reset_master(THD* thd)
return 1;
}
- if (mysql_bin_log.reset_logs(thd))
+ if (mysql_bin_log.reset_logs(thd, 1))
return 1;
RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */));
return 0;
@@ -1886,6 +2024,7 @@ bool mysql_show_binlog_events(THD* thd)
File file = -1;
MYSQL_BIN_LOG *binary_log= NULL;
int old_max_allowed_packet= thd->variables.max_allowed_packet;
+ Master_info *mi= 0;
LOG_INFO linfo;
DBUG_ENTER("mysql_show_binlog_events");
@@ -1915,10 +2054,15 @@ bool mysql_show_binlog_events(THD* thd)
}
else /* showing relay log contents */
{
- if (!active_mi)
+ mysql_mutex_lock(&LOCK_active_mi);
+ if (!(mi= master_info_index->
+ get_master_info(&thd->variables.default_master_connection,
+ MYSQL_ERROR::WARN_LEVEL_ERROR)))
+ {
+ mysql_mutex_unlock(&LOCK_active_mi);
DBUG_RETURN(TRUE);
-
- binary_log= &(active_mi->rli.relay_log);
+ }
+ binary_log= &(mi->rli.relay_log);
}
if (binary_log->is_open())
@@ -1932,6 +2076,13 @@ bool mysql_show_binlog_events(THD* thd)
mysql_mutex_t *log_lock = binary_log->get_log_lock();
Log_event* ev;
+ if (mi)
+ {
+ /* We can unlock the mutex as we have a lock on the file */
+ mysql_mutex_unlock(&LOCK_active_mi);
+ mi= 0;
+ }
+
unit->set_limit(thd->lex->current_select);
limit_start= unit->offset_limit_cnt;
limit_end= unit->select_limit_cnt;
@@ -2002,7 +2153,7 @@ bool mysql_show_binlog_events(THD* thd)
description_event->checksum_alg= ev->checksum_alg;
if (event_count >= limit_start &&
- ev->net_send(protocol, linfo.log_file_name, pos))
+ ev->net_send(thd, protocol, linfo.log_file_name, pos))
{
errmsg = "Net error";
delete ev;
@@ -2026,6 +2177,9 @@ bool mysql_show_binlog_events(THD* thd)
mysql_mutex_unlock(log_lock);
}
+ else if (mi)
+ mysql_mutex_unlock(&LOCK_active_mi);
+
// Check that linfo is still on the function scope.
DEBUG_SYNC(thd, "after_show_binlog_events");