diff options
-rw-r--r-- | mysql-test/suite/galera/r/galera_sp_bf_abort.result | 356 | ||||
-rw-r--r-- | mysql-test/suite/galera/r/galera_sp_insert_parallel.result | 41 | ||||
-rw-r--r-- | mysql-test/suite/galera/t/galera_sp_insert_parallel.test | 55 | ||||
-rw-r--r-- | sql/sp_head.cc | 61 | ||||
-rw-r--r-- | sql/sql_class.cc | 9 | ||||
-rw-r--r-- | sql/wsrep_client_service.cc | 36 | ||||
-rw-r--r-- | sql/wsrep_high_priority_service.cc | 83 | ||||
-rw-r--r-- | sql/wsrep_high_priority_service.h | 3 | ||||
-rw-r--r-- | sql/wsrep_trans_observer.h | 7 | ||||
m--------- | wsrep-lib | 0 |
10 files changed, 578 insertions, 73 deletions
diff --git a/mysql-test/suite/galera/r/galera_sp_bf_abort.result b/mysql-test/suite/galera/r/galera_sp_bf_abort.result new file mode 100644 index 00000000000..9216cc4fa5a --- /dev/null +++ b/mysql-test/suite/galera/r/galera_sp_bf_abort.result @@ -0,0 +1,356 @@ +connection node_2; +connection node_1; +connection node_1; +CREATE TABLE t1 (f1 INTEGER PRIMARY KEY, f2 CHAR(1)); +connect node_1a, 127.0.0.1, root, , test, $NODE_MYPORT_1; +connection node_1a; +SET SESSION wsrep_sync_wait = 0; +connection node_1; +CREATE PROCEDURE proc_update_insert() +BEGIN +UPDATE t1 SET f2 = 'b'; +INSERT INTO t1 VALUES (4, 'd'); +END| +INSERT INTO t1 VALUES (1, 'a'), (3, 'a'); +SET SESSION wsrep_sync_wait = 0; +connection node_1a; +SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync'; +connection node_2; +INSERT INTO t1 VALUES (2, 'c'); +connection node_1a; +SET SESSION wsrep_on = 0; +SET SESSION wsrep_on = 1; +SET GLOBAL wsrep_provider_options = 'dbug='; +SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync'; +connection node_1; +CALL proc_update_insert; +connection node_1a; +SET SESSION wsrep_on = 0; +SET SESSION wsrep_on = 1; +SET GLOBAL wsrep_provider_options = 'dbug='; +SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync'; +SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync'; +connection node_1; +SET SESSION wsrep_sync_wait = default; +SELECT * FROM t1; +f1 f2 +1 b +2 c +3 b +4 d +wsrep_local_replays +1 +DELETE FROM t1; +connection node_1; +CREATE PROCEDURE proc_update_insert_with_exit_handler() +BEGIN +DECLARE EXIT HANDLER FOR SQLEXCEPTION BEGIN END; +UPDATE t1 SET f2 = 'b'; +INSERT INTO t1 VALUES (4, 'd'); +END| +INSERT INTO t1 VALUES (1, 'a'), (3, 'a'); +SET SESSION wsrep_sync_wait = 0; +connection node_1a; +SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync'; +connection node_2; +INSERT INTO t1 VALUES (2, 'c'); +connection node_1a; +SET SESSION wsrep_on = 0; +SET SESSION wsrep_on = 1; +SET GLOBAL wsrep_provider_options = 'dbug='; +SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync'; +connection node_1; +CALL proc_update_insert_with_exit_handler; +connection node_1a; +SET SESSION wsrep_on = 0; +SET SESSION wsrep_on = 1; +SET GLOBAL wsrep_provider_options = 'dbug='; +SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync'; +SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync'; +connection node_1; +SET SESSION wsrep_sync_wait = default; +SELECT * FROM t1; +f1 f2 +1 b +2 c +3 b +4 d +wsrep_local_replays +1 +DELETE FROM t1; +connection node_1; +CREATE PROCEDURE proc_update_insert_with_continue_handler() +BEGIN +DECLARE CONTINUE HANDLER FOR SQLEXCEPTION BEGIN END; +UPDATE t1 SET f2 = 'b'; +INSERT INTO t1 VALUES (4, 'd'); +END| +INSERT INTO t1 VALUES (1, 'a'), (3, 'a'); +SET SESSION wsrep_sync_wait = 0; +connection node_1a; +SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync'; +connection node_2; +INSERT INTO t1 VALUES (2, 'c'); +connection node_1a; +SET SESSION wsrep_on = 0; +SET SESSION wsrep_on = 1; +SET GLOBAL wsrep_provider_options = 'dbug='; +SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync'; +connection node_1; +CALL proc_update_insert_with_continue_handler; +connection node_1a; +SET SESSION wsrep_on = 0; +SET SESSION wsrep_on = 1; +SET GLOBAL wsrep_provider_options = 'dbug='; +SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync'; +SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync'; +connection node_1; +SET SESSION wsrep_sync_wait = default; +SELECT * FROM t1; +f1 f2 +1 b +2 c +3 b +4 d +wsrep_local_replays +1 +DELETE FROM t1; +connection node_1; +CREATE PROCEDURE proc_update_insert_transaction() +BEGIN +START TRANSACTION; +UPDATE t1 SET f2 = 'b'; +INSERT INTO t1 VALUES (4, 'd'); +COMMIT; +END| +INSERT INTO t1 VALUES (1, 'a'), (3, 'a'); +SET SESSION wsrep_sync_wait = 0; +connection node_1a; +SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync'; +connection node_2; +INSERT INTO t1 VALUES (2, 'c'); +connection node_1a; +SET SESSION wsrep_on = 0; +SET SESSION wsrep_on = 1; +SET GLOBAL wsrep_provider_options = 'dbug='; +SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync'; +connection node_1; +CALL proc_update_insert_transaction; +connection node_1a; +SET SESSION wsrep_on = 0; +SET SESSION wsrep_on = 1; +SET GLOBAL wsrep_provider_options = 'dbug='; +SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync'; +SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync'; +connection node_1; +SET SESSION wsrep_sync_wait = default; +SELECT * FROM t1; +f1 f2 +1 b +2 c +3 b +4 d +wsrep_local_replays +1 +DELETE FROM t1; +connection node_1; +CREATE PROCEDURE proc_update_insert_transaction_with_continue_handler() +BEGIN +DECLARE CONTINUE HANDLER FOR SQLEXCEPTION BEGIN END; +START TRANSACTION; +UPDATE t1 SET f2 = 'b'; +INSERT INTO t1 VALUES (4, 'd'); +COMMIT; +END| +INSERT INTO t1 VALUES (1, 'a'), (3, 'a'); +SET SESSION wsrep_sync_wait = 0; +connection node_1a; +SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync'; +connection node_2; +INSERT INTO t1 VALUES (2, 'c'); +connection node_1a; +SET SESSION wsrep_on = 0; +SET SESSION wsrep_on = 1; +SET GLOBAL wsrep_provider_options = 'dbug='; +SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync'; +connection node_1; +CALL proc_update_insert_transaction_with_continue_handler; +connection node_1a; +SET SESSION wsrep_on = 0; +SET SESSION wsrep_on = 1; +SET GLOBAL wsrep_provider_options = 'dbug='; +SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync'; +SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync'; +connection node_1; +SET SESSION wsrep_sync_wait = default; +SELECT * FROM t1; +f1 f2 +1 b +2 c +3 b +4 d +wsrep_local_replays +1 +DELETE FROM t1; +connection node_1; +CREATE PROCEDURE proc_update_insert_transaction_with_exit_handler() +BEGIN +DECLARE EXIT HANDLER FOR SQLEXCEPTION BEGIN END; +START TRANSACTION; +UPDATE t1 SET f2 = 'b'; +INSERT INTO t1 VALUES (4, 'd'); +COMMIT; +END| +INSERT INTO t1 VALUES (1, 'a'), (3, 'a'); +SET SESSION wsrep_sync_wait = 0; +connection node_1a; +SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync'; +connection node_2; +INSERT INTO t1 VALUES (2, 'c'); +connection node_1a; +SET SESSION wsrep_on = 0; +SET SESSION wsrep_on = 1; +SET GLOBAL wsrep_provider_options = 'dbug='; +SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync'; +connection node_1; +CALL proc_update_insert_transaction_with_exit_handler; +connection node_1a; +SET SESSION wsrep_on = 0; +SET SESSION wsrep_on = 1; +SET GLOBAL wsrep_provider_options = 'dbug='; +SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync'; +SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync'; +connection node_1; +SET SESSION wsrep_sync_wait = default; +SELECT * FROM t1; +f1 f2 +1 b +2 c +3 b +4 d +wsrep_local_replays +1 +DELETE FROM t1; +connection node_1; +CREATE PROCEDURE proc_insert_insert_conflict() +BEGIN +INSERT INTO t1 VALUES (2, 'd'); +INSERT INTO t1 VALUES (4, 'd'); +END| +INSERT INTO t1 VALUES (1, 'a'), (3, 'a'); +SET SESSION wsrep_sync_wait = 0; +connection node_1a; +SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync'; +connection node_2; +INSERT INTO t1 VALUES (2, 'c'); +connection node_1a; +SET SESSION wsrep_on = 0; +SET SESSION wsrep_on = 1; +SET GLOBAL wsrep_provider_options = 'dbug='; +SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync'; +connection node_1; +CALL proc_insert_insert_conflict; +connection node_1a; +SET SESSION wsrep_on = 0; +SET SESSION wsrep_on = 1; +SET GLOBAL wsrep_provider_options = 'dbug='; +SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync'; +SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync'; +connection node_1; +Got one of the listed errors +SET SESSION wsrep_sync_wait = default; +SELECT * FROM t1; +f1 f2 +1 a +2 c +3 a +wsrep_local_replays +1 +DELETE FROM t1; +connection node_1; +CREATE PROCEDURE proc_insert_insert_conflict_with_exit_handler() +BEGIN +DECLARE EXIT HANDLER FOR SQLEXCEPTION SELECT "Conflict exit handler"; +INSERT INTO t1 VALUES (2, 'd'); +INSERT INTO t1 VALUES (4, 'd'); +END| +INSERT INTO t1 VALUES (1, 'a'), (3, 'a'); +SET SESSION wsrep_sync_wait = 0; +connection node_1a; +SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync'; +connection node_2; +INSERT INTO t1 VALUES (2, 'c'); +connection node_1a; +SET SESSION wsrep_on = 0; +SET SESSION wsrep_on = 1; +SET GLOBAL wsrep_provider_options = 'dbug='; +SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync'; +connection node_1; +CALL proc_insert_insert_conflict_with_exit_handler; +connection node_1a; +SET SESSION wsrep_on = 0; +SET SESSION wsrep_on = 1; +SET GLOBAL wsrep_provider_options = 'dbug='; +SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync'; +SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync'; +connection node_1; +Conflict exit handler +Conflict exit handler +SET SESSION wsrep_sync_wait = default; +SELECT * FROM t1; +f1 f2 +1 a +2 c +3 a +wsrep_local_replays +1 +DELETE FROM t1; +connection node_1; +CREATE PROCEDURE proc_insert_insert_conflict_with_continue_handler() +BEGIN +DECLARE CONTINUE HANDLER FOR SQLEXCEPTION SELECT "Conflict continue handler"; +INSERT INTO t1 VALUES (2, 'd'); +INSERT INTO t1 VALUES (4, 'd'); +END| +INSERT INTO t1 VALUES (1, 'a'), (3, 'a'); +SET SESSION wsrep_sync_wait = 0; +connection node_1a; +SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync'; +connection node_2; +INSERT INTO t1 VALUES (2, 'c'); +connection node_1a; +SET SESSION wsrep_on = 0; +SET SESSION wsrep_on = 1; +SET GLOBAL wsrep_provider_options = 'dbug='; +SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync'; +connection node_1; +CALL proc_insert_insert_conflict_with_continue_handler; +connection node_1a; +SET SESSION wsrep_on = 0; +SET SESSION wsrep_on = 1; +SET GLOBAL wsrep_provider_options = 'dbug='; +SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync'; +SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync'; +connection node_1; +Conflict continue handler +Conflict continue handler +SET SESSION wsrep_sync_wait = default; +SELECT * FROM t1; +f1 f2 +1 a +2 c +3 a +4 d +wsrep_local_replays +1 +DELETE FROM t1; +DROP PROCEDURE proc_update_insert; +DROP PROCEDURE proc_update_insert_with_continue_handler; +DROP PROCEDURE proc_update_insert_with_exit_handler; +DROP PROCEDURE proc_update_insert_transaction; +DROP PROCEDURE proc_update_insert_transaction_with_continue_handler; +DROP PROCEDURE proc_update_insert_transaction_with_exit_handler; +DROP PROCEDURE proc_insert_insert_conflict; +DROP PROCEDURE proc_insert_insert_conflict_with_exit_handler; +DROP PROCEDURE proc_insert_insert_conflict_with_continue_handler; +DROP TABLE t1; diff --git a/mysql-test/suite/galera/r/galera_sp_insert_parallel.result b/mysql-test/suite/galera/r/galera_sp_insert_parallel.result new file mode 100644 index 00000000000..3f072be7004 --- /dev/null +++ b/mysql-test/suite/galera/r/galera_sp_insert_parallel.result @@ -0,0 +1,41 @@ +connection node_2; +connection node_1; +CREATE TABLE t1 (f1 INTEGER PRIMARY KEY, f2 INTEGER) ENGINE=InnoDB; +CREATE PROCEDURE proc_insert() +BEGIN +DECLARE i INT; +DECLARE CONTINUE HANDLER FOR SQLEXCEPTION BEGIN END; +SET i = 0; +WHILE i < 1000 DO +INSERT IGNORE INTO t1 (f1, f2) +VALUES (FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)), +(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)), +(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)), +(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)), +(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)), +(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)), +(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)), +(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)), +(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)); +SET i = i + 1; +END WHILE; +END| +connection node_1; +SELECT 0; +0 +0 +SET SESSION wsrep_sync_wait = 0; +CALL proc_insert; +connection node_2; +SELECT 0; +0 +0 +SET SESSION wsrep_sync_wait = 0; +CALL proc_insert; +connection node_1; +SET SESSION wsrep_sync_wait = default; +connection node_2; +SET SESSION wsrep_sync_wait = default; +connection node_1; +DROP PROCEDURE proc_insert; +DROP TABLE t1; diff --git a/mysql-test/suite/galera/t/galera_sp_insert_parallel.test b/mysql-test/suite/galera/t/galera_sp_insert_parallel.test new file mode 100644 index 00000000000..b6878a9c32a --- /dev/null +++ b/mysql-test/suite/galera/t/galera_sp_insert_parallel.test @@ -0,0 +1,55 @@ +--source include/galera_cluster.inc +--source include/have_innodb.inc + +CREATE TABLE t1 (f1 INTEGER PRIMARY KEY, f2 INTEGER) ENGINE=InnoDB; + + +DELIMITER |; +CREATE PROCEDURE proc_insert() +BEGIN + DECLARE i INT; + DECLARE CONTINUE HANDLER FOR SQLEXCEPTION BEGIN END; + + SET i = 0; + WHILE i < 1000 DO + INSERT IGNORE INTO t1 (f1, f2) + VALUES (FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)), + (FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)), + (FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)), + (FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)), + (FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)), + (FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)), + (FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)), + (FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)), + (FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)); + SET i = i + 1; + + END WHILE; +END| +DELIMITER ;| + +--connection node_1 +SELECT 0; +SET SESSION wsrep_sync_wait = 0; +--send CALL proc_insert +--connection node_2 +SELECT 0; +SET SESSION wsrep_sync_wait = 0; +--send CALL proc_insert + +--connection node_1 +--error 0,ER_LOCK_DEADLOCK,ER_QUERY_INTERRUPTED +--disable_warnings +--reap +--enable_warnings +SET SESSION wsrep_sync_wait = default; +--connection node_2 +--error 0,ER_LOCK_DEADLOCK,ER_QUERY_INTERRUPTED +--disable_warnings +--reap +--enable_warnings +SET SESSION wsrep_sync_wait = default; + +--connection node_1 +DROP PROCEDURE proc_insert; +DROP TABLE t1; diff --git a/sql/sp_head.cc b/sql/sp_head.cc index bd4d74c58a8..e98e5fbc27e 100644 --- a/sql/sp_head.cc +++ b/sql/sp_head.cc @@ -3605,32 +3605,45 @@ sp_instr_stmt::exec_core(THD *thd, uint *nextp) 3); int res= mysql_execute_command(thd); #ifdef WITH_WSREP - if ((thd->is_fatal_error || thd->killed_errno()) && - (thd->wsrep_trx().state() == wsrep::transaction::s_executing)) + if (WSREP(thd)) { - /* - SP was killed, and it is not due to a wsrep conflict. - We skip after_statement hook at this point because - otherwise it clears the error, and cleans up the - whole transaction. For now we just return and finish - our handling once we are back to mysql_parse. - */ - WSREP_DEBUG("Skipping after_command hook for killed SP"); - } - else - { - (void) wsrep_after_statement(thd); - /* - Final wsrep error status for statement is known only after - wsrep_after_statement() call. If the error is set, override - error in thd diagnostics area and reset wsrep client_state error - so that the error does not get propagated via client-server protocol. - */ - if (wsrep_current_error(thd)) + if ((thd->is_fatal_error || thd->killed_errno()) && + (thd->wsrep_trx().state() == wsrep::transaction::s_executing)) + { + /* + SP was killed, and it is not due to a wsrep conflict. + We skip after_statement hook at this point because + otherwise it clears the error, and cleans up the + whole transaction. For now we just return and finish + our handling once we are back to mysql_parse. + */ + WSREP_DEBUG("Skipping after_command hook for killed SP"); + } + else { - wsrep_override_error(thd, wsrep_current_error(thd), - wsrep_current_error_status(thd)); - thd->wsrep_cs().reset_error(); + const bool must_replay= wsrep_must_replay(thd); + (void) wsrep_after_statement(thd); + /* + Reset the return code to zero if the transaction was + replayed succesfully. + */ + if (res && must_replay && !wsrep_current_error(thd)) + res= 0; + /* + Final wsrep error status for statement is known only after + wsrep_after_statement() call. If the error is set, override + error in thd diagnostics area and reset wsrep client_state error + so that the error does not get propagated via client-server protocol. + */ + if (wsrep_current_error(thd)) + { + wsrep_override_error(thd, wsrep_current_error(thd), + wsrep_current_error_status(thd)); + thd->wsrep_cs().reset_error(); + /* Reset also thd->killed if it has been set during BF abort. */ + if (thd->killed == KILL_QUERY) + thd->reset_killed(); + } } } #endif /* WITH_WSREP */ diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 3567e790f78..c0c89ee59b3 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -1020,6 +1020,15 @@ Sql_condition* THD::raise_condition(uint sql_errno, if (!(variables.option_bits & OPTION_SQL_NOTES) && (level == Sql_condition::WARN_LEVEL_NOTE)) DBUG_RETURN(NULL); +#ifdef WITH_WSREP + /* + Suppress warnings/errors if the wsrep THD is going to replay. The + deadlock/interrupted errors may be transitient and should not be + reported to the client. + */ + if (wsrep_must_replay(this)) + DBUG_RETURN(NULL); +#endif /* WITH_WSREP */ da->opt_clear_warning_info(query_id); diff --git a/sql/wsrep_client_service.cc b/sql/wsrep_client_service.cc index c042a1ea051..b182691c593 100644 --- a/sql/wsrep_client_service.cc +++ b/sql/wsrep_client_service.cc @@ -250,20 +250,38 @@ void Wsrep_client_service::will_replay() enum wsrep::provider::status Wsrep_client_service::replay() { + DBUG_ASSERT(m_thd == current_thd); - Wsrep_replayer_service replayer_service(m_thd); - wsrep::provider& provider(m_thd->wsrep_cs().provider()); - mysql_mutex_lock(&m_thd->LOCK_thd_data); - m_thd->killed= NOT_KILLED; - mysql_mutex_unlock(&m_thd->LOCK_thd_data); - enum wsrep::provider::status ret= - provider.replay(m_thd->wsrep_trx().ws_handle(), &replayer_service); - replayer_service.replay_status(ret); + DBUG_ENTER("Wsrep_client_service::replay"); + + /* + Allocate separate THD for replaying to avoid tampering + original THD state during replication event applying. + */ + THD *replayer_thd= new THD(true, true); + replayer_thd->thread_stack= m_thd->thread_stack; + replayer_thd->real_id= pthread_self(); + replayer_thd->prior_thr_create_utime= + replayer_thd->start_utime= microsecond_interval_timer(); + replayer_thd->set_command(COM_SLEEP); + replayer_thd->reset_for_next_command(true); + + enum wsrep::provider::status ret; + { + Wsrep_replayer_service replayer_service(replayer_thd, m_thd); + wsrep::provider& provider(replayer_thd->wsrep_cs().provider()); + ret= provider.replay(replayer_thd->wsrep_trx().ws_handle(), + &replayer_service); + replayer_service.replay_status(ret); + } + + delete replayer_thd; + mysql_mutex_lock(&LOCK_wsrep_replaying); --wsrep_replaying; mysql_cond_broadcast(&COND_wsrep_replaying); mysql_mutex_unlock(&LOCK_wsrep_replaying); - return ret; + DBUG_RETURN(ret); } void Wsrep_client_service::wait_for_replayers(wsrep::unique_lock<wsrep::mutex>& lock) diff --git a/sql/wsrep_high_priority_service.cc b/sql/wsrep_high_priority_service.cc index bdacdc3b055..afb4ca3d3b7 100644 --- a/sql/wsrep_high_priority_service.cc +++ b/sql/wsrep_high_priority_service.cc @@ -519,83 +519,88 @@ bool Wsrep_applier_service::check_exit_status() const Replayer service *****************************************************************************/ -Wsrep_replayer_service::Wsrep_replayer_service(THD* thd) - : Wsrep_high_priority_service(thd) +Wsrep_replayer_service::Wsrep_replayer_service(THD* replayer_thd, THD* orig_thd) + : Wsrep_high_priority_service(replayer_thd) + , m_orig_thd(orig_thd) , m_da_shadow() , m_replay_status() { /* Response must not have been sent to client */ - DBUG_ASSERT(!thd->get_stmt_da()->is_sent()); + DBUG_ASSERT(!orig_thd->get_stmt_da()->is_sent()); /* PS reprepare observer should have been removed already open_table() will fail if we have dangling observer here */ - DBUG_ASSERT(!thd->m_reprepare_observer); + DBUG_ASSERT(!orig_thd->m_reprepare_observer); /* Replaying should happen always from after_statement() hook after rollback, which should guarantee that there are no transactional locks */ - DBUG_ASSERT(!thd->mdl_context.has_transactional_locks()); + DBUG_ASSERT(!orig_thd->mdl_context.has_transactional_locks()); /* Make a shadow copy of diagnostics area and reset */ - m_da_shadow.status= thd->get_stmt_da()->status(); + m_da_shadow.status= orig_thd->get_stmt_da()->status(); if (m_da_shadow.status == Diagnostics_area::DA_OK) { - m_da_shadow.affected_rows= thd->get_stmt_da()->affected_rows(); - m_da_shadow.last_insert_id= thd->get_stmt_da()->last_insert_id(); - strmake(m_da_shadow.message, thd->get_stmt_da()->message(), + m_da_shadow.affected_rows= orig_thd->get_stmt_da()->affected_rows(); + m_da_shadow.last_insert_id= orig_thd->get_stmt_da()->last_insert_id(); + strmake(m_da_shadow.message, orig_thd->get_stmt_da()->message(), sizeof(m_da_shadow.message) - 1); } - thd->get_stmt_da()->reset_diagnostics_area(); + orig_thd->get_stmt_da()->reset_diagnostics_area(); /* Release explicit locks */ - if (thd->locked_tables_mode && thd->lock) + if (orig_thd->locked_tables_mode && orig_thd->lock) { WSREP_WARN("releasing table lock for replaying (%llu)", - thd->thread_id); - thd->locked_tables_list.unlock_locked_tables(thd); - thd->variables.option_bits&= ~(OPTION_TABLE_LOCK); + orig_thd->thread_id); + orig_thd->locked_tables_list.unlock_locked_tables(orig_thd); + orig_thd->variables.option_bits&= ~(OPTION_TABLE_LOCK); } + thd_proc_info(orig_thd, "wsrep replaying trx"); + /* - Replaying will call MYSQL_START_STATEMENT when handling - BEGIN Query_log_event so end statement must be called before - replaying. + Swith execution context to replayer_thd and prepare it for + replay execution. */ - MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da()); - thd->m_statement_psi= NULL; - thd->m_digest= NULL; - thd_proc_info(thd, "wsrep replaying trx"); + orig_thd->reset_globals(); + replayer_thd->store_globals(); + wsrep_open(replayer_thd); + wsrep_before_command(replayer_thd); + replayer_thd->wsrep_cs().clone_transaction_for_replay(orig_thd->wsrep_trx()); } Wsrep_replayer_service::~Wsrep_replayer_service() { - THD* thd= m_thd; - DBUG_ASSERT(!thd->get_stmt_da()->is_sent()); - DBUG_ASSERT(!thd->get_stmt_da()->is_set()); + THD* replayer_thd= m_thd; + THD* orig_thd= m_orig_thd; + + /* Store replay result/state to original thread wsrep client + state and switch execution context back to original. */ + orig_thd->wsrep_cs().after_replay(replayer_thd->wsrep_trx()); + wsrep_after_apply(replayer_thd); + wsrep_after_command_ignore_result(replayer_thd); + wsrep_close(replayer_thd); + replayer_thd->reset_globals(); + orig_thd->store_globals(); + + DBUG_ASSERT(!orig_thd->get_stmt_da()->is_sent()); + DBUG_ASSERT(!orig_thd->get_stmt_da()->is_set()); + if (m_replay_status == wsrep::provider::success) { - DBUG_ASSERT(thd->wsrep_cs().current_error() == wsrep::e_success); - thd->killed= NOT_KILLED; - if (m_da_shadow.status == Diagnostics_area::DA_OK) - { - my_ok(thd, - m_da_shadow.affected_rows, - m_da_shadow.last_insert_id, - m_da_shadow.message); - } - else - { - my_ok(thd); - } + DBUG_ASSERT(replayer_thd->wsrep_cs().current_error() == wsrep::e_success); + orig_thd->killed= NOT_KILLED; + my_ok(orig_thd, m_da_shadow.affected_rows, m_da_shadow.last_insert_id); } else if (m_replay_status == wsrep::provider::error_certification_failed) { - wsrep_override_error(thd, ER_LOCK_DEADLOCK); + wsrep_override_error(orig_thd, ER_LOCK_DEADLOCK); } else { DBUG_ASSERT(0); WSREP_ERROR("trx_replay failed for: %d, schema: %s, query: %s", m_replay_status, - thd->db.str, WSREP_QUERY(thd)); + orig_thd->db.str, WSREP_QUERY(orig_thd)); unireg_abort(1); } } diff --git a/sql/wsrep_high_priority_service.h b/sql/wsrep_high_priority_service.h index c483aa82d62..34fa1669b71 100644 --- a/sql/wsrep_high_priority_service.h +++ b/sql/wsrep_high_priority_service.h @@ -87,7 +87,7 @@ public: class Wsrep_replayer_service : public Wsrep_high_priority_service { public: - Wsrep_replayer_service(THD*); + Wsrep_replayer_service(THD* replayer_thd, THD* orig_thd); ~Wsrep_replayer_service(); int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&); void after_apply() { } @@ -99,6 +99,7 @@ public: /* Replayer should never be forced to exit */ bool check_exit_status() const { return false; } private: + THD* m_orig_thd; struct da_shadow { enum Diagnostics_area::enum_diagnostics_status status; diff --git a/sql/wsrep_trans_observer.h b/sql/wsrep_trans_observer.h index 467b33ea206..e6901f15ca7 100644 --- a/sql/wsrep_trans_observer.h +++ b/sql/wsrep_trans_observer.h @@ -56,6 +56,13 @@ static inline bool wsrep_must_abort(THD* thd) } /* + Return true if the transaction must be replayed. + */ +static inline bool wsrep_must_replay(THD* thd) +{ + return (thd->wsrep_trx().state() == wsrep::transaction::s_must_replay); +} +/* Return true if transaction has not been committed. Note that we don't require thd->LOCK_thd_data here. Calling this method diff --git a/wsrep-lib b/wsrep-lib -Subproject ae746fb28957140fb996a4aaf994baea58bd528 +Subproject e9dafb73734d71ab55078b34748e54f139aec82 |