From c4d69f17753f375e8cfd18c33de291cdf13504f9 Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 11 Aug 2011 11:38:52 +0200 Subject: MWL#234: Support for marking binlog events to not be replicated, and for telling slaves not to replicate events with such mark --- sql/log_event.cc | 18 ++++++++++--- sql/log_event.h | 26 +++++++++++++++++-- sql/mysql_priv.h | 2 ++ sql/mysqld.cc | 10 +++++++- sql/set_var.cc | 56 ++++++++++++++++++++++++++++++++++++++++ sql/set_var.h | 19 ++++++++++++++ sql/slave.cc | 52 +++++++++++++++++++++++++++++++++++++ sql/sql_binlog.cc | 9 +++++++ sql/sql_repl.cc | 77 ++++++++++++++++++++++++++++++++----------------------- 9 files changed, 231 insertions(+), 38 deletions(-) (limited to 'sql') diff --git a/sql/log_event.cc b/sql/log_event.cc index 217adce4f66..05313dc3337 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -665,11 +665,13 @@ const char* Log_event::get_type_str() #ifndef MYSQL_CLIENT Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) - :log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg), thd(thd_arg) + :log_pos(0), temp_buf(0), exec_time(0), thd(thd_arg) { server_id= thd->server_id; when= thd->start_time; cache_stmt= using_trans; + flags= flags_arg | + (thd->options & OPTION_DO_NOT_REPLICATE ? LOG_EVENT_DO_NOT_REPLICATE_F : 0); } @@ -825,7 +827,9 @@ Log_event::do_shall_skip(Relay_log_info *rli) rli->replicate_same_server_id, rli->slave_skip_counter)); if ((server_id == ::server_id && !rli->replicate_same_server_id) || - (rli->slave_skip_counter == 1 && rli->is_in_group())) + (rli->slave_skip_counter == 1 && rli->is_in_group()) || + (flags & LOG_EVENT_DO_NOT_REPLICATE_F + && opt_replicate_ignore_do_not_replicate)) return EVENT_SKIP_IGNORE; if (rli->slave_skip_counter > 0) return EVENT_SKIP_COUNT; @@ -3483,6 +3487,14 @@ Query_log_event::do_shall_skip(Relay_log_info *rli) DBUG_PRINT("debug", ("query: %s; q_len: %d", query, q_len)); DBUG_ASSERT(query && q_len > 0); + /* + An event skipped due to @@do_not_replicate must not be counted towards the + number of events to be skipped due to @@sql_slave_skip_counter. + */ + if (flags & LOG_EVENT_DO_NOT_REPLICATE_F && + opt_replicate_ignore_do_not_replicate) + DBUG_RETURN(Log_event::EVENT_SKIP_IGNORE); + if (rli->slave_skip_counter > 0) { if (strcmp("BEGIN", query) == 0) @@ -9780,7 +9792,7 @@ st_print_event_info::st_print_event_info() auto_increment_increment(0),auto_increment_offset(0), charset_inited(0), lc_time_names_number(~0), charset_database_number(ILLEGAL_CHARSET_INFO_NUMBER), - thread_id(0), thread_id_printed(false), + thread_id(0), thread_id_printed(false), do_not_replicate(0), base64_output_mode(BASE64_OUTPUT_UNSPEC), printed_fd_event(FALSE) { /* diff --git a/sql/log_event.h b/sql/log_event.h index 4ea511f45b5..b3a6d0c9d48 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -490,6 +490,19 @@ struct sql_ex_info */ #define LOG_EVENT_RELAY_LOG_F 0x40 +/** + @def LOG_EVENT_DO_NOT_REPLICATE_F + + Flag set by application creating the event (with @@do_not_replicate); the + slave will skip replication of such events if + --replicate-ignore-do-not-replicate is set. + + This is a MariaDB flag; we allocate it from the end of the available + values to reduce risk of conflict with new MySQL flags. +*/ +#define LOG_EVENT_DO_NOT_REPLICATE_F 0x8000 + + /** @def OPTIONS_WRITTEN_TO_BIN_LOG @@ -656,6 +669,11 @@ typedef struct st_print_event_info uint charset_database_number; uint thread_id; bool thread_id_printed; + /* + Track when @@do_not_replicate changes so we need to output a SET + statement for it. + */ + int do_not_replicate; st_print_event_info(); @@ -910,8 +928,8 @@ public: /** Some 16 flags. See the definitions above for LOG_EVENT_TIME_F, - LOG_EVENT_FORCED_ROTATE_F, LOG_EVENT_THREAD_SPECIFIC_F, and - LOG_EVENT_SUPPRESS_USE_F for notes. + LOG_EVENT_FORCED_ROTATE_F, LOG_EVENT_THREAD_SPECIFIC_F, + LOG_EVENT_SUPPRESS_USE_F, and LOG_EVENT_DO_NOT_REPLICATE_F for notes. */ uint16 flags; @@ -3915,6 +3933,8 @@ public: DBUG_PRINT("enter", ("m_incident: %d", m_incident)); m_message.str= NULL; /* Just as a precaution */ m_message.length= 0; + /* Replicate the incident irregardless of @@do_not_replicate. */ + flags&= ~LOG_EVENT_DO_NOT_REPLICATE_F; DBUG_VOID_RETURN; } @@ -3924,6 +3944,8 @@ public: DBUG_ENTER("Incident_log_event::Incident_log_event"); DBUG_PRINT("enter", ("m_incident: %d", m_incident)); m_message= msg; + /* Replicate the incident irregardless of @@do_not_replicate. */ + flags&= ~LOG_EVENT_DO_NOT_REPLICATE_F; DBUG_VOID_RETURN; } #endif diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h index 38a29686906..1322ea1b165 100644 --- a/sql/mysql_priv.h +++ b/sql/mysql_priv.h @@ -504,6 +504,7 @@ protected: */ #define TMP_TABLE_FORCE_MYISAM (ULL(1) << 32) #define OPTION_PROFILING (ULL(1) << 33) +#define OPTION_DO_NOT_REPLICATE (ULL(1) << 34) // THD, user @@ -2064,6 +2065,7 @@ extern my_bool opt_old_style_user_limits, trust_function_creators; extern uint opt_crash_binlog_innodb; extern char *shared_memory_base_name, *mysqld_unix_port; extern my_bool opt_enable_shared_memory; +extern my_bool opt_replicate_ignore_do_not_replicate; extern char *default_tz_name; #endif /* MYSQL_SERVER */ #if defined MYSQL_SERVER || defined INNODB_COMPATIBILITY_HOOKS diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 3c47fe446ab..e966b35a556 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -553,6 +553,8 @@ uint opt_large_page_size= 0; uint opt_debug_sync_timeout= 0; #endif /* defined(ENABLED_DEBUG_SYNC) */ my_bool opt_old_style_user_limits= 0, trust_function_creators= 0; +my_bool opt_replicate_ignore_do_not_replicate; + /* True if there is at least one per-hour limit for some user, so we should check them before each query (and possibly reset counters when hour is @@ -6085,7 +6087,8 @@ enum options_mysqld OPT_IGNORE_BUILTIN_INNODB, OPT_BINLOG_DIRECT_NON_TRANS_UPDATE, OPT_DEFAULT_CHARACTER_SET_OLD, - OPT_MAX_LONG_DATA_SIZE + OPT_MAX_LONG_DATA_SIZE, + OPT_REPLICATE_IGNORE_DO_NOT_REPLICATE }; @@ -6782,6 +6785,11 @@ each time the SQL thread starts.", "cross database updates. If you need cross database updates to work, " "make sure you have 3.23.28 or later, and use replicate-wild-ignore-" "table=db_name.%. ", 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, + {"replicate-ignore-do-not-replicate", OPT_REPLICATE_IGNORE_DO_NOT_REPLICATE, + "Tells the slave thread not to replicate events that were created with" + "@@do_not_replicat=1.", &opt_replicate_ignore_do_not_replicate, + &opt_replicate_ignore_do_not_replicate, 0, GET_BOOL, NO_ARG, + 0, 0, 0 ,0, 0, 0}, {"replicate-ignore-table", OPT_REPLICATE_IGNORE_TABLE, "Tells the slave thread to not replicate to the specified table. To specify " "more than one table to ignore, use the directive multiple times, once for " diff --git a/sql/set_var.cc b/sql/set_var.cc index 4aa30b5634d..dc52ed7baf3 100644 --- a/sql/set_var.cc +++ b/sql/set_var.cc @@ -117,6 +117,7 @@ static bool set_option_log_bin_bit(THD *thd, set_var *var); static bool set_option_autocommit(THD *thd, set_var *var); static int check_log_update(THD *thd, set_var *var); static bool set_log_update(THD *thd, set_var *var); +static int check_do_not_replicate(THD *thd, set_var *var); static int check_pseudo_thread_id(THD *thd, set_var *var); void fix_binlog_format_after_update(THD *thd, enum_var_type type); static void fix_low_priority_updates(THD *thd, enum_var_type type); @@ -830,6 +831,10 @@ static sys_var_thd_bit sys_profiling(&vars, "profiling", NULL, static sys_var_thd_ulong sys_profiling_history_size(&vars, "profiling_history_size", &SV::profiling_history_size); #endif +static sys_var_thd_bit sys_do_not_replicate(&vars, "do_not_replicate", + check_do_not_replicate, + set_option_bit, + OPTION_DO_NOT_REPLICATE); /* Local state variables */ @@ -906,6 +911,12 @@ static sys_var_thd_set sys_log_slow_verbosity(&vars, "log_slow_verbosity", &SV::log_slow_verbosity, &log_slow_verbosity_typelib); +#ifdef HAVE_REPLICATION +static sys_var_replicate_ignore_do_not_replicate + sys_replicate_ignore_do_not_replicate(&vars, + "replicate_ignore_do_not_replicate", + &opt_replicate_ignore_do_not_replicate); +#endif /* Global read-only variable containing hostname */ static sys_var_const_str sys_hostname(&vars, "hostname", glob_hostname); @@ -3268,6 +3279,25 @@ static bool set_log_update(THD *thd, set_var *var) } +static int check_do_not_replicate(THD *thd, set_var *var) +{ + /* + We must not change @@do_not_replicate in the middle of a transaction or + statement, as that could result in only part of the transaction / statement + being replicated. + (This would be particularly serious if we were to replicate eg. + Rows_log_event without Table_map_log_event or transactional updates without + the COMMIT). + */ + if (thd->locked_tables || thd->active_transaction()) + { + my_error(ER_LOCK_OR_ACTIVE_TRANSACTION, MYF(0)); + return 1; + } + return 0; +} + + static int check_pseudo_thread_id(THD *thd, set_var *var) { var->save_result.ulonglong_value= var->value->val_int(); @@ -4412,6 +4442,32 @@ sys_var_event_scheduler::update(THD *thd, set_var *var) } +#ifdef HAVE_REPLICATION +bool sys_var_replicate_ignore_do_not_replicate::update(THD *thd, set_var *var) +{ + bool result; + int thread_mask; + DBUG_ENTER("sys_var_replicate_ignore_do_not_replicate::update"); + + /* Slave threads must be stopped to change the variable. */ + pthread_mutex_lock(&LOCK_active_mi); + lock_slave_threads(active_mi); + init_thread_mask(&thread_mask, active_mi, 0 /*not inverse*/); + if (thread_mask) // We refuse if any slave thread is running + { + my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0)); + result= TRUE; + } + else + result= sys_var_bool_ptr::update(thd, var); + + unlock_slave_threads(active_mi); + pthread_mutex_unlock(&LOCK_active_mi); + DBUG_RETURN(result); +} +#endif + + uchar *sys_var_event_scheduler::value_ptr(THD *thd, enum_var_type type, LEX_STRING *base) { diff --git a/sql/set_var.h b/sql/set_var.h index 95885357b83..e2c6f4b95d2 100644 --- a/sql/set_var.h +++ b/sql/set_var.h @@ -1283,6 +1283,25 @@ public: bool is_readonly() const; }; + +#ifdef HAVE_REPLICATION +/** + Handler for setting the system variable --replicate-ignore-do-not-replicate. +*/ + +class sys_var_replicate_ignore_do_not_replicate :public sys_var_bool_ptr +{ +public: + sys_var_replicate_ignore_do_not_replicate(sys_var_chain *chain, + const char *name_arg, + my_bool *value_arg) : + sys_var_bool_ptr(chain, name_arg, value_arg) {}; + ~sys_var_replicate_ignore_do_not_replicate() {}; + bool update(THD *thd, set_var *var); +}; +#endif + + /**************************************************************************** Classes for parsing of the SET command ****************************************************************************/ diff --git a/sql/slave.cc b/sql/slave.cc index 435f3d8b95f..bffbfacc40b 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -1176,6 +1176,38 @@ when it try to get the value of TIME_ZONE global variable from master."; } } + /* + Request the master to filter away events with the @@do_not_replicate flag + set, if we are running with --replicate-ignore-do_not_replicate=1. + */ + if (opt_replicate_ignore_do_not_replicate) + { + if (!mysql_real_query(mysql, STRING_WITH_LEN("SET do_not_replicate=1"))) + { + err_code= mysql_errno(mysql); + if (is_network_error(err_code)) + { + mi->report(ERROR_LEVEL, err_code, + "Setting master-side filtering of @@do_not_replicate failed " + "with error: %s", mysql_error(mysql)); + goto network_err; + } + else if (err_code == ER_UNKNOWN_SYSTEM_VARIABLE) + { + /* + The master is older than the slave and does not support the + @@do_not_replicate feature. + This is not a problem, as such master will not generate events with + the @@do_not_replicate flag set in the first place. We will still + do slave-side filtering of such events though, to handle the (rare) + case of downgrading a master and receiving old events generated from + before the downgrade with the @@do_not_replicate flag set. + */ + DBUG_PRINT("info", ("Old master does not support master-side filtering " + "of @@do_not_replicate events.")); + } + } + } err: if (errmsg) { @@ -2114,6 +2146,8 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli) thd->lex->current_select= 0; if (!ev->when) ev->when= my_time(0); + thd->options= (thd->options & ~OPTION_DO_NOT_REPLICATE) | + (ev->flags & LOG_EVENT_DO_NOT_REPLICATE_F ? OPTION_DO_NOT_REPLICATE : 0); ev->thd = thd; // because up to this point, ev->thd == 0 int reason= ev->shall_skip(rli); @@ -3582,6 +3616,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) { int error= 0; ulong inc_pos; + ulong event_pos; Relay_log_info *rli= &mi->rli; pthread_mutex_t *log_lock= rli->relay_log.get_log_lock(); DBUG_ENTER("queue_event"); @@ -3666,6 +3701,23 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) break; } + /* + If we filter events master-side (eg. @@do_not_replicate), we will see holes + in the event positions from the master. If we see such a hole, adjust + mi->master_log_pos accordingly so we maintain the correct position (for + reconnect, MASTER_POS_WAIT(), etc.) + */ + if (inc_pos > 0 && + event_len >= LOG_POS_OFFSET+4 && + (event_pos= uint4korr(buf+LOG_POS_OFFSET)) > mi->master_log_pos + inc_pos) + { + inc_pos= event_pos - mi->master_log_pos; + DBUG_PRINT("info", ("Adjust master_log_pos %lu->%lu to account for " + "master-side filtering", + (unsigned long)(mi->master_log_pos + inc_pos), + event_pos)); + } + /* If this event is originating from this server, don't queue it. We don't check this for 3.23 events because it's simpler like this; 3.23 diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc index 9713ec1ef5c..bb95054a371 100644 --- a/sql/sql_binlog.cc +++ b/sql/sql_binlog.cc @@ -33,6 +33,7 @@ void mysql_client_binlog_statement(THD* thd) { + ulonglong save_do_not_replicate; DBUG_ENTER("mysql_client_binlog_statement"); DBUG_PRINT("info",("binlog base64: '%*s'", (int) (thd->lex->comment.length < 2048 ? @@ -213,7 +214,15 @@ void mysql_client_binlog_statement(THD* thd) reporting. */ #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + save_do_not_replicate= thd->options & OPTION_DO_NOT_REPLICATE; + thd->options= (thd->options & ~OPTION_DO_NOT_REPLICATE) | + (ev->flags & LOG_EVENT_DO_NOT_REPLICATE_F ? + OPTION_DO_NOT_REPLICATE : 0); + err= ev->apply_event(rli); + + thd->options= (thd->options & ~OPTION_DO_NOT_REPLICATE) | + save_do_not_replicate; #else err= 0; #endif diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 5038d02abca..8de2ce5fcf1 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -337,6 +337,41 @@ Increase max_allowed_packet on master"; } +/* + Helper function for mysql_binlog_send() to write an event down the slave + connection. + + Returns NULL on success, error message string on error. +*/ +static const char * +send_event_to_slave(THD *thd, NET *net, String* const packet) +{ + thd_proc_info(thd, "Sending binlog event to slave"); + + /* + Skip events with the @@do_not_replicate flag set, if slave requested + skipping of such events. + */ + if (thd->options & OPTION_DO_NOT_REPLICATE) + { + uint16 flags= uint2korr(&((*packet)[FLAGS_OFFSET+1])); + if (flags & LOG_EVENT_DO_NOT_REPLICATE_F) + return NULL; + } + + if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) + return "Failed on my_net_write()"; + + DBUG_PRINT("info", ("log event code %d", (*packet)[LOG_EVENT_OFFSET+1] )); + if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) + { + if (send_file(thd)) + return "failed in send_file()"; + } + + return NULL; /* Success */ +} + /* TODO: Clean up loop to only have one call to send_file() */ @@ -349,9 +384,9 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, char search_file_name[FN_REFLEN], *name; IO_CACHE log; File file = -1; - String* packet = &thd->packet; + String* const packet = &thd->packet; int error; - const char *errmsg = "Unknown error"; + const char *errmsg = "Unknown error", *tmp_msg; NET* net = &thd->net; pthread_mutex_t *log_lock; bool binlog_can_be_corrupted= FALSE; @@ -588,9 +623,9 @@ impossible position"; else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT) binlog_can_be_corrupted= FALSE; - if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) + if ((tmp_msg= send_event_to_slave(thd, net, packet))) { - errmsg = "Failed on my_net_write()"; + errmsg = tmp_msg; my_errno= ER_UNKNOWN_ERROR; goto err; } @@ -603,17 +638,6 @@ impossible position"; } }); - DBUG_PRINT("info", ("log event code %d", - (*packet)[LOG_EVENT_OFFSET+1] )); - if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) - { - if (send_file(thd)) - { - errmsg = "failed in send_file()"; - my_errno= ER_UNKNOWN_ERROR; - goto err; - } - } packet->set("\0", 1, &my_charset_bin); } @@ -713,23 +737,12 @@ impossible position"; if (read_packet) { - thd_proc_info(thd, "Sending binlog event to slave"); - if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ) - { - errmsg = "Failed on my_net_write()"; - my_errno= ER_UNKNOWN_ERROR; - goto err; - } - - if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) - { - if (send_file(thd)) - { - errmsg = "failed in send_file()"; - my_errno= ER_UNKNOWN_ERROR; - goto err; - } - } + if ((tmp_msg= send_event_to_slave(thd, net, packet))) + { + errmsg = tmp_msg; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } packet->set("\0", 1, &my_charset_bin); /* No need to net_flush because we will get to flush later when -- cgit v1.2.1