diff options
author | Kristian Nielsen <knielsen@knielsen-hq.org> | 2016-09-22 08:26:45 +0200 |
---|---|---|
committer | Kristian Nielsen <knielsen@knielsen-hq.org> | 2016-10-14 23:15:58 +0200 |
commit | 19abe79fd15ab6d8ac0c2f65476bc3c7d29a008d (patch) | |
tree | f5a34e32e5bceba084584efb471c2ff7b369eed4 /sql | |
parent | 50f19ca8099994e992e1b411c7c05287855a7bdd (diff) | |
download | mariadb-git-19abe79fd15ab6d8ac0c2f65476bc3c7d29a008d.tar.gz |
MDEV-7145: Delayed replication, intermediate commit.
Initial merge of delayed replication from MySQL git.
The code from the initial push into MySQL is merged, and the
associated test case passes. A number of tasks are still pending:
1. Check full test suite run for any regressions or .result file updates.
2. Extend the feature to also work for parallel replication.
3. There are some todo-comments about future refactoring left from
MySQL, these should be located and merged on top.
4. There are some later related MySQL commits, these should be checked
and merged. These include:
e134b9362ba0b750d6ac1b444780019622d14aa5
b38f0f7857c073edfcc0a64675b7f7ede04be00f
fd2b210383358fe7697f201e19ac9779879ba72a
afc397376ec50e96b2918ee64e48baf4dda0d37d
5. The testcase from MySQL relies heavily on sleep and timing for
testing, and seems likely to sporadically fail on heavily loaded test
servers in buildbot or distro build farms.
Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
Diffstat (limited to 'sql')
-rw-r--r-- | sql/lex.h | 1 | ||||
-rw-r--r-- | sql/log.cc | 8 | ||||
-rw-r--r-- | sql/log_event.cc | 7 | ||||
-rw-r--r-- | sql/rpl_mi.cc | 2 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 178 | ||||
-rw-r--r-- | sql/rpl_rli.h | 115 | ||||
-rw-r--r-- | sql/slave.cc | 245 | ||||
-rw-r--r-- | sql/slave.h | 18 | ||||
-rw-r--r-- | sql/sql_binlog.cc | 148 | ||||
-rw-r--r-- | sql/sql_lex.cc | 1 | ||||
-rw-r--r-- | sql/sql_lex.h | 5 | ||||
-rw-r--r-- | sql/sql_repl.cc | 4 | ||||
-rw-r--r-- | sql/sql_yacc.yy | 13 |
13 files changed, 534 insertions, 211 deletions
diff --git a/sql/lex.h b/sql/lex.h index 85bd20a5f37..430e966eb40 100644 --- a/sql/lex.h +++ b/sql/lex.h @@ -339,6 +339,7 @@ static SYMBOL symbols[] = { { "LOW_PRIORITY", SYM(LOW_PRIORITY)}, { "MASTER", SYM(MASTER_SYM)}, { "MASTER_CONNECT_RETRY", SYM(MASTER_CONNECT_RETRY_SYM)}, + { "MASTER_DELAY", SYM(MASTER_DELAY_SYM)}, { "MASTER_GTID_POS", SYM(MASTER_GTID_POS_SYM)}, { "MASTER_HOST", SYM(MASTER_HOST_SYM)}, { "MASTER_LOG_FILE", SYM(MASTER_LOG_FILE_SYM)}, diff --git a/sql/log.cc b/sql/log.cc index be24bcd718a..e3d42a4547e 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -4292,6 +4292,10 @@ void MYSQL_BIN_LOG::wait_for_last_checkpoint_event() relay log. IMPLEMENTATION + + - You must hold rli->data_lock before calling this function, since + it writes group_relay_log_pos and similar fields of + Relay_log_info. - Protects index file with LOCK_index - Delete relevant relay log files - Copy all file names after these ones to the front of the index file @@ -4305,7 +4309,7 @@ void MYSQL_BIN_LOG::wait_for_last_checkpoint_event() read by the SQL slave thread are deleted). @note - - This is only called from the slave-execute thread when it has read + - This is only called from the slave SQL thread when it has read all commands from a relay log and want to switch to a new relay log. - When this happens, we can be in an active transaction as a transaction can span over two relay logs @@ -4336,6 +4340,8 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included) DBUG_ASSERT(rli->slave_running == MYSQL_SLAVE_RUN_NOT_CONNECT); DBUG_ASSERT(!strcmp(rli->linfo.log_file_name,rli->event_relay_log_name)); + mysql_mutex_assert_owner(&rli->data_lock); + mysql_mutex_lock(&LOCK_index); ir= rli->inuse_relaylog_list; diff --git a/sql/log_event.cc b/sql/log_event.cc index afa58afc21d..bd8ae984c2d 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -966,6 +966,7 @@ int Log_event::do_update_pos(rpl_group_info *rgi) Relay_log_info *rli= rgi->rli; DBUG_ENTER("Log_event::do_update_pos"); + DBUG_ASSERT(!rli->belongs_to_client()); /* rli is null when (as far as I (Guilhem) know) the caller is Load_log_event::do_apply_event *and* that one is called from @@ -6400,6 +6401,9 @@ bool Rotate_log_event::write() in a A -> B -> A setup. The NOTES below is a wrong comment which will disappear when 4.1 is merged. + This must only be called from the Slave SQL thread, since it calls + flush_relay_log_info(). + @retval 0 ok */ @@ -8222,6 +8226,9 @@ void Stop_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) were we must do this cleaning is in Start_log_event_v3::do_apply_event(), not here. Because if we come here, the master was sane. + + This must only be called from the Slave SQL thread, since it calls + flush_relay_log_info(). */ int Stop_log_event::do_update_pos(rpl_group_info *rgi) diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc index 6048d26998b..867ebc7167a 100644 --- a/sql/rpl_mi.cc +++ b/sql/rpl_mi.cc @@ -18,7 +18,7 @@ #include "sql_priv.h" #include <my_dir.h> #include "rpl_mi.h" -#include "slave.h" // SLAVE_MAX_HEARTBEAT_PERIOD +#include "slave.h" #include "strfunc.h" #include "sql_repl.h" diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 31dd24a038d..a7d8f843688 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -28,6 +28,7 @@ #include "rpl_utility.h" #include "transaction.h" #include "sql_parse.h" // end_trans, ROLLBACK +#include "slave.h" #include <mysql/plugin.h> #include <mysql/service_thd_wait.h> @@ -41,30 +42,24 @@ rpl_slave_state *rpl_global_gtid_slave_state; /* Object used for MASTER_GTID_WAIT(). */ gtid_waiting rpl_global_gtid_waiting; - -// Defined in slave.cc -int init_intvar_from_file(int* var, IO_CACHE* f, int default_val); -int init_strvar_from_file(char *var, int max_size, IO_CACHE *f, - const char *default_val); +const char *const Relay_log_info::state_delaying_string = "Waiting until MASTER_DELAY seconds after master executed event"; Relay_log_info::Relay_log_info(bool is_slave_recovery) :Slave_reporting_capability("SQL"), - no_storage(FALSE), replicate_same_server_id(::replicate_same_server_id), + replicate_same_server_id(::replicate_same_server_id), info_fd(-1), cur_log_fd(-1), relay_log(&sync_relaylog_period), sync_counter(0), is_relay_log_recovery(is_slave_recovery), save_temporary_tables(0), mi(0), inuse_relaylog_list(0), last_inuse_relaylog(0), cur_log_old_open_count(0), group_relay_log_pos(0), event_relay_log_pos(0), -#if HAVE_valgrind - is_fake(FALSE), -#endif group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0), last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0), abort_pos_wait(0), slave_run_id(0), sql_driver_thd(), gtid_skip_flag(GTID_SKIP_NOT), inited(0), abort_slave(0), stop_for_until(0), slave_running(MYSQL_SLAVE_NOT_RUN), until_condition(UNTIL_NONE), until_log_pos(0), retried_trans(0), executed_entries(0), + sql_delay(0), sql_delay_end(0), m_flags(0) { DBUG_ENTER("Relay_log_info::Relay_log_info"); @@ -115,39 +110,55 @@ Relay_log_info::~Relay_log_info() } +/** + Wrapper around Relay_log_info::init(const char *). + + @todo Remove this and replace all calls to it by calls to + Relay_log_info::init(const char *). /SVEN +*/ int init_relay_log_info(Relay_log_info* rli, const char* info_fname) { + return rli->init(info_fname); +} + + +/** + Read the relay_log.info file. + + @param info_fname The name of the file to read from. + @retval 0 success + @retval 1 failure +*/ +int Relay_log_info::init(const char* info_fname) +{ char fname[FN_REFLEN+128]; - int info_fd; const char* msg = 0; int error = 0; DBUG_ENTER("init_relay_log_info"); - DBUG_ASSERT(!rli->no_storage); // Don't init if there is no storage - if (rli->inited) // Set if this function called + if (inited) // Set if this function called DBUG_RETURN(0); fn_format(fname, info_fname, mysql_data_home, "", 4+32); - mysql_mutex_lock(&rli->data_lock); - info_fd = rli->info_fd; - rli->cur_log_fd = -1; - rli->slave_skip_counter=0; - rli->abort_pos_wait=0; - rli->log_space_limit= relay_log_space_limit; - rli->log_space_total= 0; + mysql_mutex_lock(&data_lock); + cur_log_fd = -1; + slave_skip_counter=0; + abort_pos_wait=0; + log_space_limit= relay_log_space_limit; + log_space_total= 0; char pattern[FN_REFLEN]; (void) my_realpath(pattern, slave_load_tmpdir, 0); if (fn_format(pattern, PREFIX_SQL_LOAD, pattern, "", MY_SAFE_PATH | MY_RETURN_REAL_PATH) == NullS) { - mysql_mutex_unlock(&rli->data_lock); + mysql_mutex_unlock(&data_lock); sql_print_error("Unable to use slave's temporary directory %s", slave_load_tmpdir); DBUG_RETURN(1); } - unpack_filename(rli->slave_patternload_file, pattern); - rli->slave_patternload_file_size= strlen(rli->slave_patternload_file); + unpack_filename(slave_patternload_file, pattern); + slave_patternload_file_size= strlen(slave_patternload_file); /* The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE. @@ -161,7 +172,7 @@ int init_relay_log_info(Relay_log_info* rli, if (opt_relay_logname && opt_relay_logname[strlen(opt_relay_logname) - 1] == FN_LIBCHAR) { - mysql_mutex_unlock(&rli->data_lock); + mysql_mutex_unlock(&data_lock); sql_print_error("Path '%s' is a directory name, please specify \ a file name for --relay-log option", opt_relay_logname); DBUG_RETURN(1); @@ -173,7 +184,7 @@ a file name for --relay-log option", opt_relay_logname); opt_relaylog_index_name[strlen(opt_relaylog_index_name) - 1] == FN_LIBCHAR) { - mysql_mutex_unlock(&rli->data_lock); + mysql_mutex_unlock(&data_lock); sql_print_error("Path '%s' is a directory name, please specify \ a file name for --relay-log-index option", opt_relaylog_index_name); DBUG_RETURN(1); @@ -182,7 +193,7 @@ a file name for --relay-log-index option", opt_relaylog_index_name); char buf[FN_REFLEN]; const char *ln; static bool name_warning_sent= 0; - ln= rli->relay_log.generate_name(opt_relay_logname, "-relay-bin", + ln= relay_log.generate_name(opt_relay_logname, "-relay-bin", 1, buf); /* We send the warning only at startup, not after every RESET SLAVE */ if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent && @@ -205,7 +216,6 @@ a file name for --relay-log-index option", opt_relaylog_index_name); } /* For multimaster, add connection name to relay log filenames */ - Master_info* mi= rli->mi; char buf_relay_logname[FN_REFLEN], buf_relaylog_index_name_buff[FN_REFLEN]; char *buf_relaylog_index_name= opt_relaylog_index_name; @@ -227,11 +237,11 @@ a file name for --relay-log-index option", opt_relaylog_index_name); note, that if open() fails, we'll still have index file open but a destructor will take care of that */ - if (rli->relay_log.open_index_file(buf_relaylog_index_name, ln, TRUE) || - rli->relay_log.open(ln, LOG_BIN, 0, 0, SEQ_READ_APPEND, - mi->rli.max_relay_log_size, 1, TRUE)) + if (relay_log.open_index_file(buf_relaylog_index_name, ln, TRUE) || + relay_log.open(ln, LOG_BIN, 0, 0, SEQ_READ_APPEND, + max_relay_log_size, 1, TRUE)) { - mysql_mutex_unlock(&rli->data_lock); + mysql_mutex_unlock(&data_lock); sql_print_error("Failed when trying to open logs for '%s' in init_relay_log_info(). Error: %M", ln, my_errno); DBUG_RETURN(1); } @@ -254,7 +264,7 @@ file '%s', errno %d)", fname, my_errno); msg= current_thd->get_stmt_da()->message(); goto err; } - if (init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0, + if (init_io_cache(&info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0, MYF(MY_WME))) { sql_print_error("Failed to create a cache on relay log info file '%s'", @@ -264,20 +274,19 @@ file '%s', errno %d)", fname, my_errno); } /* Init relay log with first entry in the relay index file */ - if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */, + if (init_relay_log_pos(this,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */, &msg, 0)) { sql_print_error("Failed to open the relay log 'FIRST' (relay_log_pos 4)"); goto err; } - rli->group_master_log_name[0]= 0; - rli->group_master_log_pos= 0; - rli->info_fd= info_fd; + group_master_log_name[0]= 0; + group_master_log_pos= 0; } else // file exists { if (info_fd >= 0) - reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0); + reinit_io_cache(&info_file, READ_CACHE, 0L,0,0); else { int error=0; @@ -289,7 +298,7 @@ Failed to open the existing relay log info file '%s' (errno %d)", fname, my_errno); error= 1; } - else if (init_io_cache(&rli->info_file, info_fd, + else if (init_io_cache(&info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME))) { sql_print_error("Failed to create a cache on relay log info file '%s'", @@ -300,24 +309,15 @@ Failed to open the existing relay log info file '%s' (errno %d)", { if (info_fd >= 0) mysql_file_close(info_fd, MYF(0)); - rli->info_fd= -1; - rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT); - mysql_mutex_unlock(&rli->data_lock); + info_fd= -1; + relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT); + mysql_mutex_unlock(&data_lock); DBUG_RETURN(1); } } - rli->info_fd = info_fd; int relay_log_pos, master_log_pos, lines; char *first_non_digit; - /* - In MySQL 5.6, there is a MASTER_DELAY option to CHANGE MASTER. This is - not yet merged into MariaDB (as of 10.0.13). However, we detect the - presense of the new option in relay-log.info, as a placeholder for - possible later merge of the feature, and to maintain file format - compatibility with MySQL 5.6+. - */ - int dummy_sql_delay; /* Starting from MySQL 5.6.x, relay-log.info has a new format. @@ -342,25 +342,25 @@ Failed to open the existing relay log info file '%s' (errno %d)", it is line count and not binlog name (new format) it will be overwritten by the second row later. */ - if (init_strvar_from_file(rli->group_relay_log_name, - sizeof(rli->group_relay_log_name), - &rli->info_file, "")) + if (init_strvar_from_file(group_relay_log_name, + sizeof(group_relay_log_name), + &info_file, "")) { msg="Error reading slave log configuration"; goto err; } - lines= strtoul(rli->group_relay_log_name, &first_non_digit, 10); + lines= strtoul(group_relay_log_name, &first_non_digit, 10); - if (rli->group_relay_log_name[0] != '\0' && + if (group_relay_log_name[0] != '\0' && *first_non_digit == '\0' && lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY) { DBUG_PRINT("info", ("relay_log_info file is in new format.")); /* Seems to be new format => read relay log name from next line */ - if (init_strvar_from_file(rli->group_relay_log_name, - sizeof(rli->group_relay_log_name), - &rli->info_file, "")) + if (init_strvar_from_file(group_relay_log_name, + sizeof(group_relay_log_name), + &info_file, "")) { msg="Error reading slave log configuration"; goto err; @@ -370,70 +370,70 @@ Failed to open the existing relay log info file '%s' (errno %d)", DBUG_PRINT("info", ("relay_log_info file is in old format.")); if (init_intvar_from_file(&relay_log_pos, - &rli->info_file, BIN_LOG_HEADER_SIZE) || - init_strvar_from_file(rli->group_master_log_name, - sizeof(rli->group_master_log_name), - &rli->info_file, "") || - init_intvar_from_file(&master_log_pos, &rli->info_file, 0) || + &info_file, BIN_LOG_HEADER_SIZE) || + init_strvar_from_file(group_master_log_name, + sizeof(group_master_log_name), + &info_file, "") || + init_intvar_from_file(&master_log_pos, &info_file, 0) || (lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY && - init_intvar_from_file(&dummy_sql_delay, &rli->info_file, 0))) + init_intvar_from_file(&sql_delay, &info_file, 0))) { msg="Error reading slave log configuration"; goto err; } - strmake_buf(rli->event_relay_log_name,rli->group_relay_log_name); - rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos; - rli->group_master_log_pos= master_log_pos; + strmake_buf(event_relay_log_name,group_relay_log_name); + group_relay_log_pos= event_relay_log_pos= relay_log_pos; + group_master_log_pos= master_log_pos; - if (rli->is_relay_log_recovery && init_recovery(rli->mi, &msg)) + if (is_relay_log_recovery && init_recovery(mi, &msg)) goto err; - rli->relay_log_state.load(rpl_global_gtid_slave_state); - if (init_relay_log_pos(rli, - rli->group_relay_log_name, - rli->group_relay_log_pos, + relay_log_state.load(rpl_global_gtid_slave_state); + if (init_relay_log_pos(this, + group_relay_log_name, + group_relay_log_pos, 0 /* no data lock*/, &msg, 0)) { sql_print_error("Failed to open the relay log '%s' (relay_log_pos %llu)", - rli->group_relay_log_name, rli->group_relay_log_pos); + group_relay_log_name, group_relay_log_pos); goto err; } } - DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%llu rli->event_relay_log_pos=%llu", - my_b_tell(rli->cur_log), rli->event_relay_log_pos)); - DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE); - DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos); + DBUG_PRINT("info", ("my_b_tell(cur_log)=%llu event_relay_log_pos=%llu", + my_b_tell(cur_log), event_relay_log_pos)); + DBUG_ASSERT(event_relay_log_pos >= BIN_LOG_HEADER_SIZE); + DBUG_ASSERT(my_b_tell(cur_log) == event_relay_log_pos); /* Now change the cache from READ to WRITE - must do this before flush_relay_log_info */ - reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1); - if ((error= flush_relay_log_info(rli))) + reinit_io_cache(&info_file, WRITE_CACHE,0L,0,1); + if ((error= flush_relay_log_info(this))) { msg= "Failed to flush relay log info file"; goto err; } - if (count_relay_log_space(rli)) + if (count_relay_log_space(this)) { msg="Error counting relay log space"; goto err; } - rli->inited= 1; - mysql_mutex_unlock(&rli->data_lock); + inited= 1; + mysql_mutex_unlock(&data_lock); DBUG_RETURN(error); err: sql_print_error("%s", msg); - end_io_cache(&rli->info_file); + end_io_cache(&info_file); if (info_fd >= 0) mysql_file_close(info_fd, MYF(0)); - rli->info_fd= -1; - rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT); - mysql_mutex_unlock(&rli->data_lock); + info_fd= -1; + relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT); + mysql_mutex_unlock(&data_lock); DBUG_RETURN(1); } @@ -750,6 +750,8 @@ err: if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg) *errmsg= "Invalid Format_description log event; could be out of memory"; + DBUG_PRINT("info", ("Returning %d from init_relay_log_pos", (*errmsg)?1:0)); + DBUG_RETURN ((*errmsg) ? 1 : 0); } @@ -967,8 +969,11 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos, { DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos"); - if (!skip_lock) + if (skip_lock) + mysql_mutex_assert_owner(&data_lock); + else mysql_mutex_lock(&data_lock); + rgi->inc_event_relay_log_pos(); DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu", (long) log_pos, (long) group_master_log_pos)); @@ -1294,6 +1299,7 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos, THD *thd, { DBUG_ENTER("Relay_log_info::stmt_done"); + DBUG_ASSERT(!belongs_to_client()); DBUG_ASSERT(rgi->rli == this); /* If in a transaction, and if the slave supports transactions, just diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 96c3e7c3fac..0c8f16a0d16 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -29,11 +29,6 @@ class Master_info; class Rpl_filter; -enum { - LINES_IN_RELAY_LOG_INFO_WITH_DELAY= 5 -}; - - /**************************************************************************** Replication SQL Thread @@ -78,11 +73,17 @@ public: }; /* - If flag set, then rli does not store its state in any info file. - This is the case only when we execute BINLOG SQL commands inside - a client, non-replication thread. + The SQL thread owns one Relay_log_info, and each client that has + executed a BINLOG statement owns one Relay_log_info. This function + returns zero for the Relay_log_info object that belongs to the SQL + thread and nonzero for Relay_log_info objects that belong to + clients. */ - bool no_storage; + inline bool belongs_to_client() + { + DBUG_ASSERT(sql_driver_thd); + return !sql_driver_thd->slave_thread; + } /* If true, events with the same server id should be replicated. This @@ -194,6 +195,11 @@ public: relay log and finishing (commiting) on another relay log. Case which can happen when, for example, the relay log gets rotated because of max_binlog_size. + + Note: group_relay_log_name, group_relay_log_pos must only be + written from the thread owning the Relay_log_info (SQL thread if + !belongs_to_client(); client thread executing BINLOG statement if + belongs_to_client()). */ char group_relay_log_name[FN_REFLEN]; ulonglong group_relay_log_pos; @@ -205,16 +211,17 @@ public: */ char future_event_master_log_name[FN_REFLEN]; -#ifdef HAVE_valgrind - bool is_fake; /* Mark that this is a fake relay log info structure */ -#endif - /* Original log name and position of the group we're currently executing (whose coordinates are group_relay_log_name/pos in the relay log) in the master's binlog. These concern the *group*, because in the master's binlog the log_pos that comes with each event is the position of the beginning of the group. + + Note: group_master_log_name, group_master_log_pos must only be + written from the thread owning the Relay_log_info (SQL thread if + !belongs_to_client(); client thread executing BINLOG statement if + belongs_to_client()). */ char group_master_log_name[FN_REFLEN]; volatile my_off_t group_master_log_pos; @@ -244,6 +251,15 @@ public: bool sql_thread_caught_up; void clear_until_condition(); + /** + Reset the delay. + This is used by RESET SLAVE to clear the delay. + */ + void clear_sql_delay() + { + sql_delay= 0; + } + /* Needed for problems when slave stops and we want to restart it @@ -474,8 +490,72 @@ public: m_flags&= ~flag; } + /** + Text used in THD::proc_info when the slave SQL thread is delaying. + */ + static const char *const state_delaying_string; + + bool flush(); + + /** + Reads the relay_log.info file. + */ + int init(const char* info_filename); + + /** + Indicate that a delay starts. + + This does not actually sleep; it only sets the state of this + Relay_log_info object to delaying so that the correct state can be + reported by SHOW SLAVE STATUS and SHOW PROCESSLIST. + + Requires rli->data_lock. + + @param delay_end The time when the delay shall end. + */ + void start_sql_delay(time_t delay_end) + { + mysql_mutex_assert_owner(&data_lock); + sql_delay_end= delay_end; + thd_proc_info(sql_driver_thd, state_delaying_string); + } + + int32 get_sql_delay() { return sql_delay; } + void set_sql_delay(time_t _sql_delay) { sql_delay= _sql_delay; } + time_t get_sql_delay_end() { return sql_delay_end; } + private: + + /** + Delay slave SQL thread by this amount, compared to master (in + seconds). This is set with CHANGE MASTER TO MASTER_DELAY=X. + + Guarded by data_lock. Initialized by the client thread executing + START SLAVE. Written by client threads executing CHANGE MASTER TO + MASTER_DELAY=X. Read by SQL thread and by client threads + executing SHOW SLAVE STATUS. Note: must not be written while the + slave SQL thread is running, since the SQL thread reads it without + a lock when executing flush_relay_log_info(). + */ + int sql_delay; + + /** + During a delay, specifies the point in time when the delay ends. + + This is used for the SQL_Remaining_Delay column in SHOW SLAVE STATUS. + + Guarded by data_lock. Written by the sql thread. Read by client + threads executing SHOW SLAVE STATUS. + */ + time_t sql_delay_end; + + /* + Before the MASTER_DELAY parameter was added (WL#344), + relay_log.info had 4 lines. Now it has 5 lines. + */ + static const int LINES_IN_RELAY_LOG_INFO_WITH_DELAY= 5; + /* Holds the state of the data in the relay log. We need this to ensure that we are not in the middle of a @@ -874,7 +954,14 @@ public: }; -// Defined in rpl_rli.cc +/** + Reads the relay_log.info file. + + @todo This is a wrapper around Relay_log_info::init(). It's only + kept for historical reasons. It would be good if we removed this + function and replaced all calls to it by calls to + Relay_log_info::init(). /SVEN +*/ int init_relay_log_info(Relay_log_info* rli, const char* info_fname); diff --git a/sql/slave.cc b/sql/slave.cc index a55d26b1aa0..d1d9abd4027 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -1634,8 +1634,10 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi) (master_res= mysql_store_result(mysql)) && (master_row= mysql_fetch_row(master_res))) { + mysql_mutex_lock(&mi->data_lock); mi->clock_diff_with_master= (long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10)); + mysql_mutex_unlock(&mi->data_lock); } else if (check_io_slave_killed(mi, NULL)) goto slave_killed_err; @@ -1647,7 +1649,9 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi) } else { + mysql_mutex_lock(&mi->data_lock); mi->clock_diff_with_master= 0; /* The "most sensible" value */ + mysql_mutex_unlock(&mi->data_lock); sql_print_warning("\"SELECT UNIX_TIMESTAMP()\" failed on master, " "do not trust column Seconds_Behind_Master of SHOW " "SLAVE STATUS. Error: %s (%d)", @@ -2797,6 +2801,15 @@ void show_master_info_get_fields(THD *thd, List<Item> *field_list, Item_empty_string(thd, "Parallel_Mode", sizeof("conservative")-1), mem_root); + field_list->push_back(new (mem_root) + Item_return_int(thd, "SQL_Delay", 10, + MYSQL_TYPE_LONG)); + field_list->push_back(new (mem_root) + Item_return_int(thd, "SQL_Remaining_Delay", 8, + MYSQL_TYPE_LONG)); + field_list->push_back(new (mem_root) + Item_empty_string(thd, "Slave_SQL_Running_State", + 20)); if (full) { field_list->push_back(new (mem_root) @@ -2986,6 +2999,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, prot_store_ids(thd, &mi->ignore_server_ids); // Master_Server_id protocol->store((uint32) mi->master_id); + // SQL_Delay // Master_Ssl_Crl protocol->store(mi->ssl_ca, &my_charset_bin); // Master_Ssl_Crlpath @@ -3008,6 +3022,22 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, protocol->store(mode_name, strlen(mode_name), &my_charset_bin); } + protocol->store((uint32) mi->rli.get_sql_delay()); + // SQL_Remaining_Delay + // THD::proc_info is not protected by any lock, so we read it once + // to ensure that we use the same value throughout this function. + const char *slave_sql_running_state= + mi->rli.sql_driver_thd ? mi->rli.sql_driver_thd->proc_info : ""; + if (slave_sql_running_state == Relay_log_info::state_delaying_string) + { + time_t t= my_time(0), sql_delay_end= mi->rli.get_sql_delay_end(); + protocol->store((uint32)(t < sql_delay_end ? sql_delay_end - t : 0)); + } + else + protocol->store_null(); + // Slave_SQL_Running_State + protocol->store(slave_sql_running_state, &my_charset_bin); + if (full) { protocol->store((uint32) mi->rli.retried_trans); @@ -3377,6 +3407,69 @@ has_temporary_error(THD *thd) } +/** + If this is a lagging slave (specified with CHANGE MASTER TO MASTER_DELAY = X), delays accordingly. Also unlocks rli->data_lock. + + Design note: this is the place to unlock rli->data_lock here since + it should be held when reading delay info from rli, but it should + not be held while sleeping. + + @param ev Event that is about to be executed. + + @param thd The sql thread's THD object. + + @param rli The sql thread's Relay_log_info structure. +*/ +static void sql_delay_event(Log_event *ev, THD *thd, rpl_group_info *rgi) +{ + Relay_log_info* rli= rgi->rli; + long sql_delay= rli->get_sql_delay(); + + DBUG_ENTER("sql_delay_event"); + mysql_mutex_assert_owner(&rli->data_lock); + DBUG_ASSERT(!rli->belongs_to_client()); + + int type= ev->get_type_code(); + if (sql_delay && type != ROTATE_EVENT && + type != FORMAT_DESCRIPTION_EVENT && type != START_EVENT_V3) + { + // The time when we should execute the event. + time_t sql_delay_end= + ev->when + rli->mi->clock_diff_with_master + sql_delay; + // The current time. + time_t now= my_time(0); + // The time we will have to sleep before executing the event. + unsigned long nap_time= 0; + if (sql_delay_end > now) + nap_time= sql_delay_end - now; + + DBUG_PRINT("info", ("sql_delay= %lu " + "ev->when= %lu " + "rli->mi->clock_diff_with_master= %lu " + "now= %ld " + "sql_delay_end= %lu " + "nap_time= %ld", + sql_delay, (long)ev->when, + rli->mi->clock_diff_with_master, + (long)now, sql_delay_end, (long)nap_time)); + + if (sql_delay_end > now) + { + DBUG_PRINT("info", ("delaying replication event %lu secs", + nap_time)); + rli->start_sql_delay(sql_delay_end); + mysql_mutex_unlock(&rli->data_lock); + slave_sleep(thd, nap_time, sql_slave_killed, rgi); + DBUG_VOID_RETURN; + } + } + + mysql_mutex_unlock(&rli->data_lock); + + DBUG_VOID_RETURN; +} + + /* First half of apply_event_and_update_pos(), see below. Setup some THD variables for applying the event. @@ -3500,16 +3593,16 @@ apply_event_and_update_pos_apply(Log_event* ev, THD* thd, rpl_group_info *rgi, if (exec_res == 0) { int error= ev->update_pos(rgi); -#ifdef HAVE_valgrind - if (!rli->is_fake) -#endif + #ifndef DBUG_OFF + DBUG_PRINT("info", ("update_pos error = %d", error)); + if (!rli->belongs_to_client()) { - DBUG_PRINT("info", ("update_pos error = %d", error)); DBUG_PRINT("info", ("group %llu %s", rli->group_relay_log_pos, rli->group_relay_log_name)); DBUG_PRINT("info", ("event %llu %s", rli->event_relay_log_pos, rli->event_relay_log_name)); } +#endif /* The update should not fail, so print an error message and return an error code. @@ -3544,21 +3637,39 @@ apply_event_and_update_pos_apply(Log_event* ev, THD* thd, rpl_group_info *rgi, /** Applies the given event and advances the relay log position. - In essence, this function does: + This is needed by the sql thread to execute events from the binlog, + and by clients executing BINLOG statements. Conceptually, this + function does: @code ev->apply_event(rli); ev->update_pos(rli); @endcode - But it also does some maintainance, such as skipping events if - needed and reporting errors. + It also does the following maintainance: - If the @c skip flag is set, then it is tested whether the event - should be skipped, by looking at the slave_skip_counter and the - server id. The skip flag should be set when calling this from a - replication thread but not set when executing an explicit BINLOG - statement. + - Initializes the thread's server_id and time; and the event's + thread. + + - If !rli->belongs_to_client() (i.e., if it belongs to the slave + sql thread instead of being used for executing BINLOG + statements), it does the following things: (1) skips events if it + is needed according to the server id or slave_skip_counter; (2) + unlocks rli->data_lock; (3) sleeps if required by 'CHANGE MASTER + TO MASTER_DELAY=X'; (4) maintains the running state of the sql + thread (rli->thread_state). + + - Reports errors as needed. + + @param ev The event to apply. + + @param thd The client thread that executes the event (i.e., the + slave sql thread if called from a replication slave, or the client + thread if called to execute a BINLOG statement). + + @param rli The relay log info (i.e., the slave's rli if called from + a replication slave, or the client's thd->rli_fake if called to + execute a BINLOG statement). @retval 0 OK. @@ -3581,7 +3692,15 @@ apply_event_and_update_pos(Log_event* ev, THD* thd, rpl_group_info *rgi) DBUG_ASSERT(rli->slave_skip_counter > 0); rli->slave_skip_counter--; } - mysql_mutex_unlock(&rli->data_lock); + + if (reason == Log_event::EVENT_SKIP_NOT) + { + // Sleeps if needed, and unlocks rli->data_lock. + sql_delay_event(ev, thd, rgi); + } + else + mysql_mutex_unlock(&rli->data_lock); + return apply_event_and_update_pos_apply(ev, thd, rgi, reason); } @@ -3603,6 +3722,10 @@ apply_event_and_update_pos_for_parallel(Log_event* ev, THD* thd, driver thread, so 23 should never see EVENT_SKIP_COUNT here. */ DBUG_ASSERT(reason != Log_event::EVENT_SKIP_COUNT); + /* + Calling sql_delay_event() was handled in the SQL driver thread when + doing parallel replication. + */ return apply_event_and_update_pos_apply(ev, thd, rgi, reason); } @@ -3682,7 +3805,8 @@ inline void update_state_of_relay_log(Relay_log_info *rli, Log_event *ev) /** - Top-level function for executing the next event from the relay log. + Top-level function for executing the next event in the relay log. + This is called from the SQL thread. This function reads the event from the relay log, executes it, and advances the relay log position. It also handles errors, etc. @@ -4229,8 +4353,10 @@ connected: };); #endif - // TODO: the assignment below should be under mutex (5.0) + mysql_mutex_lock(&mi->run_lock); mi->slave_running= MYSQL_SLAVE_RUN_CONNECT; + mysql_mutex_unlock(&mi->run_lock); + thd->slave_net = &mysql->net; THD_STAGE_INFO(thd, stage_checking_master_version); ret= get_master_version_and_clock(mysql, mi); @@ -6582,67 +6708,80 @@ MYSQL *rpl_connect_master(MYSQL *mysql) } #endif -/* - Store the file and position where the execute-slave thread are in the +/** + Store the file and position where the slave's SQL thread are in the relay log. - SYNOPSIS - flush_relay_log_info() - rli Relay log information + Notes: - NOTES - - As this is only called by the slave thread or on STOP SLAVE, with the - log_lock grabbed and the slave thread stopped, we don't need to have - a lock here. - - If there is an active transaction, then we don't update the position - in the relay log. This is to ensure that we re-execute statements - if we die in the middle of an transaction that was rolled back. - - As a transaction never spans binary logs, we don't have to handle the - case where we do a relay-log-rotation in the middle of the transaction. - If this would not be the case, we would have to ensure that we - don't delete the relay log file where the transaction started when - we switch to a new relay log file. - - TODO - - Change the log file information to a binary format to avoid calling - longlong2str. + - This function should be called either from the slave SQL thread, + or when the slave thread is not running. (It reads the + group_{relay|master}_log_{pos|name} and delay fields in the rli + object. These may only be modified by the slave SQL thread or by + a client thread when the slave SQL thread is not running.) - RETURN VALUES - 0 ok - 1 write error -*/ + - If there is an active transaction, then we do not update the + position in the relay log. This is to ensure that we re-execute + statements if we die in the middle of an transaction that was + rolled back. + + - As a transaction never spans binary logs, we don't have to handle + the case where we do a relay-log-rotation in the middle of the + transaction. If transactions could span several binlogs, we would + have to ensure that we do not delete the relay log file where the + transaction started before switching to a new relay log file. + + - Error can happen if writing to file fails or if flushing the file + fails. + + @param rli The object representing the Relay_log_info. + @todo Change the log file information to a binary format to avoid + calling longlong2str. + + @todo Move the member function into rpl_rli.cc and get rid of the + global function. /SVEN + + @return 0 on success, 1 on error. +*/ bool flush_relay_log_info(Relay_log_info* rli) { - bool error=0; - DBUG_ENTER("flush_relay_log_info"); + return rli->flush(); +} - if (unlikely(rli->no_storage)) - DBUG_RETURN(0); +bool Relay_log_info::flush() +{ + bool error=0; - IO_CACHE *file = &rli->info_file; - char buff[FN_REFLEN*2+22*2+4], *pos; + DBUG_ENTER("Relay_log_info::flush()"); + IO_CACHE *file = &info_file; + // 2*file name, 2*long long, 2*unsigned long, 6*'\n' + char buff[FN_REFLEN * 2 + 22 * 2 + 10 * 2 + 6], *pos; my_b_seek(file, 0L); - pos=strmov(buff, rli->group_relay_log_name); + pos= longlong10_to_str(LINES_IN_RELAY_LOG_INFO_WITH_DELAY, buff, 10); + *pos++='\n'; + pos=strmov(pos, group_relay_log_name); *pos++='\n'; - pos= longlong10_to_str(rli->group_relay_log_pos, pos, 10); + pos=longlong10_to_str(group_relay_log_pos, pos, 10); *pos++='\n'; - pos=strmov(pos, rli->group_master_log_name); + pos=strmov(pos, group_master_log_name); *pos++='\n'; - pos=longlong10_to_str(rli->group_master_log_pos, pos, 10); + pos=longlong10_to_str(group_master_log_pos, pos, 10); *pos='\n'; + pos= longlong10_to_str(sql_delay, pos, 10); + *pos= '\n'; if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff)+1)) error=1; if (flush_io_cache(file)) error=1; if (sync_relayloginfo_period && !error && - ++(rli->sync_counter) >= sync_relayloginfo_period) + ++sync_counter >= sync_relayloginfo_period) { - if (my_sync(rli->info_fd, MYF(MY_WME))) + if (my_sync(info_fd, MYF(MY_WME))) error=1; - rli->sync_counter= 0; + sync_counter= 0; } /* Flushing the relay log is done by the slave I/O thread diff --git a/sql/slave.h b/sql/slave.h index a78ae4cff6f..d28048af8ea 100644 --- a/sql/slave.h +++ b/sql/slave.h @@ -18,6 +18,14 @@ #define SLAVE_H /** + MASTER_DELAY can be at most (1 << 31) - 1. +*/ +#define MASTER_DELAY_MAX (0x7FFFFFFF) +#if INT_MAX < 0x7FFFFFFF +#error "don't support platforms where INT_MAX < 0x7FFFFFFF" +#endif + +/** @defgroup Replication Replication @{ @@ -102,12 +110,14 @@ int init_dynarray_intvar_from_file(DYNAMIC_ARRAY* arr, IO_CACHE* f); In Master_info: run_lock, data_lock run_lock protects all information about the run state: slave_running, thd - and the existence of the I/O thread to stop/start it, you need this mutex). + and the existence of the I/O thread (to stop/start it, you need this mutex). data_lock protects some moving members of the struct: counters (log name, position) and relay log (MYSQL_BIN_LOG object). In Relay_log_info: run_lock, data_lock see Master_info + However, note that run_lock does not protect + Relay_log_info.run_state; that is protected by data_lock. Order of acquisition: if you want to have LOCK_active_mi and a run_lock, you must acquire LOCK_active_mi first. @@ -247,6 +257,12 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, int apply_event_and_update_pos_for_parallel(Log_event* ev, THD* thd, struct rpl_group_info *rgi); +int init_intvar_from_file(int* var, IO_CACHE* f, int default_val); +int init_floatvar_from_file(float* var, IO_CACHE* f, float default_val); +int init_strvar_from_file(char *var, int max_size, IO_CACHE *f, + const char *default_val); +int init_dynarray_intvar_from_file(DYNAMIC_ARRAY* arr, IO_CACHE* f); + pthread_handler_t handle_slave_io(void *arg); void slave_output_error_info(rpl_group_info *rgi, THD *thd); pthread_handler_t handle_slave_sql(void *arg); diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc index f0465cdf5bf..a7beb42c315 100644 --- a/sql/sql_binlog.cc +++ b/sql/sql_binlog.cc @@ -17,18 +17,96 @@ #include <my_global.h> #include "sql_priv.h" #include "sql_binlog.h" -#include "sql_parse.h" // check_global_access -#include "sql_acl.h" // *_ACL +#include "sql_parse.h" +#include "sql_acl.h" #include "rpl_rli.h" #include "base64.h" -#include "slave.h" // apply_event_and_update_pos -#include "log_event.h" // Format_description_log_event, - // EVENT_LEN_OFFSET, - // EVENT_TYPE_OFFSET, - // FORMAT_DESCRIPTION_LOG_EVENT, - // START_EVENT_V3, - // Log_event_type, - // Log_event +#include "slave.h" +#include "log_event.h" + + +/** + Check if the event type is allowed in a BINLOG statement. + + @retval 0 if the event type is ok. + @retval 1 if the event type is not ok. +*/ +static int check_event_type(int type, Relay_log_info *rli) +{ + Format_description_log_event *fd_event= + rli->relay_log.description_event_for_exec; + + /* + Convert event type id of certain old versions (see comment in + Format_description_log_event::Format_description_log_event(char*,...)). + */ + if (fd_event && fd_event->event_type_permutation) + { + IF_DBUG({ + int new_type= fd_event->event_type_permutation[type]; + DBUG_PRINT("info", + ("converting event type %d to %d (%s)", + type, new_type, + Log_event::get_type_str((Log_event_type)new_type))); + }, + (void)0); + type= fd_event->event_type_permutation[type]; + } + + switch (type) + { + case START_EVENT_V3: + case FORMAT_DESCRIPTION_EVENT: + /* + We need a preliminary FD event in order to parse the FD event, + if we don't already have one. + */ + if (!fd_event) + if (!(rli->relay_log.description_event_for_exec= + new Format_description_log_event(4))) + { + my_error(ER_OUTOFMEMORY, MYF(0), 1); + return 1; + } + + /* It is always allowed to execute FD events. */ + return 0; + + case TABLE_MAP_EVENT: + case WRITE_ROWS_EVENT: + case UPDATE_ROWS_EVENT: + case DELETE_ROWS_EVENT: + case PRE_GA_WRITE_ROWS_EVENT: + case PRE_GA_UPDATE_ROWS_EVENT: + case PRE_GA_DELETE_ROWS_EVENT: + /* + Row events are only allowed if a Format_description_event has + already been seen. + */ + if (fd_event) + return 0; + else + { + my_error(ER_NO_FORMAT_DESCRIPTION_EVENT_BEFORE_BINLOG_STATEMENT, + MYF(0), Log_event::get_type_str((Log_event_type)type)); + return 1; + } + break; + + default: + /* + It is not meaningful to execute other events than row-events and + FD events. It would even be dangerous to execute Stop_log_event + and Rotate_log_event since they call flush_relay_log_info, which + is not allowed to call by other threads than the slave SQL + thread when the slave SQL thread is running. + */ + my_error(ER_ONLY_FD_AND_RBR_EVENTS_ALLOWED_IN_BINLOG_STATEMENT, + MYF(0), Log_event::get_type_str((Log_event_type)type)); + return 1; + } +} + /** Execute a BINLOG statement. @@ -73,31 +151,13 @@ void mysql_client_binlog_statement(THD* thd) Allocation */ - /* - If we do not have a Format_description_event, we create a dummy - one here. In this case, the first event we read must be a - Format_description_event. - */ - my_bool have_fd_event= TRUE; int err; Relay_log_info *rli; rpl_group_info *rgi; rli= thd->rli_fake; - if (!rli) - { - rli= thd->rli_fake= new Relay_log_info(FALSE); -#ifdef HAVE_valgrind - rli->is_fake= TRUE; -#endif - have_fd_event= FALSE; - } - if (rli && !rli->relay_log.description_event_for_exec) - { - rli->relay_log.description_event_for_exec= - new Format_description_log_event(4); - have_fd_event= FALSE; - } + if (!rli && (rli= thd->rli_fake= new Relay_log_info(FALSE))) + rli->sql_driver_thd= thd; if (!(rgi= thd->rgi_fake)) rgi= thd->rgi_fake= new rpl_group_info(rli); rgi->thd= thd; @@ -109,16 +169,13 @@ void mysql_client_binlog_statement(THD* thd) /* Out of memory check */ - if (!(rli && - rli->relay_log.description_event_for_exec && - buf)) + if (!(rli && buf)) { my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1); /* needed 1 bytes */ goto end; } - rli->sql_driver_thd= thd; - rli->no_storage= TRUE; + DBUG_ASSERT(rli->belongs_to_client()); for (char const *strptr= thd->lex->comment.str ; strptr < thd->lex->comment.str + thd->lex->comment.length ; ) @@ -185,23 +242,8 @@ void mysql_client_binlog_statement(THD* thd) DBUG_PRINT("info", ("event_len=%lu, bytes_decoded=%d", event_len, bytes_decoded)); - /* - If we have not seen any Format_description_event, then we must - see one; it is the only statement that can be read in base64 - without a prior Format_description_event. - */ - if (!have_fd_event) - { - int type = (uchar)bufptr[EVENT_TYPE_OFFSET]; - if (type == FORMAT_DESCRIPTION_EVENT || type == START_EVENT_V3) - have_fd_event= TRUE; - else - { - my_error(ER_NO_FORMAT_DESCRIPTION_EVENT_BEFORE_BINLOG_STATEMENT, - MYF(0), Log_event::get_type_str((Log_event_type)type)); - goto end; - } - } + if (check_event_type(bufptr[EVENT_TYPE_OFFSET], rli)) + goto end; ev= Log_event::read_log_event(bufptr, event_len, &error, rli->relay_log.description_event_for_exec, @@ -212,7 +254,7 @@ void mysql_client_binlog_statement(THD* thd) { /* This could actually be an out-of-memory, but it is more likely - causes by a bad statement + caused by a bad statement */ my_error(ER_SYNTAX_ERROR, MYF(0)); goto end; diff --git a/sql/sql_lex.cc b/sql/sql_lex.cc index b62838f46f2..15cf9a39f90 100644 --- a/sql/sql_lex.cc +++ b/sql/sql_lex.cc @@ -4831,4 +4831,3 @@ void binlog_unsafe_map_init() BINLOG_DIRECT_OFF & TRX_CACHE_NOT_EMPTY); } #endif - diff --git a/sql/sql_lex.h b/sql/sql_lex.h index 6ec112cc038..626beaa9894 100644 --- a/sql/sql_lex.h +++ b/sql/sql_lex.h @@ -240,11 +240,12 @@ struct LEX_MASTER_INFO ulong server_id; uint port, connect_retry; float heartbeat_period; + int sql_delay; /* Enum is used for making it possible to detect if the user changed variable or if it should be left at old value */ - enum {LEX_MI_UNCHANGED, LEX_MI_DISABLE, LEX_MI_ENABLE} + enum {LEX_MI_UNCHANGED= 0, LEX_MI_DISABLE, LEX_MI_ENABLE} ssl, ssl_verify_server_cert, heartbeat_opt, repl_ignore_server_ids_opt, repl_do_domain_ids_opt, repl_ignore_domain_ids_opt; enum { @@ -260,6 +261,7 @@ struct LEX_MASTER_INFO sizeof(ulong), 0, 16, MYF(0)); my_init_dynamic_array(&repl_ignore_domain_ids, sizeof(ulong), 0, 16, MYF(0)); + sql_delay= -1; } void reset(bool is_change_master) { @@ -280,6 +282,7 @@ struct LEX_MASTER_INFO repl_ignore_domain_ids_opt= LEX_MI_UNCHANGED; gtid_pos_str= null_lex_str; use_gtid_opt= LEX_GTID_UNCHANGED; + sql_delay= -1; } }; diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 572d08399d5..8bea280b820 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -3319,6 +3319,7 @@ int reset_slave(THD *thd, Master_info* mi) mi->clear_error(); mi->rli.clear_error(); mi->rli.clear_until_condition(); + mi->rli.clear_sql_delay(); mi->rli.slave_skip_counter= 0; // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0 @@ -3630,6 +3631,9 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added) if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED) mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE); + if (lex_mi->sql_delay != -1) + mi->rli.set_sql_delay(lex_mi->sql_delay); + if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED) mi->ssl_verify_server_cert= (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE); diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy index 9131b99d95e..3b9a06d5b38 100644 --- a/sql/sql_yacc.yy +++ b/sql/sql_yacc.yy @@ -1341,6 +1341,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize); %token LOOP_SYM %token LOW_PRIORITY %token MASTER_CONNECT_RETRY_SYM +%token MASTER_DELAY_SYM %token MASTER_GTID_POS_SYM %token MASTER_HOST_SYM %token MASTER_LOG_FILE_SYM @@ -2255,6 +2256,16 @@ master_def: { Lex->mi.connect_retry = $3; } + | MASTER_DELAY_SYM '=' ulong_num + { + if ($3 > MASTER_DELAY_MAX) + { + my_error(ER_MASTER_DELAY_VALUE_OUT_OF_RANGE, MYF(0), + $3, MASTER_DELAY_MAX); + } + else + Lex->mi.sql_delay = $3; + } | MASTER_SSL_SYM '=' ulong_num { Lex->mi.ssl= $3 ? @@ -7660,6 +7671,7 @@ slave: LEX *lex=Lex; lex->sql_command = SQLCOM_SLAVE_ALL_START; lex->type = 0; + /* If you change this code don't forget to update STOP SLAVE too */ } {} | STOP_SYM SLAVE optional_connection_name slave_thread_opts @@ -14099,6 +14111,7 @@ keyword_sp: | MASTER_PASSWORD_SYM {} | MASTER_SERVER_ID_SYM {} | MASTER_CONNECT_RETRY_SYM {} + | MASTER_DELAY_SYM {} | MASTER_SSL_SYM {} | MASTER_SSL_CA_SYM {} | MASTER_SSL_CAPATH_SYM {} |