summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mysql-test/suite/galera/r/galera_sp_bf_abort.result356
-rw-r--r--mysql-test/suite/galera/r/galera_sp_insert_parallel.result41
-rw-r--r--mysql-test/suite/galera/t/galera_sp_insert_parallel.test55
-rw-r--r--sql/sp_head.cc61
-rw-r--r--sql/sql_class.cc9
-rw-r--r--sql/wsrep_client_service.cc36
-rw-r--r--sql/wsrep_high_priority_service.cc83
-rw-r--r--sql/wsrep_high_priority_service.h3
-rw-r--r--sql/wsrep_trans_observer.h7
m---------wsrep-lib0
10 files changed, 578 insertions, 73 deletions
diff --git a/mysql-test/suite/galera/r/galera_sp_bf_abort.result b/mysql-test/suite/galera/r/galera_sp_bf_abort.result
new file mode 100644
index 00000000000..9216cc4fa5a
--- /dev/null
+++ b/mysql-test/suite/galera/r/galera_sp_bf_abort.result
@@ -0,0 +1,356 @@
+connection node_2;
+connection node_1;
+connection node_1;
+CREATE TABLE t1 (f1 INTEGER PRIMARY KEY, f2 CHAR(1));
+connect node_1a, 127.0.0.1, root, , test, $NODE_MYPORT_1;
+connection node_1a;
+SET SESSION wsrep_sync_wait = 0;
+connection node_1;
+CREATE PROCEDURE proc_update_insert()
+BEGIN
+UPDATE t1 SET f2 = 'b';
+INSERT INTO t1 VALUES (4, 'd');
+END|
+INSERT INTO t1 VALUES (1, 'a'), (3, 'a');
+SET SESSION wsrep_sync_wait = 0;
+connection node_1a;
+SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync';
+connection node_2;
+INSERT INTO t1 VALUES (2, 'c');
+connection node_1a;
+SET SESSION wsrep_on = 0;
+SET SESSION wsrep_on = 1;
+SET GLOBAL wsrep_provider_options = 'dbug=';
+SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync';
+connection node_1;
+CALL proc_update_insert;
+connection node_1a;
+SET SESSION wsrep_on = 0;
+SET SESSION wsrep_on = 1;
+SET GLOBAL wsrep_provider_options = 'dbug=';
+SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync';
+SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync';
+connection node_1;
+SET SESSION wsrep_sync_wait = default;
+SELECT * FROM t1;
+f1 f2
+1 b
+2 c
+3 b
+4 d
+wsrep_local_replays
+1
+DELETE FROM t1;
+connection node_1;
+CREATE PROCEDURE proc_update_insert_with_exit_handler()
+BEGIN
+DECLARE EXIT HANDLER FOR SQLEXCEPTION BEGIN END;
+UPDATE t1 SET f2 = 'b';
+INSERT INTO t1 VALUES (4, 'd');
+END|
+INSERT INTO t1 VALUES (1, 'a'), (3, 'a');
+SET SESSION wsrep_sync_wait = 0;
+connection node_1a;
+SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync';
+connection node_2;
+INSERT INTO t1 VALUES (2, 'c');
+connection node_1a;
+SET SESSION wsrep_on = 0;
+SET SESSION wsrep_on = 1;
+SET GLOBAL wsrep_provider_options = 'dbug=';
+SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync';
+connection node_1;
+CALL proc_update_insert_with_exit_handler;
+connection node_1a;
+SET SESSION wsrep_on = 0;
+SET SESSION wsrep_on = 1;
+SET GLOBAL wsrep_provider_options = 'dbug=';
+SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync';
+SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync';
+connection node_1;
+SET SESSION wsrep_sync_wait = default;
+SELECT * FROM t1;
+f1 f2
+1 b
+2 c
+3 b
+4 d
+wsrep_local_replays
+1
+DELETE FROM t1;
+connection node_1;
+CREATE PROCEDURE proc_update_insert_with_continue_handler()
+BEGIN
+DECLARE CONTINUE HANDLER FOR SQLEXCEPTION BEGIN END;
+UPDATE t1 SET f2 = 'b';
+INSERT INTO t1 VALUES (4, 'd');
+END|
+INSERT INTO t1 VALUES (1, 'a'), (3, 'a');
+SET SESSION wsrep_sync_wait = 0;
+connection node_1a;
+SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync';
+connection node_2;
+INSERT INTO t1 VALUES (2, 'c');
+connection node_1a;
+SET SESSION wsrep_on = 0;
+SET SESSION wsrep_on = 1;
+SET GLOBAL wsrep_provider_options = 'dbug=';
+SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync';
+connection node_1;
+CALL proc_update_insert_with_continue_handler;
+connection node_1a;
+SET SESSION wsrep_on = 0;
+SET SESSION wsrep_on = 1;
+SET GLOBAL wsrep_provider_options = 'dbug=';
+SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync';
+SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync';
+connection node_1;
+SET SESSION wsrep_sync_wait = default;
+SELECT * FROM t1;
+f1 f2
+1 b
+2 c
+3 b
+4 d
+wsrep_local_replays
+1
+DELETE FROM t1;
+connection node_1;
+CREATE PROCEDURE proc_update_insert_transaction()
+BEGIN
+START TRANSACTION;
+UPDATE t1 SET f2 = 'b';
+INSERT INTO t1 VALUES (4, 'd');
+COMMIT;
+END|
+INSERT INTO t1 VALUES (1, 'a'), (3, 'a');
+SET SESSION wsrep_sync_wait = 0;
+connection node_1a;
+SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync';
+connection node_2;
+INSERT INTO t1 VALUES (2, 'c');
+connection node_1a;
+SET SESSION wsrep_on = 0;
+SET SESSION wsrep_on = 1;
+SET GLOBAL wsrep_provider_options = 'dbug=';
+SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync';
+connection node_1;
+CALL proc_update_insert_transaction;
+connection node_1a;
+SET SESSION wsrep_on = 0;
+SET SESSION wsrep_on = 1;
+SET GLOBAL wsrep_provider_options = 'dbug=';
+SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync';
+SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync';
+connection node_1;
+SET SESSION wsrep_sync_wait = default;
+SELECT * FROM t1;
+f1 f2
+1 b
+2 c
+3 b
+4 d
+wsrep_local_replays
+1
+DELETE FROM t1;
+connection node_1;
+CREATE PROCEDURE proc_update_insert_transaction_with_continue_handler()
+BEGIN
+DECLARE CONTINUE HANDLER FOR SQLEXCEPTION BEGIN END;
+START TRANSACTION;
+UPDATE t1 SET f2 = 'b';
+INSERT INTO t1 VALUES (4, 'd');
+COMMIT;
+END|
+INSERT INTO t1 VALUES (1, 'a'), (3, 'a');
+SET SESSION wsrep_sync_wait = 0;
+connection node_1a;
+SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync';
+connection node_2;
+INSERT INTO t1 VALUES (2, 'c');
+connection node_1a;
+SET SESSION wsrep_on = 0;
+SET SESSION wsrep_on = 1;
+SET GLOBAL wsrep_provider_options = 'dbug=';
+SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync';
+connection node_1;
+CALL proc_update_insert_transaction_with_continue_handler;
+connection node_1a;
+SET SESSION wsrep_on = 0;
+SET SESSION wsrep_on = 1;
+SET GLOBAL wsrep_provider_options = 'dbug=';
+SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync';
+SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync';
+connection node_1;
+SET SESSION wsrep_sync_wait = default;
+SELECT * FROM t1;
+f1 f2
+1 b
+2 c
+3 b
+4 d
+wsrep_local_replays
+1
+DELETE FROM t1;
+connection node_1;
+CREATE PROCEDURE proc_update_insert_transaction_with_exit_handler()
+BEGIN
+DECLARE EXIT HANDLER FOR SQLEXCEPTION BEGIN END;
+START TRANSACTION;
+UPDATE t1 SET f2 = 'b';
+INSERT INTO t1 VALUES (4, 'd');
+COMMIT;
+END|
+INSERT INTO t1 VALUES (1, 'a'), (3, 'a');
+SET SESSION wsrep_sync_wait = 0;
+connection node_1a;
+SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync';
+connection node_2;
+INSERT INTO t1 VALUES (2, 'c');
+connection node_1a;
+SET SESSION wsrep_on = 0;
+SET SESSION wsrep_on = 1;
+SET GLOBAL wsrep_provider_options = 'dbug=';
+SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync';
+connection node_1;
+CALL proc_update_insert_transaction_with_exit_handler;
+connection node_1a;
+SET SESSION wsrep_on = 0;
+SET SESSION wsrep_on = 1;
+SET GLOBAL wsrep_provider_options = 'dbug=';
+SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync';
+SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync';
+connection node_1;
+SET SESSION wsrep_sync_wait = default;
+SELECT * FROM t1;
+f1 f2
+1 b
+2 c
+3 b
+4 d
+wsrep_local_replays
+1
+DELETE FROM t1;
+connection node_1;
+CREATE PROCEDURE proc_insert_insert_conflict()
+BEGIN
+INSERT INTO t1 VALUES (2, 'd');
+INSERT INTO t1 VALUES (4, 'd');
+END|
+INSERT INTO t1 VALUES (1, 'a'), (3, 'a');
+SET SESSION wsrep_sync_wait = 0;
+connection node_1a;
+SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync';
+connection node_2;
+INSERT INTO t1 VALUES (2, 'c');
+connection node_1a;
+SET SESSION wsrep_on = 0;
+SET SESSION wsrep_on = 1;
+SET GLOBAL wsrep_provider_options = 'dbug=';
+SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync';
+connection node_1;
+CALL proc_insert_insert_conflict;
+connection node_1a;
+SET SESSION wsrep_on = 0;
+SET SESSION wsrep_on = 1;
+SET GLOBAL wsrep_provider_options = 'dbug=';
+SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync';
+SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync';
+connection node_1;
+Got one of the listed errors
+SET SESSION wsrep_sync_wait = default;
+SELECT * FROM t1;
+f1 f2
+1 a
+2 c
+3 a
+wsrep_local_replays
+1
+DELETE FROM t1;
+connection node_1;
+CREATE PROCEDURE proc_insert_insert_conflict_with_exit_handler()
+BEGIN
+DECLARE EXIT HANDLER FOR SQLEXCEPTION SELECT "Conflict exit handler";
+INSERT INTO t1 VALUES (2, 'd');
+INSERT INTO t1 VALUES (4, 'd');
+END|
+INSERT INTO t1 VALUES (1, 'a'), (3, 'a');
+SET SESSION wsrep_sync_wait = 0;
+connection node_1a;
+SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync';
+connection node_2;
+INSERT INTO t1 VALUES (2, 'c');
+connection node_1a;
+SET SESSION wsrep_on = 0;
+SET SESSION wsrep_on = 1;
+SET GLOBAL wsrep_provider_options = 'dbug=';
+SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync';
+connection node_1;
+CALL proc_insert_insert_conflict_with_exit_handler;
+connection node_1a;
+SET SESSION wsrep_on = 0;
+SET SESSION wsrep_on = 1;
+SET GLOBAL wsrep_provider_options = 'dbug=';
+SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync';
+SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync';
+connection node_1;
+Conflict exit handler
+Conflict exit handler
+SET SESSION wsrep_sync_wait = default;
+SELECT * FROM t1;
+f1 f2
+1 a
+2 c
+3 a
+wsrep_local_replays
+1
+DELETE FROM t1;
+connection node_1;
+CREATE PROCEDURE proc_insert_insert_conflict_with_continue_handler()
+BEGIN
+DECLARE CONTINUE HANDLER FOR SQLEXCEPTION SELECT "Conflict continue handler";
+INSERT INTO t1 VALUES (2, 'd');
+INSERT INTO t1 VALUES (4, 'd');
+END|
+INSERT INTO t1 VALUES (1, 'a'), (3, 'a');
+SET SESSION wsrep_sync_wait = 0;
+connection node_1a;
+SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync';
+connection node_2;
+INSERT INTO t1 VALUES (2, 'c');
+connection node_1a;
+SET SESSION wsrep_on = 0;
+SET SESSION wsrep_on = 1;
+SET GLOBAL wsrep_provider_options = 'dbug=';
+SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync';
+connection node_1;
+CALL proc_insert_insert_conflict_with_continue_handler;
+connection node_1a;
+SET SESSION wsrep_on = 0;
+SET SESSION wsrep_on = 1;
+SET GLOBAL wsrep_provider_options = 'dbug=';
+SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync';
+SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync';
+connection node_1;
+Conflict continue handler
+Conflict continue handler
+SET SESSION wsrep_sync_wait = default;
+SELECT * FROM t1;
+f1 f2
+1 a
+2 c
+3 a
+4 d
+wsrep_local_replays
+1
+DELETE FROM t1;
+DROP PROCEDURE proc_update_insert;
+DROP PROCEDURE proc_update_insert_with_continue_handler;
+DROP PROCEDURE proc_update_insert_with_exit_handler;
+DROP PROCEDURE proc_update_insert_transaction;
+DROP PROCEDURE proc_update_insert_transaction_with_continue_handler;
+DROP PROCEDURE proc_update_insert_transaction_with_exit_handler;
+DROP PROCEDURE proc_insert_insert_conflict;
+DROP PROCEDURE proc_insert_insert_conflict_with_exit_handler;
+DROP PROCEDURE proc_insert_insert_conflict_with_continue_handler;
+DROP TABLE t1;
diff --git a/mysql-test/suite/galera/r/galera_sp_insert_parallel.result b/mysql-test/suite/galera/r/galera_sp_insert_parallel.result
new file mode 100644
index 00000000000..3f072be7004
--- /dev/null
+++ b/mysql-test/suite/galera/r/galera_sp_insert_parallel.result
@@ -0,0 +1,41 @@
+connection node_2;
+connection node_1;
+CREATE TABLE t1 (f1 INTEGER PRIMARY KEY, f2 INTEGER) ENGINE=InnoDB;
+CREATE PROCEDURE proc_insert()
+BEGIN
+DECLARE i INT;
+DECLARE CONTINUE HANDLER FOR SQLEXCEPTION BEGIN END;
+SET i = 0;
+WHILE i < 1000 DO
+INSERT IGNORE INTO t1 (f1, f2)
+VALUES (FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
+(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
+(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
+(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
+(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
+(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
+(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
+(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
+(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15));
+SET i = i + 1;
+END WHILE;
+END|
+connection node_1;
+SELECT 0;
+0
+0
+SET SESSION wsrep_sync_wait = 0;
+CALL proc_insert;
+connection node_2;
+SELECT 0;
+0
+0
+SET SESSION wsrep_sync_wait = 0;
+CALL proc_insert;
+connection node_1;
+SET SESSION wsrep_sync_wait = default;
+connection node_2;
+SET SESSION wsrep_sync_wait = default;
+connection node_1;
+DROP PROCEDURE proc_insert;
+DROP TABLE t1;
diff --git a/mysql-test/suite/galera/t/galera_sp_insert_parallel.test b/mysql-test/suite/galera/t/galera_sp_insert_parallel.test
new file mode 100644
index 00000000000..b6878a9c32a
--- /dev/null
+++ b/mysql-test/suite/galera/t/galera_sp_insert_parallel.test
@@ -0,0 +1,55 @@
+--source include/galera_cluster.inc
+--source include/have_innodb.inc
+
+CREATE TABLE t1 (f1 INTEGER PRIMARY KEY, f2 INTEGER) ENGINE=InnoDB;
+
+
+DELIMITER |;
+CREATE PROCEDURE proc_insert()
+BEGIN
+ DECLARE i INT;
+ DECLARE CONTINUE HANDLER FOR SQLEXCEPTION BEGIN END;
+
+ SET i = 0;
+ WHILE i < 1000 DO
+ INSERT IGNORE INTO t1 (f1, f2)
+ VALUES (FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
+ (FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
+ (FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
+ (FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
+ (FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
+ (FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
+ (FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
+ (FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
+ (FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15));
+ SET i = i + 1;
+
+ END WHILE;
+END|
+DELIMITER ;|
+
+--connection node_1
+SELECT 0;
+SET SESSION wsrep_sync_wait = 0;
+--send CALL proc_insert
+--connection node_2
+SELECT 0;
+SET SESSION wsrep_sync_wait = 0;
+--send CALL proc_insert
+
+--connection node_1
+--error 0,ER_LOCK_DEADLOCK,ER_QUERY_INTERRUPTED
+--disable_warnings
+--reap
+--enable_warnings
+SET SESSION wsrep_sync_wait = default;
+--connection node_2
+--error 0,ER_LOCK_DEADLOCK,ER_QUERY_INTERRUPTED
+--disable_warnings
+--reap
+--enable_warnings
+SET SESSION wsrep_sync_wait = default;
+
+--connection node_1
+DROP PROCEDURE proc_insert;
+DROP TABLE t1;
diff --git a/sql/sp_head.cc b/sql/sp_head.cc
index bd4d74c58a8..e98e5fbc27e 100644
--- a/sql/sp_head.cc
+++ b/sql/sp_head.cc
@@ -3605,32 +3605,45 @@ sp_instr_stmt::exec_core(THD *thd, uint *nextp)
3);
int res= mysql_execute_command(thd);
#ifdef WITH_WSREP
- if ((thd->is_fatal_error || thd->killed_errno()) &&
- (thd->wsrep_trx().state() == wsrep::transaction::s_executing))
+ if (WSREP(thd))
{
- /*
- SP was killed, and it is not due to a wsrep conflict.
- We skip after_statement hook at this point because
- otherwise it clears the error, and cleans up the
- whole transaction. For now we just return and finish
- our handling once we are back to mysql_parse.
- */
- WSREP_DEBUG("Skipping after_command hook for killed SP");
- }
- else
- {
- (void) wsrep_after_statement(thd);
- /*
- Final wsrep error status for statement is known only after
- wsrep_after_statement() call. If the error is set, override
- error in thd diagnostics area and reset wsrep client_state error
- so that the error does not get propagated via client-server protocol.
- */
- if (wsrep_current_error(thd))
+ if ((thd->is_fatal_error || thd->killed_errno()) &&
+ (thd->wsrep_trx().state() == wsrep::transaction::s_executing))
+ {
+ /*
+ SP was killed, and it is not due to a wsrep conflict.
+ We skip after_statement hook at this point because
+ otherwise it clears the error, and cleans up the
+ whole transaction. For now we just return and finish
+ our handling once we are back to mysql_parse.
+ */
+ WSREP_DEBUG("Skipping after_command hook for killed SP");
+ }
+ else
{
- wsrep_override_error(thd, wsrep_current_error(thd),
- wsrep_current_error_status(thd));
- thd->wsrep_cs().reset_error();
+ const bool must_replay= wsrep_must_replay(thd);
+ (void) wsrep_after_statement(thd);
+ /*
+ Reset the return code to zero if the transaction was
+ replayed succesfully.
+ */
+ if (res && must_replay && !wsrep_current_error(thd))
+ res= 0;
+ /*
+ Final wsrep error status for statement is known only after
+ wsrep_after_statement() call. If the error is set, override
+ error in thd diagnostics area and reset wsrep client_state error
+ so that the error does not get propagated via client-server protocol.
+ */
+ if (wsrep_current_error(thd))
+ {
+ wsrep_override_error(thd, wsrep_current_error(thd),
+ wsrep_current_error_status(thd));
+ thd->wsrep_cs().reset_error();
+ /* Reset also thd->killed if it has been set during BF abort. */
+ if (thd->killed == KILL_QUERY)
+ thd->reset_killed();
+ }
}
}
#endif /* WITH_WSREP */
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 3567e790f78..c0c89ee59b3 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -1020,6 +1020,15 @@ Sql_condition* THD::raise_condition(uint sql_errno,
if (!(variables.option_bits & OPTION_SQL_NOTES) &&
(level == Sql_condition::WARN_LEVEL_NOTE))
DBUG_RETURN(NULL);
+#ifdef WITH_WSREP
+ /*
+ Suppress warnings/errors if the wsrep THD is going to replay. The
+ deadlock/interrupted errors may be transitient and should not be
+ reported to the client.
+ */
+ if (wsrep_must_replay(this))
+ DBUG_RETURN(NULL);
+#endif /* WITH_WSREP */
da->opt_clear_warning_info(query_id);
diff --git a/sql/wsrep_client_service.cc b/sql/wsrep_client_service.cc
index c042a1ea051..b182691c593 100644
--- a/sql/wsrep_client_service.cc
+++ b/sql/wsrep_client_service.cc
@@ -250,20 +250,38 @@ void Wsrep_client_service::will_replay()
enum wsrep::provider::status Wsrep_client_service::replay()
{
+
DBUG_ASSERT(m_thd == current_thd);
- Wsrep_replayer_service replayer_service(m_thd);
- wsrep::provider& provider(m_thd->wsrep_cs().provider());
- mysql_mutex_lock(&m_thd->LOCK_thd_data);
- m_thd->killed= NOT_KILLED;
- mysql_mutex_unlock(&m_thd->LOCK_thd_data);
- enum wsrep::provider::status ret=
- provider.replay(m_thd->wsrep_trx().ws_handle(), &replayer_service);
- replayer_service.replay_status(ret);
+ DBUG_ENTER("Wsrep_client_service::replay");
+
+ /*
+ Allocate separate THD for replaying to avoid tampering
+ original THD state during replication event applying.
+ */
+ THD *replayer_thd= new THD(true, true);
+ replayer_thd->thread_stack= m_thd->thread_stack;
+ replayer_thd->real_id= pthread_self();
+ replayer_thd->prior_thr_create_utime=
+ replayer_thd->start_utime= microsecond_interval_timer();
+ replayer_thd->set_command(COM_SLEEP);
+ replayer_thd->reset_for_next_command(true);
+
+ enum wsrep::provider::status ret;
+ {
+ Wsrep_replayer_service replayer_service(replayer_thd, m_thd);
+ wsrep::provider& provider(replayer_thd->wsrep_cs().provider());
+ ret= provider.replay(replayer_thd->wsrep_trx().ws_handle(),
+ &replayer_service);
+ replayer_service.replay_status(ret);
+ }
+
+ delete replayer_thd;
+
mysql_mutex_lock(&LOCK_wsrep_replaying);
--wsrep_replaying;
mysql_cond_broadcast(&COND_wsrep_replaying);
mysql_mutex_unlock(&LOCK_wsrep_replaying);
- return ret;
+ DBUG_RETURN(ret);
}
void Wsrep_client_service::wait_for_replayers(wsrep::unique_lock<wsrep::mutex>& lock)
diff --git a/sql/wsrep_high_priority_service.cc b/sql/wsrep_high_priority_service.cc
index bdacdc3b055..afb4ca3d3b7 100644
--- a/sql/wsrep_high_priority_service.cc
+++ b/sql/wsrep_high_priority_service.cc
@@ -519,83 +519,88 @@ bool Wsrep_applier_service::check_exit_status() const
Replayer service
*****************************************************************************/
-Wsrep_replayer_service::Wsrep_replayer_service(THD* thd)
- : Wsrep_high_priority_service(thd)
+Wsrep_replayer_service::Wsrep_replayer_service(THD* replayer_thd, THD* orig_thd)
+ : Wsrep_high_priority_service(replayer_thd)
+ , m_orig_thd(orig_thd)
, m_da_shadow()
, m_replay_status()
{
/* Response must not have been sent to client */
- DBUG_ASSERT(!thd->get_stmt_da()->is_sent());
+ DBUG_ASSERT(!orig_thd->get_stmt_da()->is_sent());
/* PS reprepare observer should have been removed already
open_table() will fail if we have dangling observer here */
- DBUG_ASSERT(!thd->m_reprepare_observer);
+ DBUG_ASSERT(!orig_thd->m_reprepare_observer);
/* Replaying should happen always from after_statement() hook
after rollback, which should guarantee that there are no
transactional locks */
- DBUG_ASSERT(!thd->mdl_context.has_transactional_locks());
+ DBUG_ASSERT(!orig_thd->mdl_context.has_transactional_locks());
/* Make a shadow copy of diagnostics area and reset */
- m_da_shadow.status= thd->get_stmt_da()->status();
+ m_da_shadow.status= orig_thd->get_stmt_da()->status();
if (m_da_shadow.status == Diagnostics_area::DA_OK)
{
- m_da_shadow.affected_rows= thd->get_stmt_da()->affected_rows();
- m_da_shadow.last_insert_id= thd->get_stmt_da()->last_insert_id();
- strmake(m_da_shadow.message, thd->get_stmt_da()->message(),
+ m_da_shadow.affected_rows= orig_thd->get_stmt_da()->affected_rows();
+ m_da_shadow.last_insert_id= orig_thd->get_stmt_da()->last_insert_id();
+ strmake(m_da_shadow.message, orig_thd->get_stmt_da()->message(),
sizeof(m_da_shadow.message) - 1);
}
- thd->get_stmt_da()->reset_diagnostics_area();
+ orig_thd->get_stmt_da()->reset_diagnostics_area();
/* Release explicit locks */
- if (thd->locked_tables_mode && thd->lock)
+ if (orig_thd->locked_tables_mode && orig_thd->lock)
{
WSREP_WARN("releasing table lock for replaying (%llu)",
- thd->thread_id);
- thd->locked_tables_list.unlock_locked_tables(thd);
- thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
+ orig_thd->thread_id);
+ orig_thd->locked_tables_list.unlock_locked_tables(orig_thd);
+ orig_thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
}
+ thd_proc_info(orig_thd, "wsrep replaying trx");
+
/*
- Replaying will call MYSQL_START_STATEMENT when handling
- BEGIN Query_log_event so end statement must be called before
- replaying.
+ Swith execution context to replayer_thd and prepare it for
+ replay execution.
*/
- MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da());
- thd->m_statement_psi= NULL;
- thd->m_digest= NULL;
- thd_proc_info(thd, "wsrep replaying trx");
+ orig_thd->reset_globals();
+ replayer_thd->store_globals();
+ wsrep_open(replayer_thd);
+ wsrep_before_command(replayer_thd);
+ replayer_thd->wsrep_cs().clone_transaction_for_replay(orig_thd->wsrep_trx());
}
Wsrep_replayer_service::~Wsrep_replayer_service()
{
- THD* thd= m_thd;
- DBUG_ASSERT(!thd->get_stmt_da()->is_sent());
- DBUG_ASSERT(!thd->get_stmt_da()->is_set());
+ THD* replayer_thd= m_thd;
+ THD* orig_thd= m_orig_thd;
+
+ /* Store replay result/state to original thread wsrep client
+ state and switch execution context back to original. */
+ orig_thd->wsrep_cs().after_replay(replayer_thd->wsrep_trx());
+ wsrep_after_apply(replayer_thd);
+ wsrep_after_command_ignore_result(replayer_thd);
+ wsrep_close(replayer_thd);
+ replayer_thd->reset_globals();
+ orig_thd->store_globals();
+
+ DBUG_ASSERT(!orig_thd->get_stmt_da()->is_sent());
+ DBUG_ASSERT(!orig_thd->get_stmt_da()->is_set());
+
if (m_replay_status == wsrep::provider::success)
{
- DBUG_ASSERT(thd->wsrep_cs().current_error() == wsrep::e_success);
- thd->killed= NOT_KILLED;
- if (m_da_shadow.status == Diagnostics_area::DA_OK)
- {
- my_ok(thd,
- m_da_shadow.affected_rows,
- m_da_shadow.last_insert_id,
- m_da_shadow.message);
- }
- else
- {
- my_ok(thd);
- }
+ DBUG_ASSERT(replayer_thd->wsrep_cs().current_error() == wsrep::e_success);
+ orig_thd->killed= NOT_KILLED;
+ my_ok(orig_thd, m_da_shadow.affected_rows, m_da_shadow.last_insert_id);
}
else if (m_replay_status == wsrep::provider::error_certification_failed)
{
- wsrep_override_error(thd, ER_LOCK_DEADLOCK);
+ wsrep_override_error(orig_thd, ER_LOCK_DEADLOCK);
}
else
{
DBUG_ASSERT(0);
WSREP_ERROR("trx_replay failed for: %d, schema: %s, query: %s",
m_replay_status,
- thd->db.str, WSREP_QUERY(thd));
+ orig_thd->db.str, WSREP_QUERY(orig_thd));
unireg_abort(1);
}
}
diff --git a/sql/wsrep_high_priority_service.h b/sql/wsrep_high_priority_service.h
index c483aa82d62..34fa1669b71 100644
--- a/sql/wsrep_high_priority_service.h
+++ b/sql/wsrep_high_priority_service.h
@@ -87,7 +87,7 @@ public:
class Wsrep_replayer_service : public Wsrep_high_priority_service
{
public:
- Wsrep_replayer_service(THD*);
+ Wsrep_replayer_service(THD* replayer_thd, THD* orig_thd);
~Wsrep_replayer_service();
int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&);
void after_apply() { }
@@ -99,6 +99,7 @@ public:
/* Replayer should never be forced to exit */
bool check_exit_status() const { return false; }
private:
+ THD* m_orig_thd;
struct da_shadow
{
enum Diagnostics_area::enum_diagnostics_status status;
diff --git a/sql/wsrep_trans_observer.h b/sql/wsrep_trans_observer.h
index 467b33ea206..e6901f15ca7 100644
--- a/sql/wsrep_trans_observer.h
+++ b/sql/wsrep_trans_observer.h
@@ -56,6 +56,13 @@ static inline bool wsrep_must_abort(THD* thd)
}
/*
+ Return true if the transaction must be replayed.
+ */
+static inline bool wsrep_must_replay(THD* thd)
+{
+ return (thd->wsrep_trx().state() == wsrep::transaction::s_must_replay);
+}
+/*
Return true if transaction has not been committed.
Note that we don't require thd->LOCK_thd_data here. Calling this method
diff --git a/wsrep-lib b/wsrep-lib
-Subproject ae746fb28957140fb996a4aaf994baea58bd528
+Subproject e9dafb73734d71ab55078b34748e54f139aec82