diff options
-rw-r--r-- | mysql-test/suite/rpl/r/rpl_parallel_retry.result | 62 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_parallel_retry.test | 91 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 163 | ||||
-rw-r--r-- | sql/rpl_parallel.h | 6 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 39 | ||||
-rw-r--r-- | sql/rpl_rli.h | 36 | ||||
-rw-r--r-- | sql/slave.cc | 14 | ||||
-rw-r--r-- | sql/slave.h | 1 |
8 files changed, 394 insertions, 18 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_parallel_retry.result b/mysql-test/suite/rpl/r/rpl_parallel_retry.result new file mode 100644 index 00000000000..352285a91bf --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_parallel_retry.result @@ -0,0 +1,62 @@ +include/rpl_init.inc [topology=1->2] +*** Test retry of transactions that fail to replicate due to deadlock or similar temporary error. *** +ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB; +CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB; +INSERT INTO t1 VALUES (1,1); +SET sql_log_bin=0; +CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500)) +RETURNS INT DETERMINISTIC +BEGIN +RETURN x; +END +|| +SET sql_log_bin=1; +SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +include/stop_slave.inc +SET GLOBAL slave_parallel_threads=5; +include/start_slave.inc +SET sql_log_bin=0; +CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500)) +RETURNS INT DETERMINISTIC +BEGIN +IF d1 != '' THEN +SET debug_sync = d1; +END IF; +IF d2 != '' THEN +SET debug_sync = d2; +END IF; +RETURN x; +END +|| +include/stop_slave.inc +SET @old_format= @@SESSION.binlog_format; +SET binlog_format='statement'; +SET gtid_seq_no = 100; +BEGIN; +INSERT INTO t1 VALUES (2,1); +UPDATE t1 SET b=b+1 WHERE a=1; +INSERT INTO t1 VALUES (3,1); +COMMIT; +SET binlog_format=@old_format; +SELECT * FROM t1 ORDER BY a; +a b +1 2 +2 1 +3 1 +SET @old_dbug= @@GLOBAL.debug_dbug; +SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_1_100"; +include/start_slave.inc +SET GLOBAL debug_dbug=@old_dbug; +retries +1 +SELECT * FROM t1 ORDER BY a; +a b +1 2 +2 1 +3 1 +include/stop_slave.inc +SET GLOBAL slave_parallel_threads=@old_parallel_threads; +include/start_slave.inc +DROP TABLE t1; +DROP function foo; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_parallel_retry.test b/mysql-test/suite/rpl/t/rpl_parallel_retry.test new file mode 100644 index 00000000000..edf00269737 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_parallel_retry.test @@ -0,0 +1,91 @@ +--source include/have_innodb.inc +--source include/have_debug.inc +--source include/have_debug_sync.inc +--let $rpl_topology=1->2 +--source include/rpl_init.inc + +--echo *** Test retry of transactions that fail to replicate due to deadlock or similar temporary error. *** + +--connection server_1 +ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB; +CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB; +INSERT INTO t1 VALUES (1,1); +--save_master_pos + +# Use a stored function to inject a debug_sync into the appropriate THD. +# The function does nothing on the master, and on the slave it injects the +# desired debug_sync action(s). +SET sql_log_bin=0; +--delimiter || +CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500)) + RETURNS INT DETERMINISTIC + BEGIN + RETURN x; + END +|| +--delimiter ; +SET sql_log_bin=1; + +--connection server_2 +SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +--source include/stop_slave.inc +SET GLOBAL slave_parallel_threads=5; +--source include/start_slave.inc +--sync_with_master +SET sql_log_bin=0; +--delimiter || +CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500)) + RETURNS INT DETERMINISTIC + BEGIN + IF d1 != '' THEN + SET debug_sync = d1; + END IF; + IF d2 != '' THEN + SET debug_sync = d2; + END IF; + RETURN x; + END +|| +--delimiter ; +--source include/stop_slave.inc + +--connection server_1 +SET @old_format= @@SESSION.binlog_format; +SET binlog_format='statement'; +SET gtid_seq_no = 100; +BEGIN; +INSERT INTO t1 VALUES (2,1); +UPDATE t1 SET b=b+1 WHERE a=1; +#INSERT INTO t1 VALUES (3,foo(1, +# "ha_write_row_end SIGNAL q1_ready WAIT_FOR q1_cont", +# "")); +INSERT INTO t1 VALUES (3,1); +COMMIT; +SET binlog_format=@old_format; +SELECT * FROM t1 ORDER BY a; +--save_master_pos + +--connection server_2 +SET @old_dbug= @@GLOBAL.debug_dbug; +SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_1_100"; +let $old_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1); +--source include/start_slave.inc +--sync_with_master +SET GLOBAL debug_dbug=@old_dbug; +let $new_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1); +--disable_query_log +eval SELECT $new_retry - $old_retry AS retries; +--enable_query_log + +SELECT * FROM t1 ORDER BY a; + +--connection server_2 +--source include/stop_slave.inc +SET GLOBAL slave_parallel_threads=@old_parallel_threads; +--source include/start_slave.inc + +--connection server_1 +DROP TABLE t1; +DROP function foo; + +--source include/rpl_end.inc diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 53769107661..f0147527957 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -7,15 +7,6 @@ /* Code for optional parallel execution of replicated events on the slave. - - ToDo list: - - - Retry of failed transactions is not yet implemented for the parallel case. - - - All the waits (eg. in struct wait_for_commit and in - rpl_parallel_thread_pool::get_thread()) need to be killable. And on kill, - everything needs to be correctly rolled back and stopped in all threads, - to ensure a consistent slave replication state. */ struct rpl_parallel_thread_pool global_rpl_thread_pool; @@ -197,6 +188,105 @@ unlock_or_exit_cond(THD *thd, mysql_mutex_t *lock, bool *did_enter_cond, } +static int +retry_handle_relay_log_rotate(Log_event *ev, IO_CACHE *rlog) +{ + /* ToDo */ + return 0; +} + + +static int +retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt, + rpl_parallel_thread::queued_event *orig_qev) +{ + IO_CACHE rlog; + File fd; + const char *errmsg= NULL; + inuse_relaylog *ir= rgi->relay_log; + uint64 event_count= 0; + uint64 events_to_execute= rgi->retry_event_count; + Relay_log_info *rli= rgi->rli; + int err= 0; + ulonglong cur_offset, old_offset; + char log_name[FN_REFLEN]; + THD *thd= rgi->thd; + +do_retry: + rgi->cleanup_context(thd, 1); + + mysql_mutex_lock(&rli->data_lock); + ++rli->retried_trans; + statistic_increment(slave_retried_transactions, LOCK_status); + mysql_mutex_unlock(&rli->data_lock); + + strcpy(log_name, ir->name); + if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0) + return 1; + cur_offset= rgi->retry_start_offset; + my_b_seek(&rlog, cur_offset); + + do + { + Log_event_type event_type; + Log_event *ev; + + old_offset= cur_offset; + ev= Log_event::read_log_event(&rlog, 0, + rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */, + opt_slave_sql_verify_checksum); + cur_offset= my_b_tell(&rlog); + + if (!ev) + { + err= 1; + goto err; + } + ev->thd= thd; + event_type= ev->get_type_code(); + if (Log_event::is_group_event(event_type)) + { + rpl_parallel_thread::queued_event *qev; + + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + qev= rpt->retry_get_qev(ev, orig_qev, log_name, cur_offset, + cur_offset - old_offset); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + if (!qev) + { + delete ev; + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + err= 1; + goto err; + } + err= rpt_handle_event(qev, rpt); + ++event_count; + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + rpt->free_qev(qev); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + } + else + err= retry_handle_relay_log_rotate(ev, &rlog); + delete_or_keep_event_post_apply(rgi, event_type, ev); + + if (err) + { + /* ToDo: Need to here also handle second retry. */ + goto err; + } + + // ToDo: handle too many retries. + + } while (event_count < events_to_execute); + +err: + + end_io_cache(&rlog); + mysql_file_close(fd, MYF(MY_WME)); + return err; +} + + pthread_handler_t handle_rpl_parallel_thread(void *arg) { @@ -499,7 +589,23 @@ handle_rpl_parallel_thread(void *arg) everything is stopped and cleaned up correctly. */ if (likely(!rgi->worker_error) && !skip_event_group) + { + ++rgi->retry_event_count; err= rpt_handle_event(events, rpt); + DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_1_100", + if (rgi->current_gtid.domain_id == 0 && + rgi->current_gtid.server_id == 1 && + rgi->current_gtid.seq_no == 100 && + rgi->retry_event_count == 4) + { + thd->clear_error(); + thd->get_stmt_da()->reset_diagnostics_area(); + my_error(ER_LOCK_DEADLOCK, MYF(0)); + err= 1; + };); + if (err && has_temporary_error(thd)) + err= retry_event_group(rgi, rpt, events); + } else err= thd->wait_for_prior_commit(); @@ -802,8 +908,7 @@ err: rpl_parallel_thread::queued_event * -rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size, - Relay_log_info *rli) +rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size) { queued_event *qev; mysql_mutex_assert_owner(&LOCK_rpl_thread); @@ -817,6 +922,17 @@ rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size, qev->ev= ev; qev->event_size= event_size; qev->next= NULL; + return qev; +} + + +rpl_parallel_thread::queued_event * +rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size, + Relay_log_info *rli) +{ + queued_event *qev= get_qev_common(ev, event_size); + if (!qev) + return NULL; strcpy(qev->event_relay_log_name, rli->event_relay_log_name); qev->event_relay_log_pos= rli->event_relay_log_pos; qev->future_event_relay_log_pos= rli->future_event_relay_log_pos; @@ -825,6 +941,24 @@ rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size, } +rpl_parallel_thread::queued_event * +rpl_parallel_thread::retry_get_qev(Log_event *ev, queued_event *orig_qev, + const char *relay_log_name, + ulonglong event_pos, ulonglong event_size) +{ + queued_event *qev= get_qev_common(ev, event_size); + if (!qev) + return NULL; + qev->rgi= orig_qev->rgi; + strcpy(qev->event_relay_log_name, relay_log_name); + qev->event_relay_log_pos= event_pos; + qev->future_event_relay_log_pos= event_pos+event_size; + strcpy(qev->future_event_master_log_name, + orig_qev->future_event_master_log_name); + return qev; +} + + void rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev) { @@ -836,7 +970,7 @@ rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev) rpl_group_info* rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev, - rpl_parallel_entry *e) + rpl_parallel_entry *e, ulonglong event_size) { rpl_group_info *rgi; mysql_mutex_assert_owner(&LOCK_rpl_thread); @@ -864,6 +998,9 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev, return NULL; } rgi->parallel_entry= e; + rgi->relay_log= rli->last_inuse_relaylog; + rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size; + rgi->retry_event_count= 0; return rgi; } @@ -1439,7 +1576,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, { Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); - if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e))) + if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e, event_size))) { cur_thread->free_qev(qev); abandon_worker_thread(rli->sql_driver_thd, cur_thread, diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 1808efd0926..3b6641523f6 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -106,11 +106,15 @@ struct rpl_parallel_thread { queued_size-= dequeue_size; } + queued_event *get_qev_common(Log_event *ev, ulonglong event_size); queued_event *get_qev(Log_event *ev, ulonglong event_size, Relay_log_info *rli); + queued_event *retry_get_qev(Log_event *ev, queued_event *orig_qev, + const char *relay_log_name, + ulonglong event_pos, ulonglong event_size); void free_qev(queued_event *qev); rpl_group_info *get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev, - rpl_parallel_entry *e); + rpl_parallel_entry *e, ulonglong event_size); void free_rgi(rpl_group_info *rgi); group_commit_orderer *get_gco(uint64 wait_count, group_commit_orderer *prev); void free_gco(group_commit_orderer *gco); diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index cc543f7c377..3a3e22f970a 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -52,6 +52,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) info_fd(-1), cur_log_fd(-1), relay_log(&sync_relaylog_period), sync_counter(0), is_relay_log_recovery(is_slave_recovery), save_temporary_tables(0), mi(0), + inuse_relaylog_list(0), last_inuse_relaylog(0), cur_log_old_open_count(0), group_relay_log_pos(0), event_relay_log_pos(0), #if HAVE_valgrind @@ -98,8 +99,17 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) Relay_log_info::~Relay_log_info() { + inuse_relaylog *cur; DBUG_ENTER("Relay_log_info::~Relay_log_info"); + cur= inuse_relaylog_list; + while (cur) + { + DBUG_ASSERT(cur->queued_count == cur->dequeued_count); + inuse_relaylog *next= cur->next; + my_free(cur); + cur= next; + } mysql_mutex_destroy(&run_lock); mysql_mutex_destroy(&data_lock); mysql_mutex_destroy(&log_space_lock); @@ -1339,6 +1349,29 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos, DBUG_VOID_RETURN; } + +int +Relay_log_info::alloc_inuse_relaylog(const char *name) +{ + inuse_relaylog *ir; + + if (!(ir= (inuse_relaylog *)my_malloc(sizeof(*ir), MYF(MY_WME|MY_ZEROFILL)))) + { + my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir)); + return 1; + } + strcpy(ir->name, name); + + if (!inuse_relaylog_list) + inuse_relaylog_list= ir; + else + last_inuse_relaylog->next= ir; + last_inuse_relaylog= ir; + + return 0; +} + + #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) int rpl_load_gtid_slave_state(THD *thd) @@ -1623,7 +1656,7 @@ delete_or_keep_event_post_apply(rpl_group_info *rgi, void rpl_group_info::cleanup_context(THD *thd, bool error) { - DBUG_ENTER("Relay_log_info::cleanup_context"); + DBUG_ENTER("rpl_group_info::cleanup_context"); DBUG_PRINT("enter", ("error: %d", (int) error)); DBUG_ASSERT(this->thd == thd); @@ -1689,7 +1722,7 @@ void rpl_group_info::cleanup_context(THD *thd, bool error) void rpl_group_info::clear_tables_to_lock() { - DBUG_ENTER("Relay_log_info::clear_tables_to_lock()"); + DBUG_ENTER("rpl_group_info::clear_tables_to_lock()"); #ifndef DBUG_OFF /** When replicating in RBR and MyISAM Merge tables are involved @@ -1736,7 +1769,7 @@ void rpl_group_info::clear_tables_to_lock() void rpl_group_info::slave_close_thread_tables(THD *thd) { - DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)"); + DBUG_ENTER("rpl_group_info::slave_close_thread_tables(THD *thd)"); thd->get_stmt_da()->set_overwrite_status(true); thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd); thd->get_stmt_da()->set_overwrite_status(false); diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 00d16f52488..c2cdbcdc573 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -61,6 +61,7 @@ enum { *****************************************************************************/ struct rpl_group_info; +struct inuse_relaylog; class Relay_log_info : public Slave_reporting_capability { @@ -164,6 +165,13 @@ public: Master_info *mi; /* + List of active relay log files. + (This can be more than one in case of parallel replication). + */ + inuse_relaylog *inuse_relaylog_list; + inuse_relaylog *last_inuse_relaylog; + + /* Needed to deal properly with cur_log getting closed and re-opened with a different log under our feet */ @@ -398,6 +406,7 @@ public: void stmt_done(my_off_t event_log_pos, time_t event_creation_time, THD *thd, rpl_group_info *rgi); + int alloc_inuse_relaylog(const char *name); /** Is the replication inside a group? @@ -464,6 +473,25 @@ private: /* + In parallel replication, if we need to re-try a transaction due to a + deadlock or other temporary error, we may need to go back and re-read events + out of an earlier relay log. + + This structure keeps track of the relaylogs that are potentially in use. + Each rpl_group_info has a pointer to one of those, corresponding to the + first GTID event. + + A reference count keeps track of how long a relay log is potentially in use. +*/ +struct inuse_relaylog { + inuse_relaylog *next; + uint64 queued_count; + uint64 dequeued_count; + char name[FN_REFLEN]; +}; + + +/* This is data for various state needed to be kept for the processing of one event group (transaction) during replication. @@ -596,6 +624,14 @@ struct rpl_group_info /* Needs room for "Gtid D-S-N\x00". */ char gtid_info_buf[5+10+1+10+1+20+1]; + /* + Information to be able to re-try an event group in case of a deadlock or + other temporary error. + */ + inuse_relaylog *relay_log; + uint64 retry_start_offset; + uint64 retry_event_count; + rpl_group_info(Relay_log_info *rli_); ~rpl_group_info(); void reinit(Relay_log_info *rli); diff --git a/sql/slave.cc b/sql/slave.cc index f755cb63558..ab505a4011f 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3094,7 +3094,8 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings) that the error is temporary by pushing a warning with the error code ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary. */ -static int has_temporary_error(THD *thd) +int +has_temporary_error(THD *thd) { DBUG_ENTER("has_temporary_error"); @@ -4478,6 +4479,9 @@ pthread_handler_t handle_slave_sql(void *arg) "Error initializing relay log position: %s", errmsg); goto err; } + if (rli->alloc_inuse_relaylog(rli->group_relay_log_name)) + goto err; + strcpy(rli->future_event_master_log_name, rli->group_master_log_name); THD_CHECK_SENTRY(thd); #ifndef DBUG_OFF @@ -6521,6 +6525,12 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size) mysql_mutex_unlock(log_lock); goto err; } + if (rli->alloc_inuse_relaylog(rli->linfo.log_file_name)) + { + if (!hot_log) + mysql_mutex_unlock(log_lock); + goto err; + } if (!hot_log) mysql_mutex_unlock(log_lock); continue; @@ -6536,6 +6546,8 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size) if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name, &errmsg)) <0) goto err; + if (rli->alloc_inuse_relaylog(rli->linfo.log_file_name)) + goto err; } else { diff --git a/sql/slave.h b/sql/slave.h index 7352ac0274b..4b5bc1686fb 100644 --- a/sql/slave.h +++ b/sql/slave.h @@ -229,6 +229,7 @@ int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset, void set_slave_thread_options(THD* thd); void set_slave_thread_default_charset(THD *thd, rpl_group_info *rgi); int rotate_relay_log(Master_info* mi); +int has_temporary_error(THD *thd); int apply_event_and_update_pos(Log_event* ev, THD* thd, struct rpl_group_info *rgi, rpl_parallel_thread *rpt); |