summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2011-08-11 11:38:52 +0200
committerunknown <knielsen@knielsen-hq.org>2011-08-11 11:38:52 +0200
commitc4d69f17753f375e8cfd18c33de291cdf13504f9 (patch)
tree4122550e2bd9b38209eb2b9f86c0421f68e70571 /sql
parent51c7723eb2faf7bb9de8d0a2a3b364b07f82b0fd (diff)
downloadmariadb-git-c4d69f17753f375e8cfd18c33de291cdf13504f9.tar.gz
MWL#234: Support for marking binlog events to not be replicated, and for telling slaves not to replicate events with such mark
Diffstat (limited to 'sql')
-rw-r--r--sql/log_event.cc18
-rw-r--r--sql/log_event.h26
-rw-r--r--sql/mysql_priv.h2
-rw-r--r--sql/mysqld.cc10
-rw-r--r--sql/set_var.cc56
-rw-r--r--sql/set_var.h19
-rw-r--r--sql/slave.cc52
-rw-r--r--sql/sql_binlog.cc9
-rw-r--r--sql/sql_repl.cc77
9 files changed, 231 insertions, 38 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 217adce4f66..05313dc3337 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -665,11 +665,13 @@ const char* Log_event::get_type_str()
#ifndef MYSQL_CLIENT
Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
- :log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg), thd(thd_arg)
+ :log_pos(0), temp_buf(0), exec_time(0), thd(thd_arg)
{
server_id= thd->server_id;
when= thd->start_time;
cache_stmt= using_trans;
+ flags= flags_arg |
+ (thd->options & OPTION_DO_NOT_REPLICATE ? LOG_EVENT_DO_NOT_REPLICATE_F : 0);
}
@@ -825,7 +827,9 @@ Log_event::do_shall_skip(Relay_log_info *rli)
rli->replicate_same_server_id,
rli->slave_skip_counter));
if ((server_id == ::server_id && !rli->replicate_same_server_id) ||
- (rli->slave_skip_counter == 1 && rli->is_in_group()))
+ (rli->slave_skip_counter == 1 && rli->is_in_group()) ||
+ (flags & LOG_EVENT_DO_NOT_REPLICATE_F
+ && opt_replicate_ignore_do_not_replicate))
return EVENT_SKIP_IGNORE;
if (rli->slave_skip_counter > 0)
return EVENT_SKIP_COUNT;
@@ -3483,6 +3487,14 @@ Query_log_event::do_shall_skip(Relay_log_info *rli)
DBUG_PRINT("debug", ("query: %s; q_len: %d", query, q_len));
DBUG_ASSERT(query && q_len > 0);
+ /*
+ An event skipped due to @@do_not_replicate must not be counted towards the
+ number of events to be skipped due to @@sql_slave_skip_counter.
+ */
+ if (flags & LOG_EVENT_DO_NOT_REPLICATE_F &&
+ opt_replicate_ignore_do_not_replicate)
+ DBUG_RETURN(Log_event::EVENT_SKIP_IGNORE);
+
if (rli->slave_skip_counter > 0)
{
if (strcmp("BEGIN", query) == 0)
@@ -9780,7 +9792,7 @@ st_print_event_info::st_print_event_info()
auto_increment_increment(0),auto_increment_offset(0), charset_inited(0),
lc_time_names_number(~0),
charset_database_number(ILLEGAL_CHARSET_INFO_NUMBER),
- thread_id(0), thread_id_printed(false),
+ thread_id(0), thread_id_printed(false), do_not_replicate(0),
base64_output_mode(BASE64_OUTPUT_UNSPEC), printed_fd_event(FALSE)
{
/*
diff --git a/sql/log_event.h b/sql/log_event.h
index 4ea511f45b5..b3a6d0c9d48 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -491,6 +491,19 @@ struct sql_ex_info
#define LOG_EVENT_RELAY_LOG_F 0x40
/**
+ @def LOG_EVENT_DO_NOT_REPLICATE_F
+
+ Flag set by application creating the event (with @@do_not_replicate); the
+ slave will skip replication of such events if
+ --replicate-ignore-do-not-replicate is set.
+
+ This is a MariaDB flag; we allocate it from the end of the available
+ values to reduce risk of conflict with new MySQL flags.
+*/
+#define LOG_EVENT_DO_NOT_REPLICATE_F 0x8000
+
+
+/**
@def OPTIONS_WRITTEN_TO_BIN_LOG
OPTIONS_WRITTEN_TO_BIN_LOG are the bits of thd->options which must
@@ -656,6 +669,11 @@ typedef struct st_print_event_info
uint charset_database_number;
uint thread_id;
bool thread_id_printed;
+ /*
+ Track changes to @@do_not_replicate, so we know when we need to output a
+ SET statement for it.
+ */
+ int do_not_replicate;
st_print_event_info();
@@ -910,8 +928,8 @@ public:
/**
Some 16 flags. See the definitions above for LOG_EVENT_TIME_F,
- LOG_EVENT_FORCED_ROTATE_F, LOG_EVENT_THREAD_SPECIFIC_F, and
- LOG_EVENT_SUPPRESS_USE_F for notes.
+ LOG_EVENT_FORCED_ROTATE_F, LOG_EVENT_THREAD_SPECIFIC_F,
+ LOG_EVENT_SUPPRESS_USE_F, and LOG_EVENT_DO_NOT_REPLICATE_F for notes.
*/
uint16 flags;
@@ -3915,6 +3933,8 @@ public:
DBUG_PRINT("enter", ("m_incident: %d", m_incident));
m_message.str= NULL; /* Just as a precaution */
m_message.length= 0;
+ /* Replicate the incident regardless of @@do_not_replicate. */
+ flags&= ~LOG_EVENT_DO_NOT_REPLICATE_F;
DBUG_VOID_RETURN;
}
@@ -3924,6 +3944,8 @@ public:
DBUG_ENTER("Incident_log_event::Incident_log_event");
DBUG_PRINT("enter", ("m_incident: %d", m_incident));
m_message= msg;
+ /* Replicate the incident regardless of @@do_not_replicate. */
+ flags&= ~LOG_EVENT_DO_NOT_REPLICATE_F;
DBUG_VOID_RETURN;
}
#endif
diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h
index 38a29686906..1322ea1b165 100644
--- a/sql/mysql_priv.h
+++ b/sql/mysql_priv.h
@@ -504,6 +504,7 @@ protected:
*/
#define TMP_TABLE_FORCE_MYISAM (ULL(1) << 32)
#define OPTION_PROFILING (ULL(1) << 33)
+#define OPTION_DO_NOT_REPLICATE (ULL(1) << 34) // THD, user
@@ -2064,6 +2065,7 @@ extern my_bool opt_old_style_user_limits, trust_function_creators;
extern uint opt_crash_binlog_innodb;
extern char *shared_memory_base_name, *mysqld_unix_port;
extern my_bool opt_enable_shared_memory;
+extern my_bool opt_replicate_ignore_do_not_replicate;
extern char *default_tz_name;
#endif /* MYSQL_SERVER */
#if defined MYSQL_SERVER || defined INNODB_COMPATIBILITY_HOOKS
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 3c47fe446ab..e966b35a556 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -553,6 +553,8 @@ uint opt_large_page_size= 0;
uint opt_debug_sync_timeout= 0;
#endif /* defined(ENABLED_DEBUG_SYNC) */
my_bool opt_old_style_user_limits= 0, trust_function_creators= 0;
+my_bool opt_replicate_ignore_do_not_replicate;
+
/*
True if there is at least one per-hour limit for some user, so we should
check them before each query (and possibly reset counters when hour is
@@ -6085,7 +6087,8 @@ enum options_mysqld
OPT_IGNORE_BUILTIN_INNODB,
OPT_BINLOG_DIRECT_NON_TRANS_UPDATE,
OPT_DEFAULT_CHARACTER_SET_OLD,
- OPT_MAX_LONG_DATA_SIZE
+ OPT_MAX_LONG_DATA_SIZE,
+ OPT_REPLICATE_IGNORE_DO_NOT_REPLICATE
};
@@ -6782,6 +6785,11 @@ each time the SQL thread starts.",
"cross database updates. If you need cross database updates to work, "
"make sure you have 3.23.28 or later, and use replicate-wild-ignore-"
"table=db_name.%. ", 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+ /*
+ Boolean option stored in opt_replicate_ignore_do_not_replicate. The help
+ text is built from adjacent string literals, so each piece must carry its
+ own separating space.
+ */
+ {"replicate-ignore-do-not-replicate", OPT_REPLICATE_IGNORE_DO_NOT_REPLICATE,
+ "Tells the slave thread not to replicate events that were created with "
+ "@@do_not_replicate=1.", &opt_replicate_ignore_do_not_replicate,
+ &opt_replicate_ignore_do_not_replicate, 0, GET_BOOL, NO_ARG,
+ 0, 0, 0, 0, 0, 0},
{"replicate-ignore-table", OPT_REPLICATE_IGNORE_TABLE,
"Tells the slave thread to not replicate to the specified table. To specify "
"more than one table to ignore, use the directive multiple times, once for "
diff --git a/sql/set_var.cc b/sql/set_var.cc
index 4aa30b5634d..dc52ed7baf3 100644
--- a/sql/set_var.cc
+++ b/sql/set_var.cc
@@ -117,6 +117,7 @@ static bool set_option_log_bin_bit(THD *thd, set_var *var);
static bool set_option_autocommit(THD *thd, set_var *var);
static int check_log_update(THD *thd, set_var *var);
static bool set_log_update(THD *thd, set_var *var);
+static int check_do_not_replicate(THD *thd, set_var *var);
static int check_pseudo_thread_id(THD *thd, set_var *var);
void fix_binlog_format_after_update(THD *thd, enum_var_type type);
static void fix_low_priority_updates(THD *thd, enum_var_type type);
@@ -830,6 +831,10 @@ static sys_var_thd_bit sys_profiling(&vars, "profiling", NULL,
static sys_var_thd_ulong sys_profiling_history_size(&vars, "profiling_history_size",
&SV::profiling_history_size);
#endif
+static sys_var_thd_bit sys_do_not_replicate(&vars, "do_not_replicate",
+ check_do_not_replicate,
+ set_option_bit,
+ OPTION_DO_NOT_REPLICATE);
/* Local state variables */
@@ -906,6 +911,12 @@ static sys_var_thd_set sys_log_slow_verbosity(&vars,
"log_slow_verbosity",
&SV::log_slow_verbosity,
&log_slow_verbosity_typelib);
+#ifdef HAVE_REPLICATION
+static sys_var_replicate_ignore_do_not_replicate
+ sys_replicate_ignore_do_not_replicate(&vars,
+ "replicate_ignore_do_not_replicate",
+ &opt_replicate_ignore_do_not_replicate);
+#endif
/* Global read-only variable containing hostname */
static sys_var_const_str sys_hostname(&vars, "hostname", glob_hostname);
@@ -3268,6 +3279,25 @@ static bool set_log_update(THD *thd, set_var *var)
}
+static int check_do_not_replicate(THD *thd, set_var *var)
+{
+ /*
+ @@do_not_replicate may only be changed outside of locked tables and
+ outside of an active transaction. Flipping it part-way through could
+ cause just a portion of the transaction / statement to be replicated,
+ for example a Rows_log_event without its Table_map_log_event, or
+ transactional updates without the matching COMMIT.
+
+ Returns 0 when the change is allowed, 1 (with an error raised) otherwise.
+ */
+ const bool change_allowed= !thd->locked_tables && !thd->active_transaction();
+ if (change_allowed)
+ return 0;
+
+ my_error(ER_LOCK_OR_ACTIVE_TRANSACTION, MYF(0));
+ return 1;
+}
+
+
static int check_pseudo_thread_id(THD *thd, set_var *var)
{
var->save_result.ulonglong_value= var->value->val_int();
@@ -4412,6 +4442,32 @@ sys_var_event_scheduler::update(THD *thd, set_var *var)
}
+#ifdef HAVE_REPLICATION
+/**
+ Update handler for @@replicate_ignore_do_not_replicate.
+
+ The new value is only applied when init_thread_mask() reports no running
+ slave thread; otherwise ER_SLAVE_MUST_STOP is raised and nothing changes.
+
+ @param thd Current thread; passed through to sys_var_bool_ptr::update().
+ @param var Value to assign; passed through to sys_var_bool_ptr::update().
+
+ @return TRUE if the update was refused because a slave thread is running,
+ otherwise the result of sys_var_bool_ptr::update().
+*/
+bool sys_var_replicate_ignore_do_not_replicate::update(THD *thd, set_var *var)
+{
+ bool result;
+ int thread_mask;
+ DBUG_ENTER("sys_var_replicate_ignore_do_not_replicate::update");
+
+ /* Slave threads must be stopped to change the variable. */
+ pthread_mutex_lock(&LOCK_active_mi);
+ lock_slave_threads(active_mi);
+ init_thread_mask(&thread_mask, active_mi, 0 /*not inverse*/);
+ if (thread_mask) // We refuse if any slave thread is running
+ {
+ my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
+ result= TRUE;
+ }
+ else
+ result= sys_var_bool_ptr::update(thd, var);
+
+ /* Release in the reverse order of acquisition, on both paths. */
+ unlock_slave_threads(active_mi);
+ pthread_mutex_unlock(&LOCK_active_mi);
+ DBUG_RETURN(result);
+}
+#endif
+
+
uchar *sys_var_event_scheduler::value_ptr(THD *thd, enum_var_type type,
LEX_STRING *base)
{
diff --git a/sql/set_var.h b/sql/set_var.h
index 95885357b83..e2c6f4b95d2 100644
--- a/sql/set_var.h
+++ b/sql/set_var.h
@@ -1283,6 +1283,25 @@ public:
bool is_readonly() const;
};
+
+#ifdef HAVE_REPLICATION
+/**
+ System variable class for @@replicate_ignore_do_not_replicate (server
+ option --replicate-ignore-do-not-replicate).
+
+ Derives from sys_var_bool_ptr and declares its own update().
+*/
+
+class sys_var_replicate_ignore_do_not_replicate :public sys_var_bool_ptr
+{
+public:
+ sys_var_replicate_ignore_do_not_replicate(sys_var_chain *chain,
+ const char *name_arg,
+ my_bool *value_arg)
+ :sys_var_bool_ptr(chain, name_arg, value_arg)
+ {}
+ ~sys_var_replicate_ignore_do_not_replicate() {}
+ bool update(THD *thd, set_var *var);
+};
+#endif
+
+
/****************************************************************************
Classes for parsing of the SET command
****************************************************************************/
diff --git a/sql/slave.cc b/sql/slave.cc
index 435f3d8b95f..bffbfacc40b 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -1176,6 +1176,38 @@ when it try to get the value of TIME_ZONE global variable from master.";
}
}
+ /*
+ Request the master to filter away events with the @@do_not_replicate flag
+ set, if we are running with --replicate-ignore-do-not-replicate=1.
+
+ mysql_real_query() returns zero on success and non-zero on failure, so the
+ error handling below must only run for a non-zero return value.
+ */
+ if (opt_replicate_ignore_do_not_replicate)
+ {
+ if (mysql_real_query(mysql, STRING_WITH_LEN("SET do_not_replicate=1")))
+ {
+ err_code= mysql_errno(mysql);
+ if (is_network_error(err_code))
+ {
+ mi->report(ERROR_LEVEL, err_code,
+ "Setting master-side filtering of @@do_not_replicate failed "
+ "with error: %s", mysql_error(mysql));
+ goto network_err;
+ }
+ else if (err_code == ER_UNKNOWN_SYSTEM_VARIABLE)
+ {
+ /*
+ The master is older than the slave and does not support the
+ @@do_not_replicate feature.
+ This is not a problem, as such master will not generate events with
+ the @@do_not_replicate flag set in the first place. We will still
+ do slave-side filtering of such events though, to handle the (rare)
+ case of downgrading a master and receiving old events generated from
+ before the downgrade with the @@do_not_replicate flag set.
+ */
+ DBUG_PRINT("info", ("Old master does not support master-side filtering "
+ "of @@do_not_replicate events."));
+ }
+ /*
+ NOTE(review): any other (non-network) error is silently ignored here;
+ consider whether it should be reported as a fatal error instead.
+ */
+ }
+ }
err:
if (errmsg)
{
@@ -2114,6 +2146,8 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli)
thd->lex->current_select= 0;
if (!ev->when)
ev->when= my_time(0);
+ thd->options= (thd->options & ~OPTION_DO_NOT_REPLICATE) |
+ (ev->flags & LOG_EVENT_DO_NOT_REPLICATE_F ? OPTION_DO_NOT_REPLICATE : 0);
ev->thd = thd; // because up to this point, ev->thd == 0
int reason= ev->shall_skip(rli);
@@ -3582,6 +3616,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
{
int error= 0;
ulong inc_pos;
+ ulong event_pos;
Relay_log_info *rli= &mi->rli;
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
DBUG_ENTER("queue_event");
@@ -3667,6 +3702,23 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
}
/*
+ If we filter events master-side (eg. @@do_not_replicate), we will see holes
+ in the event positions from the master. If we see such a hole, adjust
+ mi->master_log_pos accordingly so we maintain the correct position (for
+ reconnect, MASTER_POS_WAIT(), etc.)
+
+ The hole is detected by comparing the position stored in the event header
+ at LOG_POS_OFFSET with the position we would reach by adding inc_pos.
+ */
+ if (inc_pos > 0 &&
+ event_len >= LOG_POS_OFFSET+4 &&
+ (event_pos= uint4korr(buf+LOG_POS_OFFSET)) > mi->master_log_pos + inc_pos)
+ {
+ /*
+ Trace before inc_pos is overwritten: afterwards
+ mi->master_log_pos + inc_pos equals event_pos, and the message would
+ show the same value on both sides of the arrow.
+ */
+ DBUG_PRINT("info", ("Adjust master_log_pos %lu->%lu to account for "
+ "master-side filtering",
+ (unsigned long)(mi->master_log_pos + inc_pos),
+ event_pos));
+ inc_pos= event_pos - mi->master_log_pos;
+ }
+
+ /*
If this event is originating from this server, don't queue it.
We don't check this for 3.23 events because it's simpler like this; 3.23
will be filtered anyway by the SQL slave thread which also tests the
diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc
index 9713ec1ef5c..bb95054a371 100644
--- a/sql/sql_binlog.cc
+++ b/sql/sql_binlog.cc
@@ -33,6 +33,7 @@
void mysql_client_binlog_statement(THD* thd)
{
+ ulonglong save_do_not_replicate;
DBUG_ENTER("mysql_client_binlog_statement");
DBUG_PRINT("info",("binlog base64: '%*s'",
(int) (thd->lex->comment.length < 2048 ?
@@ -213,7 +214,15 @@ void mysql_client_binlog_statement(THD* thd)
reporting.
*/
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ save_do_not_replicate= thd->options & OPTION_DO_NOT_REPLICATE;
+ thd->options= (thd->options & ~OPTION_DO_NOT_REPLICATE) |
+ (ev->flags & LOG_EVENT_DO_NOT_REPLICATE_F ?
+ OPTION_DO_NOT_REPLICATE : 0);
+
err= ev->apply_event(rli);
+
+ thd->options= (thd->options & ~OPTION_DO_NOT_REPLICATE) |
+ save_do_not_replicate;
#else
err= 0;
#endif
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 5038d02abca..8de2ce5fcf1 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -338,6 +338,41 @@ Increase max_allowed_packet on master";
/*
+ Write a single binlog event from mysql_binlog_send() to the slave
+ connection, followed by the file contents when the event is a LOAD_EVENT.
+
+ An event carrying LOG_EVENT_DO_NOT_REPLICATE_F is dropped without being
+ written (and reported as success) when the connection has
+ OPTION_DO_NOT_REPLICATE set in thd->options.
+
+ Returns NULL on success, error message string on error.
+*/
+static const char *
+send_event_to_slave(THD *thd, NET *net, String* const packet)
+{
+ thd_proc_info(thd, "Sending binlog event to slave");
+
+ /*
+ The slave asked for flagged events to be skipped: check the flags field
+ of the event header and silently drop the event if the flag is set.
+ */
+ if ((thd->options & OPTION_DO_NOT_REPLICATE) &&
+ (uint2korr(&((*packet)[FLAGS_OFFSET+1])) & LOG_EVENT_DO_NOT_REPLICATE_F))
+ return NULL;
+
+ if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
+ return "Failed on my_net_write()";
+
+ DBUG_PRINT("info", ("log event code %d", (*packet)[LOG_EVENT_OFFSET+1] ));
+ if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT && send_file(thd))
+ return "failed in send_file()";
+
+ return NULL; /* Success */
+}
+
+/*
TODO: Clean up loop to only have one call to send_file()
*/
@@ -349,9 +384,9 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
char search_file_name[FN_REFLEN], *name;
IO_CACHE log;
File file = -1;
- String* packet = &thd->packet;
+ String* const packet = &thd->packet;
int error;
- const char *errmsg = "Unknown error";
+ const char *errmsg = "Unknown error", *tmp_msg;
NET* net = &thd->net;
pthread_mutex_t *log_lock;
bool binlog_can_be_corrupted= FALSE;
@@ -588,9 +623,9 @@ impossible position";
else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
binlog_can_be_corrupted= FALSE;
- if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
+ if ((tmp_msg= send_event_to_slave(thd, net, packet)))
{
- errmsg = "Failed on my_net_write()";
+ errmsg = tmp_msg;
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
@@ -603,17 +638,6 @@ impossible position";
}
});
- DBUG_PRINT("info", ("log event code %d",
- (*packet)[LOG_EVENT_OFFSET+1] ));
- if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
- {
- if (send_file(thd))
- {
- errmsg = "failed in send_file()";
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
- }
- }
packet->set("\0", 1, &my_charset_bin);
}
@@ -713,23 +737,12 @@ impossible position";
if (read_packet)
{
- thd_proc_info(thd, "Sending binlog event to slave");
- if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
- {
- errmsg = "Failed on my_net_write()";
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
- }
-
- if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
- {
- if (send_file(thd))
- {
- errmsg = "failed in send_file()";
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
- }
- }
+ if ((tmp_msg= send_event_to_slave(thd, net, packet)))
+ {
+ errmsg = tmp_msg;
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
packet->set("\0", 1, &my_charset_bin);
/*
No need to net_flush because we will get to flush later when