summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/mysql/service_wsrep.h9
-rw-r--r--mysql-test/suite/galera/disabled.def1
-rw-r--r--mysql-test/suite/galera/r/galera_bf_kill.result49
-rw-r--r--mysql-test/suite/galera/r/galera_performance_schema.result3
-rw-r--r--mysql-test/suite/galera/r/galera_serializable.result8
-rw-r--r--mysql-test/suite/galera/t/MW-286.cnf10
-rw-r--r--mysql-test/suite/galera/t/MW-286.test2
-rw-r--r--mysql-test/suite/galera/t/galera_bf_kill.cnf10
-rw-r--r--mysql-test/suite/galera/t/galera_bf_kill.test93
-rw-r--r--mysql-test/suite/galera/t/galera_drop_database.cnf10
-rw-r--r--mysql-test/suite/galera/t/galera_serializable.test17
-rw-r--r--mysql-test/suite/wsrep/r/variables.result6
-rw-r--r--sql/sql_class.h2
-rw-r--r--sql/sql_plugin_services.ic4
-rw-r--r--sql/wsrep_dummy.cc6
-rw-r--r--sql/wsrep_mysqld.cc102
-rw-r--r--sql/wsrep_mysqld.h22
-rw-r--r--sql/wsrep_thd.cc355
-rw-r--r--sql/wsrep_thd.h1
-rw-r--r--sql/wsrep_var.cc1
-rw-r--r--storage/innobase/handler/ha_innodb.cc259
-rw-r--r--storage/innobase/include/ha_prototypes.h2
-rw-r--r--storage/innobase/lock/lock0lock.cc4
23 files changed, 719 insertions, 257 deletions
diff --git a/include/mysql/service_wsrep.h b/include/mysql/service_wsrep.h
index 923ba57fcdc..f398ccad699 100644
--- a/include/mysql/service_wsrep.h
+++ b/include/mysql/service_wsrep.h
@@ -70,6 +70,7 @@ struct xid_t;
struct wsrep;
struct wsrep_ws_handle;
struct wsrep_buf;
+typedef struct wsrep_kill wsrep_kill_t;
extern struct wsrep_service_st {
struct wsrep * (*get_wsrep_func)();
@@ -114,7 +115,9 @@ extern struct wsrep_service_st {
int (*wsrep_trx_order_before_func)(MYSQL_THD, MYSQL_THD);
void (*wsrep_unlock_rollback_func)();
void (*wsrep_set_data_home_dir_func)(const char *data_dir);
- my_bool (*wsrep_thd_is_applier_func)(MYSQL_THD);
+ my_bool (*wsrep_thd_is_applier_func)(MYSQL_THD thd);
+ bool (*wsrep_enqueue_background_kill_func)(wsrep_kill_t*);
+ wsrep_kill_t* (*wsrep_dequeue_background_kill_func)();
} *wsrep_service;
#ifdef MYSQL_DYNAMIC_PLUGIN
@@ -161,6 +164,8 @@ extern struct wsrep_service_st {
#define wsrep_unlock_rollback() wsrep_service->wsrep_unlock_rollback_func()
#define wsrep_set_data_home_dir(A) wsrep_service->wsrep_set_data_home_dir_func(A)
#define wsrep_thd_is_applier(T) wsrep_service->wsrep_thd_is_applier_func(T)
+#define wsrep_enqueue_background_kill(T) wsrep_service->wsrep_enqueue_background_kill_func(T);
+#define wsrep_dequeue_background_kill() wsrep_service->wsrep_dequeue_background_kill_func();
#define wsrep_debug get_wsrep_debug()
#define wsrep_log_conflicts get_wsrep_log_conflicts()
@@ -223,6 +228,8 @@ bool wsrep_thd_ignore_table(THD *thd);
void wsrep_unlock_rollback();
void wsrep_set_data_home_dir(const char *data_dir);
my_bool wsrep_thd_is_applier(MYSQL_THD thd);
+bool wsrep_enqueue_background_kill(wsrep_kill_t*);
+wsrep_kill_t* wsrep_dequeue_background_kill();
#endif
#ifdef __cplusplus
diff --git a/mysql-test/suite/galera/disabled.def b/mysql-test/suite/galera/disabled.def
index 27691c8f4b5..4a819acd6dd 100644
--- a/mysql-test/suite/galera/disabled.def
+++ b/mysql-test/suite/galera/disabled.def
@@ -10,7 +10,6 @@
#
##############################################################################
-MW-286 : MDEV-18464 Killing thread can cause mutex deadlock if done concurrently with Galera/replication victim kill
MW-328A : MDEV-21483 galera.MW-328A galera.MW-328B
MW-328B : MDEV-21483 galera.MW-328A galera.MW-328B
MW-329 : MDEV-19962 Galera test failure on MW-329
diff --git a/mysql-test/suite/galera/r/galera_bf_kill.result b/mysql-test/suite/galera/r/galera_bf_kill.result
new file mode 100644
index 00000000000..bd93b29b2db
--- /dev/null
+++ b/mysql-test/suite/galera/r/galera_bf_kill.result
@@ -0,0 +1,49 @@
+connection node_2;
+CREATE TABLE t1(a int not null primary key auto_increment,b int) engine=InnoDB;
+insert into t1 values (NULL,1);
+connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2;
+connection node_2a;
+begin;
+update t1 set a =5;
+connection node_2;
+select * from t1;
+a b
+2 1
+disconnect node_2a;
+connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2;
+connection node_2a;
+begin;
+update t1 set a =5;
+connection node_2;
+select * from t1;
+a b
+2 1
+disconnect node_2a;
+connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2;
+connection node_2a;
+begin;
+update t1 set a =5, b=2;
+connection node_2;
+ALTER TABLE t1 ADD UNIQUE KEY(b);
+select * from t1;
+a b
+2 1
+disconnect node_2a;
+connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2;
+connection node_2a;
+begin;
+update t1 set a =5, b=2;
+connect node_2b, 127.0.0.1, root, , test, $NODE_MYPORT_2;
+connection node_2b;
+begin;
+update t1 set a =6, b=7;
+connection node_2;
+ALTER TABLE t1 ADD UNIQUE KEY(b);
+Warnings:
+Note 1831 Duplicate index `b_2`. This is deprecated and will be disallowed in a future release
+select * from t1;
+a b
+2 1
+disconnect node_2a;
+disconnect node_2b;
+drop table t1;
diff --git a/mysql-test/suite/galera/r/galera_performance_schema.result b/mysql-test/suite/galera/r/galera_performance_schema.result
index 5b4994556d6..22b72572759 100644
--- a/mysql-test/suite/galera/r/galera_performance_schema.result
+++ b/mysql-test/suite/galera/r/galera_performance_schema.result
@@ -4,6 +4,7 @@ FROM threads
WHERE name LIKE 'thread/sql/wsrep%'
ORDER BY name;
name thread/sql/wsrep_applier_thread
+name thread/sql/wsrep_killer_thread
name thread/sql/wsrep_rollbacker_thread
use test;
create table t1 (a int not null primary key) engine=innodb;
@@ -12,6 +13,7 @@ use performance_schema;
select name from mutex_instances where name like 'wait/synch/mutex/sql/LOCK_wsrep%' order by name;
name wait/synch/mutex/sql/LOCK_wsrep_config_state
name wait/synch/mutex/sql/LOCK_wsrep_desync
+name wait/synch/mutex/sql/LOCK_wsrep_kill
name wait/synch/mutex/sql/LOCK_wsrep_ready
name wait/synch/mutex/sql/LOCK_wsrep_replaying
name wait/synch/mutex/sql/LOCK_wsrep_rollback
@@ -19,6 +21,7 @@ name wait/synch/mutex/sql/LOCK_wsrep_slave_threads
name wait/synch/mutex/sql/LOCK_wsrep_sst
name wait/synch/mutex/sql/LOCK_wsrep_sst_init
select name from cond_instances where name like 'wait/synch/cond/sql/COND_wsrep%' order by name;
+name wait/synch/cond/sql/COND_wsrep_kill
name wait/synch/cond/sql/COND_wsrep_ready
name wait/synch/cond/sql/COND_wsrep_replaying
name wait/synch/cond/sql/COND_wsrep_rollback
diff --git a/mysql-test/suite/galera/r/galera_serializable.result b/mysql-test/suite/galera/r/galera_serializable.result
index be3f93a081f..9280578b58a 100644
--- a/mysql-test/suite/galera/r/galera_serializable.result
+++ b/mysql-test/suite/galera/r/galera_serializable.result
@@ -27,11 +27,17 @@ ROLLBACK;
DELETE FROM t1;
connection node_1;
START TRANSACTION;
-connection node_1;
INSERT INTO t1 VALUES (1,1);
connection node_2;
INSERT INTO t1 VALUES (1,2);
connection node_1;
COMMIT;
ERROR 40001: Deadlock: wsrep aborted transaction
+SELECT * from t1;
+id f2
+1 2
+connection node_2;
+SELECT * from t1;
+id f2
+1 2
DROP TABLE t1;
diff --git a/mysql-test/suite/galera/t/MW-286.cnf b/mysql-test/suite/galera/t/MW-286.cnf
new file mode 100644
index 00000000000..8ec4c2333cb
--- /dev/null
+++ b/mysql-test/suite/galera/t/MW-286.cnf
@@ -0,0 +1,10 @@
+!include ../galera_2nodes.cnf
+
+[mysqld.1]
+wsrep-debug=ON
+wsrep-log-conflicts=ON
+
+[mysqld.2]
+wsrep-debug=ON
+wsrep-log-conflicts=ON
+
diff --git a/mysql-test/suite/galera/t/MW-286.test b/mysql-test/suite/galera/t/MW-286.test
index 426b4493bb7..fcfbf4c62c3 100644
--- a/mysql-test/suite/galera/t/MW-286.test
+++ b/mysql-test/suite/galera/t/MW-286.test
@@ -3,8 +3,6 @@
#
--source include/galera_cluster.inc
---source include/have_innodb.inc
---source include/big_test.inc
--connection node_1
CREATE TABLE ten (f1 INTEGER) Engine=InnoDB;
diff --git a/mysql-test/suite/galera/t/galera_bf_kill.cnf b/mysql-test/suite/galera/t/galera_bf_kill.cnf
new file mode 100644
index 00000000000..8ec4c2333cb
--- /dev/null
+++ b/mysql-test/suite/galera/t/galera_bf_kill.cnf
@@ -0,0 +1,10 @@
+!include ../galera_2nodes.cnf
+
+[mysqld.1]
+wsrep-debug=ON
+wsrep-log-conflicts=ON
+
+[mysqld.2]
+wsrep-debug=ON
+wsrep-log-conflicts=ON
+
diff --git a/mysql-test/suite/galera/t/galera_bf_kill.test b/mysql-test/suite/galera/t/galera_bf_kill.test
new file mode 100644
index 00000000000..dce6808fd56
--- /dev/null
+++ b/mysql-test/suite/galera/t/galera_bf_kill.test
@@ -0,0 +1,93 @@
+--source include/galera_cluster.inc
+
+#
+# Test case 1: Start a transaction on node_2a and kill it
+# from other connection on same node
+#
+
+--connection node_2
+CREATE TABLE t1(a int not null primary key auto_increment,b int) engine=InnoDB;
+insert into t1 values (NULL,1);
+
+--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2
+--connection node_2a
+begin;
+update t1 set a =5;
+
+--connection node_2
+
+#show create table information_schema.processlist;
+#select ID,USER,COMMAND,STATE,QUERY_ID from INFORMATION_SCHEMA.PROCESSLIST;
+
+--let $k_thread = `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1`
+
+--disable_query_log
+--eval KILL $k_thread
+--enable_query_log
+
+select * from t1;
+--disconnect node_2a
+
+#
+# Test case 2: Start a transaction on node_2a and use
+# kill query from other connection on same node
+#
+
+--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2
+--connection node_2a
+begin;
+update t1 set a =5;
+
+--connection node_2
+--let $k_thread = `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE USER = 'root' AND COMMAND = 'Sleep' LIMIT 1`
+
+--disable_query_log
+--eval KILL QUERY $k_thread
+--enable_query_log
+
+select * from t1;
+--disconnect node_2a
+#
+# Test case 3: Start a transaction on node_2a and start a DDL on other transaction
+# that will then abort node_2a transaction
+#
+--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2
+--connection node_2a
+begin;
+update t1 set a =5, b=2;
+
+--connection node_2
+ALTER TABLE t1 ADD UNIQUE KEY(b);
+
+select * from t1;
+
+--disconnect node_2a
+
+#
+# Test case 4: Start a transaction on node_2a and conflicting transaction on node_2b
+# and start a DDL on other transaction that will then abort node_2a and node_2b
+# transactions
+#
+
+--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2
+--connection node_2a
+begin;
+update t1 set a =5, b=2;
+
+--connect node_2b, 127.0.0.1, root, , test, $NODE_MYPORT_2
+--connection node_2b
+begin;
+send update t1 set a =6, b=7;
+
+--connection node_2
+ALTER TABLE t1 ADD UNIQUE KEY(b);
+
+select * from t1;
+
+--disconnect node_2a
+--disconnect node_2b
+
+drop table t1;
+
+
+
diff --git a/mysql-test/suite/galera/t/galera_drop_database.cnf b/mysql-test/suite/galera/t/galera_drop_database.cnf
new file mode 100644
index 00000000000..8ec4c2333cb
--- /dev/null
+++ b/mysql-test/suite/galera/t/galera_drop_database.cnf
@@ -0,0 +1,10 @@
+!include ../galera_2nodes.cnf
+
+[mysqld.1]
+wsrep-debug=ON
+wsrep-log-conflicts=ON
+
+[mysqld.2]
+wsrep-debug=ON
+wsrep-log-conflicts=ON
+
diff --git a/mysql-test/suite/galera/t/galera_serializable.test b/mysql-test/suite/galera/t/galera_serializable.test
index b12d57fd488..28f6a7aa62d 100644
--- a/mysql-test/suite/galera/t/galera_serializable.test
+++ b/mysql-test/suite/galera/t/galera_serializable.test
@@ -7,7 +7,6 @@
#
--source include/galera_cluster.inc
---source include/have_innodb.inc
--connection node_1
@@ -26,7 +25,6 @@ SELECT * FROM t1;
--connection node_2
INSERT INTO t1 VALUES (1,1);
---sleep 2
--connection node_1
--error ER_LOCK_DEADLOCK
SELECT * FROM t1;
@@ -46,7 +44,6 @@ SELECT * FROM t1;
--connection node_2
UPDATE t1 SET f2 = 2;
---sleep 2
--connection node_1
--error ER_LOCK_DEADLOCK
UPDATE t1 SET f2 = 3;
@@ -62,8 +59,6 @@ DELETE FROM t1;
--connection node_1
START TRANSACTION;
-
---connection node_1
INSERT INTO t1 VALUES (1,1);
--connection node_2
@@ -73,4 +68,16 @@ INSERT INTO t1 VALUES (1,2);
--error ER_LOCK_DEADLOCK
COMMIT;
+--let $wait_condition = SELECT COUNT(*) = 1 FROM t1;
+--source include/wait_condition.inc
+
+SELECT * from t1;
+
+--connection node_2
+
+--let $wait_condition = SELECT COUNT(*) = 1 FROM t1;
+--source include/wait_condition.inc
+
+SELECT * from t1;
+
DROP TABLE t1;
diff --git a/mysql-test/suite/wsrep/r/variables.result b/mysql-test/suite/wsrep/r/variables.result
index 3abc861f3d0..96dcee47206 100644
--- a/mysql-test/suite/wsrep/r/variables.result
+++ b/mysql-test/suite/wsrep/r/variables.result
@@ -149,7 +149,7 @@ VARIABLE_VALUE
1
SELECT VARIABLE_VALUE FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_thread_count';
VARIABLE_VALUE
-2
+3
SELECT @@global.wsrep_provider;
@@global.wsrep_provider
libgalera_smm.so
@@ -164,7 +164,7 @@ Variable_name Value
Threads_connected 1
SHOW STATUS LIKE 'wsrep_thread_count';
Variable_name Value
-wsrep_thread_count 2
+wsrep_thread_count 3
SET @wsrep_slave_threads_saved= @@global.wsrep_slave_threads;
SET GLOBAL wsrep_slave_threads= 10;
@@ -177,7 +177,7 @@ VARIABLE_VALUE
1
SELECT VARIABLE_VALUE FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_thread_count';
VARIABLE_VALUE
-11
+12
SHOW STATUS LIKE 'threads_connected';
Variable_name Value
Threads_connected 1
diff --git a/sql/sql_class.h b/sql/sql_class.h
index b35f9a93238..95f08acf403 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -4409,6 +4409,8 @@ public:
#ifdef WITH_WSREP
const bool wsrep_applier; /* dedicated slave applier thread */
+ bool wsrep_killer; /* dedicated background
+ kill thread */
bool wsrep_applier_closing; /* applier marked to close */
bool wsrep_client_thread; /* to identify client threads*/
bool wsrep_PA_safe;
diff --git a/sql/sql_plugin_services.ic b/sql/sql_plugin_services.ic
index 20113444b64..7cae8a79efd 100644
--- a/sql/sql_plugin_services.ic
+++ b/sql/sql_plugin_services.ic
@@ -184,7 +184,9 @@ static struct wsrep_service_st wsrep_handler = {
wsrep_trx_order_before,
wsrep_unlock_rollback,
wsrep_set_data_home_dir,
- wsrep_thd_is_applier
+ wsrep_thd_is_applier,
+ wsrep_enqueue_background_kill,
+ wsrep_dequeue_background_kill
};
static struct thd_specifics_service_st thd_specifics_handler=
diff --git a/sql/wsrep_dummy.cc b/sql/wsrep_dummy.cc
index b98dd1e5790..a0e720e5dd0 100644
--- a/sql/wsrep_dummy.cc
+++ b/sql/wsrep_dummy.cc
@@ -150,3 +150,9 @@ void wsrep_set_data_home_dir(const char *)
my_bool wsrep_thd_is_applier(MYSQL_THD thd)
{ return false; }
+
+bool wsrep_enqueue_background_kill(wsrep_kill_t* item)
+{ return false;}
+
+wsrep_kill_t* wsrep_dequeue_background_kill()
+{ return NULL;}
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
index ba6c2d24f77..54dda0a1caf 100644
--- a/sql/wsrep_mysqld.cc
+++ b/sql/wsrep_mysqld.cc
@@ -38,7 +38,8 @@
#include <cstdlib>
#include "log_event.h"
#include "sql_plugin.h" /* wsrep_plugins_pre_init() */
-#include <vector>
+#include <list>
+#include <algorithm>
wsrep_t *wsrep = NULL;
/*
@@ -133,6 +134,8 @@ mysql_cond_t COND_wsrep_replaying;
mysql_mutex_t LOCK_wsrep_slave_threads;
mysql_mutex_t LOCK_wsrep_desync;
mysql_mutex_t LOCK_wsrep_config_state;
+mysql_mutex_t LOCK_wsrep_kill;
+mysql_cond_t COND_wsrep_kill;
int wsrep_replaying= 0;
ulong wsrep_running_threads = 0; // # of currently running wsrep
@@ -140,6 +143,7 @@ ulong wsrep_running_threads = 0; // # of currently running wsrep
ulong wsrep_running_applier_threads = 0; // # of running applier threads
ulong wsrep_running_rollbacker_threads = 0; // # of running
// # rollbacker threads
+ulong wsrep_running_killer_threads = 0;
ulong my_bind_addr;
#ifdef HAVE_PSI_INTERFACE
@@ -147,11 +151,13 @@ PSI_mutex_key key_LOCK_wsrep_rollback,
key_LOCK_wsrep_replaying, key_LOCK_wsrep_ready, key_LOCK_wsrep_sst,
key_LOCK_wsrep_sst_thread, key_LOCK_wsrep_sst_init,
key_LOCK_wsrep_slave_threads, key_LOCK_wsrep_desync,
- key_LOCK_wsrep_config_state;
+ key_LOCK_wsrep_config_state,
+ key_LOCK_wsrep_kill;
PSI_cond_key key_COND_wsrep_rollback,
key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst,
- key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread;
+ key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread,
+ key_COND_wsrep_kill;
PSI_file_key key_file_wsrep_gra_log;
@@ -166,7 +172,8 @@ static PSI_mutex_info wsrep_mutexes[]=
{ &key_LOCK_wsrep_replaying, "LOCK_wsrep_replaying", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_slave_threads, "LOCK_wsrep_slave_threads", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_desync, "LOCK_wsrep_desync", PSI_FLAG_GLOBAL},
- { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL}
+ { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL},
+ { &key_LOCK_wsrep_kill, "LOCK_wsrep_kill", PSI_FLAG_GLOBAL}
};
static PSI_cond_info wsrep_conds[]=
@@ -176,7 +183,8 @@ static PSI_cond_info wsrep_conds[]=
{ &key_COND_wsrep_sst_init, "COND_wsrep_sst_init", PSI_FLAG_GLOBAL},
{ &key_COND_wsrep_sst_thread, "wsrep_sst_thread", 0},
{ &key_COND_wsrep_rollback, "COND_wsrep_rollback", PSI_FLAG_GLOBAL},
- { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL}
+ { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL},
+ { &key_COND_wsrep_kill, "COND_wsrep_kill", PSI_FLAG_GLOBAL}
};
static PSI_file_info wsrep_files[]=
@@ -185,14 +193,15 @@ static PSI_file_info wsrep_files[]=
};
PSI_thread_key key_wsrep_sst_joiner, key_wsrep_sst_donor,
- key_wsrep_rollbacker, key_wsrep_applier;
+ key_wsrep_rollbacker, key_wsrep_applier, key_wsrep_killer;
static PSI_thread_info wsrep_threads[]=
{
{&key_wsrep_sst_joiner, "wsrep_sst_joiner_thread", PSI_FLAG_GLOBAL},
{&key_wsrep_sst_donor, "wsrep_sst_donor_thread", PSI_FLAG_GLOBAL},
{&key_wsrep_rollbacker, "wsrep_rollbacker_thread", PSI_FLAG_GLOBAL},
- {&key_wsrep_applier, "wsrep_applier_thread", PSI_FLAG_GLOBAL}
+ {&key_wsrep_applier, "wsrep_applier_thread", PSI_FLAG_GLOBAL},
+ {&key_wsrep_killer, "wsrep_killer_thread", PSI_FLAG_GLOBAL}
};
#endif /* HAVE_PSI_INTERFACE */
@@ -239,6 +248,7 @@ wsp::Config_state *wsrep_config_state;
// if there was no state gap on receiving first view event.
static my_bool wsrep_startup = TRUE;
+std::list< wsrep_kill_t* > wsrep_kill_list;
static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) {
switch (level) {
@@ -820,6 +830,8 @@ void wsrep_thr_init()
mysql_mutex_init(key_LOCK_wsrep_slave_threads, &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_wsrep_desync, &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_wsrep_config_state, &LOCK_wsrep_config_state, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_LOCK_wsrep_kill, &LOCK_wsrep_kill, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_wsrep_kill, &COND_wsrep_kill, NULL);
DBUG_VOID_RETURN;
}
@@ -856,6 +868,7 @@ void wsrep_init_startup (bool first)
if (!wsrep_start_replication()) unireg_abort(1);
wsrep_create_rollbacker();
+ wsrep_create_killer();
wsrep_create_appliers(1);
if (first && !wsrep_sst_wait()) unireg_abort(1);// wait until SST is completed
@@ -897,6 +910,8 @@ void wsrep_thr_deinit()
mysql_mutex_destroy(&LOCK_wsrep_slave_threads);
mysql_mutex_destroy(&LOCK_wsrep_desync);
mysql_mutex_destroy(&LOCK_wsrep_config_state);
+ mysql_mutex_destroy(&LOCK_wsrep_kill);
+ mysql_cond_destroy(&COND_wsrep_kill);
delete wsrep_config_state;
wsrep_config_state= 0; // Safety
}
@@ -1648,7 +1663,7 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_,
if (wsrep_can_run_in_toi(thd, db_, table_, table_list) == false)
{
- WSREP_DEBUG("No TOI for %s", WSREP_QUERY(thd));
+ WSREP_DEBUG("No TOI for %s", wsrep_thd_query(thd));
return 1;
}
@@ -2144,9 +2159,13 @@ pthread_handler_t start_wsrep_THD(void *arg)
case WSREP_ROLLBACKER_THREAD:
wsrep_running_rollbacker_threads++;
break;
+ case WSREP_KILLER_THREAD:
+ wsrep_running_killer_threads++;
+ thd->wsrep_killer= true;
+ break;
default:
WSREP_ERROR("Incorrect wsrep thread type: %d", args->thread_type);
- break;
+ assert(0);
}
mysql_cond_broadcast(&COND_thread_count);
@@ -2169,9 +2188,13 @@ pthread_handler_t start_wsrep_THD(void *arg)
DBUG_ASSERT(wsrep_running_rollbacker_threads > 0);
wsrep_running_rollbacker_threads--;
break;
+ case WSREP_KILLER_THREAD:
+ DBUG_ASSERT(wsrep_running_killer_threads > 0);
+ wsrep_running_killer_threads--;
+ break;
default:
WSREP_ERROR("Incorrect wsrep thread type: %d", args->thread_type);
- break;
+ assert(0);
}
my_free(args);
@@ -2418,7 +2441,11 @@ void wsrep_close_client_connections(my_bool wait_to_end, THD *except_caller_thd)
}
DBUG_PRINT("quit",("Waiting for threads to die (count=%u)",thread_count));
- WSREP_DEBUG("waiting for client connections to close: %u", thread_count);
+ WSREP_DEBUG("Waiting for client connections to close: %u", thread_count);
+ WSREP_DEBUG("Waiting for rollbacker threads to close: %lu", wsrep_running_rollbacker_threads);
+ WSREP_DEBUG("Waiting for applier threads to close: %lu", wsrep_running_applier_threads);
+ WSREP_DEBUG("Waiting for killer threads to close: %lu", wsrep_running_killer_threads);
+ WSREP_DEBUG("Waiting for wsrep threads to close: %lu", wsrep_running_threads);
while (wait_to_end && have_client_connections())
{
@@ -2452,7 +2479,7 @@ void wsrep_close_threads(THD *thd)
DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
(longlong) tmp->thread_id));
/* We skip slave threads & scheduler on this first loop through. */
- if (tmp->wsrep_applier && tmp != thd)
+ if ((tmp->wsrep_applier || tmp->wsrep_killer) && tmp != thd)
{
WSREP_DEBUG("closing wsrep thread %lld", (longlong) tmp->thread_id);
wsrep_close_thread (tmp);
@@ -2466,7 +2493,7 @@ void wsrep_wait_appliers_close(THD *thd)
{
/* Wait for wsrep appliers to gracefully exit */
mysql_mutex_lock(&LOCK_thread_count);
- while (wsrep_running_threads > 1)
+ while (wsrep_running_threads > 2)
// 1 is for rollbacker thread which needs to be killed explicitly.
// This gotta be fixed in a more elegant manner if we gonna have arbitrary
// number of non-applier wsrep threads.
@@ -2995,3 +3022,52 @@ bool wsrep_node_is_synced()
{
return (WSREP_ON) ? (wsrep_config_state->get_status() == 4) : false;
}
+
+bool wsrep_enqueue_background_kill(wsrep_kill_t *item)
+{
+ std::list< wsrep_kill_t* >::iterator it;
+ bool inserted= false;
+
+ mysql_mutex_lock(&LOCK_wsrep_kill);
+
+ for (it = wsrep_kill_list.begin(); it != wsrep_kill_list.end(); it++)
+ {
+ if ((*it)->victim_thd == item->victim_thd)
+ break;
+ }
+
+ if(it != wsrep_kill_list.end())
+ {
+ WSREP_DEBUG("Thread: %lld query: %s already on kill list",
+ item->victim_thd->thread_id, wsrep_thd_query(item->victim_thd));
+ }
+ else
+ {
+ wsrep_kill_list.push_back(item);
+ mysql_cond_signal(&COND_wsrep_kill);
+ inserted= true;
+ }
+
+ mysql_mutex_unlock(&LOCK_wsrep_kill);
+ return inserted;
+}
+
+wsrep_kill_t* wsrep_dequeue_background_kill()
+{
+ wsrep_kill_t* item= NULL;
+
+ mysql_mutex_assert_owner(&LOCK_wsrep_kill);
+
+ if (!wsrep_kill_list.empty())
+ {
+ item= wsrep_kill_list.front();
+ wsrep_kill_list.pop_front();
+ }
+
+ return item;
+}
+
+bool wsrep_kill_empty()
+{
+ return wsrep_kill_list.empty();
+}
diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h
index 96b3e63914b..8258bc32f6b 100644
--- a/sql/wsrep_mysqld.h
+++ b/sql/wsrep_mysqld.h
@@ -56,6 +56,15 @@ struct wsrep_thd_shadow {
longlong row_count_func;
};
+typedef struct wsrep_kill {
+ THD* victim_thd;
+ THD* bf_thd;
+ uint64_t victim_id;
+ uint64_t bf_id;
+ bool signal;
+ bool wait_lock;
+} wsrep_kill_t;
+
// Global wsrep parameters
extern wsrep_t* wsrep;
@@ -92,6 +101,7 @@ extern my_bool wsrep_slave_UK_checks;
extern ulong wsrep_running_threads;
extern ulong wsrep_running_applier_threads;
extern ulong wsrep_running_rollbacker_threads;
+extern ulong wsrep_running_killer_threads;
extern bool wsrep_new_cluster;
extern bool wsrep_gtid_mode;
extern uint32 wsrep_gtid_domain_id;
@@ -234,8 +244,6 @@ extern wsrep_seqno_t wsrep_locked_seqno;
#define WSREP_PROVIDER_EXISTS \
(wsrep_provider && strncasecmp(wsrep_provider, WSREP_NONE, FN_REFLEN))
-#define WSREP_QUERY(thd) (thd->query())
-
extern my_bool wsrep_ready_get();
extern void wsrep_ready_wait();
@@ -265,6 +273,8 @@ extern mysql_cond_t COND_wsrep_replaying;
extern mysql_mutex_t LOCK_wsrep_slave_threads;
extern mysql_mutex_t LOCK_wsrep_desync;
extern mysql_mutex_t LOCK_wsrep_config_state;
+extern mysql_mutex_t LOCK_wsrep_kill;
+extern mysql_cond_t COND_wsrep_kill;
extern wsrep_aborting_thd_t wsrep_aborting_thd;
extern my_bool wsrep_emulate_bin_log;
extern int wsrep_to_isolation;
@@ -289,6 +299,8 @@ extern PSI_mutex_key key_LOCK_wsrep_replaying;
extern PSI_cond_key key_COND_wsrep_replaying;
extern PSI_mutex_key key_LOCK_wsrep_slave_threads;
extern PSI_mutex_key key_LOCK_wsrep_desync;
+extern PSI_mutex_key key_LOCK_wsrep_kill;
+extern PSI_cond_key key_COND_wsrep_kill;
extern PSI_file_key key_file_wsrep_gra_log;
@@ -296,6 +308,7 @@ extern PSI_thread_key key_wsrep_sst_joiner;
extern PSI_thread_key key_wsrep_sst_donor;
extern PSI_thread_key key_wsrep_rollbacker;
extern PSI_thread_key key_wsrep_applier;
+extern PSI_thread_key key_wsrep_killer;
#endif /* HAVE_PSI_INTERFACE */
@@ -322,7 +335,8 @@ void thd_binlog_trx_reset(THD * thd);
enum wsrep_thread_type {
WSREP_APPLIER_THREAD=1,
- WSREP_ROLLBACKER_THREAD=2
+ WSREP_ROLLBACKER_THREAD=2,
+ WSREP_KILLER_THREAD=3
};
typedef void (*wsrep_thd_processor_fun)(THD *);
@@ -369,6 +383,8 @@ void wsrep_keys_free(wsrep_key_arr_t* key_arr);
((wsrep_forced_binlog_format != BINLOG_FORMAT_UNSPEC) ? \
wsrep_forced_binlog_format : my_format)
+bool wsrep_kill_empty();
+
#else /* WITH_WSREP */
#define WSREP(T) (0)
diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc
index a7b6e8ff1b1..cff92cd3484 100644
--- a/sql/wsrep_thd.cc
+++ b/sql/wsrep_thd.cc
@@ -27,6 +27,7 @@
#include "rpl_filter.h"
#include "rpl_rli.h"
#include "rpl_mi.h"
+#include "debug_sync.h"
#if (__LP64__)
static volatile int64 wsrep_bf_aborts_counter(0);
@@ -424,12 +425,25 @@ static bool create_wsrep_THD(wsrep_thread_args* args, bool thread_count_lock)
mysql_mutex_lock(&LOCK_thread_count);
ulong old_wsrep_running_threads= wsrep_running_threads;
+ PSI_thread_key key;
- DBUG_ASSERT(args->thread_type == WSREP_APPLIER_THREAD ||
- args->thread_type == WSREP_ROLLBACKER_THREAD);
+ switch(args->thread_type)
+ {
+ case WSREP_APPLIER_THREAD:
+ key = key_wsrep_applier;
+ break;
+ case WSREP_ROLLBACKER_THREAD:
+ key = key_wsrep_rollbacker;
+ break;
+ case WSREP_KILLER_THREAD:
+ key = key_wsrep_killer;
+ break;
+ default:
+ WSREP_ERROR("Incorrect thread type %d", args->thread_type);
+ assert(0);
+ }
- bool res= mysql_thread_create(args->thread_type == WSREP_APPLIER_THREAD
- ? key_wsrep_applier : key_wsrep_rollbacker,
+ bool res= mysql_thread_create(key,
&args->thread_id, &connection_attrib,
start_wsrep_THD, (void*)args);
@@ -457,6 +471,335 @@ static bool create_wsrep_THD(wsrep_thread_args* args, bool thread_count_lock)
return res;
}
+static void wsrep_abort_slave_trx(long long bf_seqno, long long victim_seqno)
+{
+ WSREP_ERROR("Trx %lld tries to abort slave trx %lld. This could be "
+ "caused by:\n\t"
+ "1) unsupported configuration options combination, please check documentation.\n\t"
+ "2) a bug in the code.\n\t"
+ "3) a database corruption.\n Node consistency compromized, "
+ "need to abort. Restart the node to resync with cluster.",
+ bf_seqno, victim_seqno);
+ abort();
+}
+
+static int wsrep_kill(wsrep_kill_t* item)
+{
+ bool signal= item->signal;
+ THD* thd= item->victim_thd;
+ THD* bf_thd= item->bf_thd;
+ unsigned long long victim_id= static_cast<unsigned long long>(item->victim_id);
+ unsigned long long bf_id= static_cast<unsigned long long>(item->bf_id);
+ unsigned long victim_thread= thd_get_thread_id(thd);
+ unsigned long bf_thread= thd_get_thread_id(bf_thd);
+ long long bf_seqno= wsrep_thd_trx_seqno(bf_thd);
+ long long victim_seqno= wsrep_thd_trx_seqno(thd);
+
+ wsrep_thd_LOCK(thd);
+
+ WSREP_LOG_CONFLICT(bf_thd, thd, TRUE);
+
+ WSREP_DEBUG("Aborter %s trx_id: %llu thread: %ld "
+ "seqno: %lld query_state: %s conflict_state: %s query: %s",
+ wsrep_thd_is_BF(bf_thd, false) ? "BF" : "normal",
+ bf_id,
+ bf_thread,
+ bf_seqno,
+ wsrep_thd_query_state_str(thd),
+ wsrep_thd_conflict_state_str(thd),
+ wsrep_thd_query(bf_thd));
+
+ WSREP_DEBUG("Victim %s trx_id: %llu thread: %ld "
+ "seqno: %lld query_state: %s conflict_state: %s query: %s",
+ wsrep_thd_is_BF(thd, false) ? "BF" : "normal",
+ victim_id,
+ victim_thread,
+ victim_seqno,
+ wsrep_thd_query_state_str(thd),
+ wsrep_thd_conflict_state_str(thd),
+ wsrep_thd_query(thd));
+
+ DBUG_EXECUTE_IF("sync.wsrep_after_BF_victim_lock",
+ {
+ const char act[]=
+ "now "
+ "wait_for signal.wsrep_after_BF_victim_lock";
+ DBUG_ASSERT(!debug_sync_set_action(bf_thd,
+ STRING_WITH_LEN(act)));
+ };);
+
+ if (wsrep_thd_query_state(thd) == QUERY_EXITING)
+ {
+ WSREP_DEBUG("Victim query state QUERY_EXITING trx: %llu"
+ " thread: %lu",
+ victim_id,
+ victim_thread);
+ wsrep_thd_UNLOCK(thd);
+ return(0);
+ }
+
+ if (wsrep_thd_exec_mode(thd) != LOCAL_STATE)
+ {
+ WSREP_DEBUG("Victim withdraw of non local for BF trx: %llu "
+ ", thread: %lu exec_mode: %s",
+ victim_id,
+ victim_thread,
+ wsrep_thd_exec_mode_str(thd));
+ }
+
+ switch (wsrep_thd_get_conflict_state(thd))
+ {
+ case NO_CONFLICT:
+ WSREP_DEBUG("Victim thread: %lu trx: %llu in NO_CONFLICT state",
+ victim_thread,
+ victim_id);
+ wsrep_thd_set_conflict_state(thd, MUST_ABORT);
+ break;
+ case MUST_ABORT:
+ WSREP_DEBUG("Victim thread: %lu trx: %llu in MUST_ABORT state",
+ victim_thread,
+ victim_id);
+ wsrep_thd_UNLOCK(thd);
+ wsrep_thd_awake(thd, signal);
+ return(0);
+ break;
+ case ABORTED:
+ case ABORTING: // fall through
+ default:
+ WSREP_DEBUG("Victim thread: %lu trx: %llu in state: %s",
+ victim_thread,
+ victim_id,
+ wsrep_thd_conflict_state_str(thd));
+ wsrep_thd_UNLOCK(thd);
+ return(0);
+ break;
+ }
+
+ switch (wsrep_thd_query_state(thd))
+ {
+ case QUERY_COMMITTING:
+ {
+ enum wsrep_status rcode=WSREP_OK;
+
+ WSREP_DEBUG("Victim kill trx QUERY_COMMITTING state thread: %ld trx: %llu",
+ victim_thread,
+ victim_id);
+
+ if (wsrep_thd_exec_mode(thd) == REPL_RECV)
+ {
+ WSREP_DEBUG("Victim REPL_RECV abort slave thread: %ld trx: %llu"
+ " bf_seqno: %lld victim_seqno: %lld",
+ victim_thread,
+ victim_id,
+ bf_seqno,
+ victim_seqno);
+
+ wsrep_abort_slave_trx(bf_seqno, victim_seqno);
+ }
+ else
+ {
+ wsrep_t *wsrep= get_wsrep();
+
+ rcode= wsrep->abort_pre_commit(wsrep, bf_seqno,
+ (wsrep_trx_id_t)wsrep_thd_ws_handle(thd)->trx_id);
+
+ switch (rcode)
+ {
+ case WSREP_WARNING:
+ {
+ WSREP_DEBUG("Victim cancel commit warning thread: %lu trx: %llu",
+ victim_thread,
+ victim_id);
+ wsrep_thd_UNLOCK(thd);
+ wsrep_thd_awake(thd, signal);
+ return(1);
+ break;
+ }
+ case WSREP_OK:
+ break;
+ default:
+ {
+ WSREP_ERROR("Victim cancel commit bad commit exit thread: "
+ "%lu trx: %llu rcode: %d ",
+ victim_thread,
+ victim_id,
+ rcode);
+ /* unable to interrupt, must abort */
+ /* note: kill_mysql() will block, if we cannot.
+ * kill the lock holder first. */
+ abort();
+ break;
+ }
+ }
+ }
+ wsrep_thd_UNLOCK(thd);
+ wsrep_thd_awake(thd, signal);
+ break;
+ }
+ case QUERY_EXEC:
+ {
+ /* it is possible that victim trx is itself waiting for some
+ * other lock. We need to cancel this waiting */
+ WSREP_DEBUG("Victim kill trx QUERY_EXEC state thread: %ld trx: %llu",
+ victim_thread, victim_id);
+
+ bool wait_lock= item->wait_lock;
+
+ if (wait_lock)
+ {
+ WSREP_DEBUG("Victim thread: %lu trx: %llu has lock wait flag",
+ victim_thread,
+ victim_id);
+ wsrep_thd_UNLOCK(thd);
+ wsrep_thd_awake(thd, signal);
+ }
+ else
+ {
+ /* Abort currently executing query */
+ WSREP_DEBUG("Kill query for victim thread: %lu trx: %llu",
+ victim_thread,
+ victim_id);
+ wsrep_thd_UNLOCK(thd);
+ wsrep_thd_awake(thd, signal);
+
+ /* for BF thd, we need to prevent him from committing */
+ if (wsrep_thd_exec_mode(thd) == REPL_RECV)
+ {
+ WSREP_DEBUG("Victim REPL_RECV abort slave for thread: "
+ "%lu trx: %llu"
+ " bf_seqno: %lld victim_seqno: %lld",
+ victim_thread,
+ victim_id,
+ bf_seqno,
+ victim_seqno);
+
+ wsrep_abort_slave_trx(bf_seqno, victim_seqno);
+ }
+ }
+ break;
+ }
+ case QUERY_IDLE:
+ {
+ WSREP_DEBUG("Victim kill trx QUERY_IDLE state thread: %ld trx: %llu",
+ victim_thread,
+ victim_id);
+
+ if (wsrep_thd_exec_mode(thd) == REPL_RECV)
+ {
+ WSREP_DEBUG("Victim REPL_RECV kill BF IDLE, thread: %ld trx: "
+ "%llu bf_seqno: %lld victim_seqno: %lld",
+ victim_thread,
+ victim_id,
+ bf_seqno,
+ victim_seqno);
+
+ wsrep_thd_UNLOCK(thd);
+ wsrep_abort_slave_trx(bf_seqno, victim_seqno);
+ return(0);
+ }
+
+ /* This will lock thd from proceeding after net_read() */
+ wsrep_thd_set_conflict_state(thd, ABORTING);
+
+ wsrep_lock_rollback();
+
+ if (wsrep_aborting_thd_contains(thd))
+ {
+ WSREP_WARN("Victim is duplicate thd aborter thread: %ld trx: %llu",
+ victim_thread,
+ victim_id);
+ }
+ else
+ {
+ wsrep_aborting_thd_enqueue(thd);
+ WSREP_DEBUG("Enqueuing victim thread: %ld trx: %llu for abort",
+ victim_thread,
+ victim_id);
+ }
+
+ wsrep_unlock_rollback();
+ wsrep_thd_UNLOCK(thd);
+
+ break;
+ }
+ default:
+ {
+ WSREP_WARN("Victim thread: %ld trx: %llu in bad wsrep query state: %s",
+ victim_thread,
+ victim_id,
+ wsrep_thd_query_state_str(thd));
+ wsrep_thd_UNLOCK(thd);
+ break;
+ }
+ }
+
+ return(0);
+}
+
+static void wsrep_process_kill(THD *thd)
+{
+ DBUG_ENTER("wsrep_process_kill");
+
+ mysql_mutex_lock(&LOCK_wsrep_kill);
+
+ WSREP_DEBUG("WSREP killer thread started");
+
+ while (thd->killed == NOT_KILLED)
+ {
+ wsrep_kill_t* to_be_killed;
+
+ thd_proc_info(thd, "wsrep killer idle");
+ thd->mysys_var->current_mutex= &LOCK_wsrep_kill;
+ thd->mysys_var->current_cond= &COND_wsrep_kill;
+
+ mysql_cond_wait(&COND_wsrep_kill,&LOCK_wsrep_kill);
+
+ WSREP_DEBUG("WSREP killer thread wakes for signal");
+
+ mysql_mutex_lock(&thd->mysys_var->mutex);
+ thd_proc_info(thd, "wsrep killer active");
+ thd->mysys_var->current_mutex= 0;
+ thd->mysys_var->current_cond= 0;
+ mysql_mutex_unlock(&thd->mysys_var->mutex);
+
+ /* process all entries in the queue */
+ while ((to_be_killed= wsrep_dequeue_background_kill()))
+ {
+ /* Here we do not have InnoDB lock sys mutex or trx mutex
+ so it is safe to take LOCK_thd_data. */
+ wsrep_kill(to_be_killed);
+ my_free(to_be_killed);
+ to_be_killed= NULL;
+ }
+ }
+
+ assert(wsrep_kill_empty());
+
+ mysql_mutex_unlock(&LOCK_wsrep_kill);
+ sql_print_information("WSREP: killer thread exiting");
+ DBUG_PRINT("wsrep",("wsrep killer thread exiting"));
+ DBUG_VOID_RETURN;
+}
+
+
+void wsrep_create_killer()
+{
+ wsrep_thread_args* arg;
+ if((arg = (wsrep_thread_args*)my_malloc(sizeof(wsrep_thread_args), MYF(0))) == NULL) {
+ WSREP_ERROR("Can't allocate memory for wsrep background killer thread");
+ assert(0);
+ }
+
+ arg->thread_type = WSREP_KILLER_THREAD;
+ arg->processor = wsrep_process_kill;
+
+ if (create_wsrep_THD(arg, false)) {
+ WSREP_WARN("Can't create thread to manage wsrep background kill");
+ my_free(arg);
+ return;
+ }
+}
+
bool wsrep_create_appliers(long threads, bool thread_count_lock)
{
if (!wsrep_connected)
@@ -552,7 +895,7 @@ static void wsrep_rollback_process(THD *thd)
mysql_mutex_unlock(&aborting->LOCK_thd_data);
- set_current_thd(aborting);
+ set_current_thd(aborting);
aborting->store_globals();
mysql_mutex_lock(&aborting->LOCK_thd_data);
@@ -562,7 +905,7 @@ static void wsrep_rollback_process(THD *thd)
(longlong) aborting->real_id);
mysql_mutex_unlock(&aborting->LOCK_thd_data);
- set_current_thd(thd);
+ set_current_thd(thd);
thd->store_globals();
mysql_mutex_lock(&LOCK_wsrep_rollback);
diff --git a/sql/wsrep_thd.h b/sql/wsrep_thd.h
index 8d928014518..40ce4b847e6 100644
--- a/sql/wsrep_thd.h
+++ b/sql/wsrep_thd.h
@@ -28,6 +28,7 @@ void wsrep_client_rollback(THD *thd);
void wsrep_replay_transaction(THD *thd);
bool wsrep_create_appliers(long threads, bool thread_count_lock=false);
void wsrep_create_rollbacker();
+void wsrep_create_killer();
int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr,
my_bool signal);
diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc
index 9777cc6ec62..8b2b7c13cdc 100644
--- a/sql/wsrep_var.cc
+++ b/sql/wsrep_var.cc
@@ -500,6 +500,7 @@ bool wsrep_cluster_address_update (sys_var *self, THD* thd, enum_var_type type)
if (wsrep_start_replication())
{
wsrep_create_rollbacker();
+ wsrep_create_killer();
WSREP_DEBUG("Cluster address update creating %ld applier threads running %lu",
wsrep_slave_threads, wsrep_running_applier_threads);
wsrep_create_appliers(wsrep_slave_threads);
diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc
index 0338e2b682a..659ce188641 100644
--- a/storage/innobase/handler/ha_innodb.cc
+++ b/storage/innobase/handler/ha_innodb.cc
@@ -5238,8 +5238,6 @@ static void innobase_kill_query(handlerton*, THD* thd, enum thd_kill_levels)
/* if victim has been signaled by BF thread and/or aborting
is already progressing, following query aborting is not necessary
any more.
- Also, BF thread should own trx mutex for the victim, which would
- conflict with trx_mutex_enter() below
*/
DBUG_VOID_RETURN;
}
@@ -19611,41 +19609,28 @@ static struct st_mysql_storage_engine innobase_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
#ifdef WITH_WSREP
-void
-wsrep_abort_slave_trx(
-/*==================*/
- wsrep_seqno_t bf_seqno,
- wsrep_seqno_t victim_seqno)
-{
- WSREP_ERROR("Trx %lld tries to abort slave trx %lld. This could be "
- "caused by:\n\t"
- "1) unsupported configuration options combination, please check documentation.\n\t"
- "2) a bug in the code.\n\t"
- "3) a database corruption.\n Node consistency compromized, "
- "need to abort. Restart the node to resync with cluster.",
- (long long)bf_seqno, (long long)victim_seqno);
- abort();
-}
-/*******************************************************************//**
-This function is used to kill one transaction in BF. */
+/** This function is used to kill one transaction in BF.
+@param[in] bf_thd_ptr BF THD
+@param[in] bf_trx BF transaction
+@param[in] victim_trx Victim to be killed
+@param[in] signal signal victim? */
UNIV_INTERN
int
wsrep_innobase_kill_one_trx(
-/*========================*/
- void * const bf_thd_ptr,
- const trx_t * const bf_trx,
- trx_t *victim_trx,
- ibool signal)
+ void* const bf_thd_ptr,
+ const trx_t* const bf_trx,
+ trx_t* victim_trx,
+ bool signal)
{
ut_ad(lock_mutex_own());
ut_ad(trx_mutex_own(victim_trx));
- ut_ad(bf_thd_ptr);
- ut_ad(victim_trx);
+ ut_a(bf_thd_ptr);
+ ut_a(victim_trx);
DBUG_ENTER("wsrep_innobase_kill_one_trx");
- THD *bf_thd = bf_thd_ptr ? (THD*) bf_thd_ptr : NULL;
+
+ THD *bf_thd = (THD *) bf_thd_ptr;
THD *thd = (THD *) victim_trx->mysql_thd;
- int64_t bf_seqno = (bf_thd) ? wsrep_thd_trx_seqno(bf_thd) : 0;
if (!thd) {
DBUG_PRINT("wsrep", ("no thd for conflicting lock"));
@@ -19653,198 +19638,31 @@ wsrep_innobase_kill_one_trx(
DBUG_RETURN(1);
}
- if (!bf_thd) {
- DBUG_PRINT("wsrep", ("no BF thd for conflicting lock"));
- WSREP_WARN("no BF THD for trx: " TRX_ID_FMT,
- bf_trx ? bf_trx->id : 0);
- DBUG_RETURN(1);
- }
-
- WSREP_LOG_CONFLICT(bf_thd, thd, TRUE);
-
- WSREP_DEBUG("BF kill (" ULINTPF ", seqno: " INT64PF
- "), victim: (%lu) trx: " TRX_ID_FMT,
- signal, bf_seqno,
- thd_get_thread_id(thd),
- victim_trx->id);
-
- WSREP_DEBUG("Aborting query: %s conf %d trx: %" PRId64,
- (thd && wsrep_thd_query(thd)) ? wsrep_thd_query(thd) : "void",
- wsrep_thd_conflict_state(thd, FALSE),
- wsrep_thd_ws_handle(thd)->trx_id);
+ /* Actual processing of the victim kill is handled later
+ on background thread. At this point we may not hold
+ LOCK_thd_data mutex as we are already holding lock sys
+ and trx mutex. */
+ wsrep_kill_t* item = static_cast<wsrep_kill_t*>
+ (my_malloc(sizeof(wsrep_kill_t), MYF(0)));
- wsrep_thd_LOCK(thd);
- DBUG_EXECUTE_IF("sync.wsrep_after_BF_victim_lock",
- {
- const char act[]=
- "now "
- "wait_for signal.wsrep_after_BF_victim_lock";
- DBUG_ASSERT(!debug_sync_set_action(bf_thd,
- STRING_WITH_LEN(act)));
- };);
-
-
- if (wsrep_thd_query_state(thd) == QUERY_EXITING) {
- WSREP_DEBUG("kill trx EXITING for " TRX_ID_FMT,
- victim_trx->id);
- wsrep_thd_UNLOCK(thd);
- DBUG_RETURN(0);
+ if (!item) {
+ WSREP_ERROR("Failed to allocate memory to kill thread.");
+ DBUG_RETURN(1);
}
- if (wsrep_thd_exec_mode(thd) != LOCAL_STATE) {
- WSREP_DEBUG("withdraw for BF trx: " TRX_ID_FMT ", state: %d",
- victim_trx->id,
- wsrep_thd_get_conflict_state(thd));
- }
+ item->victim_thd = thd;
+ item->victim_id = victim_trx->id;
+ item->bf_thd = bf_thd;
+ item->bf_id = bf_trx ? bf_trx->id : TRX_ID_MAX;
+ item->signal = signal;
+ item->wait_lock = (victim_trx->lock.wait_lock ? true : false);
- switch (wsrep_thd_get_conflict_state(thd)) {
- case NO_CONFLICT:
- wsrep_thd_set_conflict_state(thd, MUST_ABORT);
- break;
- case MUST_ABORT:
- WSREP_DEBUG("victim " TRX_ID_FMT " in MUST ABORT state",
- victim_trx->id);
- wsrep_thd_UNLOCK(thd);
- wsrep_thd_awake(thd, signal);
- DBUG_RETURN(0);
- break;
- case ABORTED:
- case ABORTING: // fall through
- default:
- WSREP_DEBUG("victim " TRX_ID_FMT " in state %d",
- victim_trx->id, wsrep_thd_get_conflict_state(thd));
- wsrep_thd_UNLOCK(thd);
- DBUG_RETURN(0);
- break;
+ /* If victim itself is waiting a lock, cancel wait lock. */
+ if (victim_trx->lock.wait_lock) {
+ lock_cancel_waiting_and_release(victim_trx->lock.wait_lock);
}
- switch (wsrep_thd_query_state(thd)) {
- case QUERY_COMMITTING:
- enum wsrep_status rcode;
-
- WSREP_DEBUG("kill query for: %ld",
- thd_get_thread_id(thd));
- WSREP_DEBUG("kill trx QUERY_COMMITTING for " TRX_ID_FMT,
- victim_trx->id);
-
- if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
- wsrep_abort_slave_trx(bf_seqno,
- wsrep_thd_trx_seqno(thd));
- } else {
- wsrep_t *wsrep= get_wsrep();
- rcode = wsrep->abort_pre_commit(
- wsrep, bf_seqno,
- (wsrep_trx_id_t)wsrep_thd_ws_handle(thd)->trx_id
- );
-
- switch (rcode) {
- case WSREP_WARNING:
- WSREP_DEBUG("cancel commit warning: "
- TRX_ID_FMT,
- victim_trx->id);
- wsrep_thd_UNLOCK(thd);
- wsrep_thd_awake(thd, signal);
- DBUG_RETURN(1);
- break;
- case WSREP_OK:
- break;
- default:
- WSREP_ERROR(
- "cancel commit bad exit: %d "
- TRX_ID_FMT,
- rcode, victim_trx->id);
- /* unable to interrupt, must abort */
- /* note: kill_mysql() will block, if we cannot.
- * kill the lock holder first.
- */
- abort();
- break;
- }
- }
- wsrep_thd_UNLOCK(thd);
- wsrep_thd_awake(thd, signal);
- break;
- case QUERY_EXEC:
- /* it is possible that victim trx is itself waiting for some
- * other lock. We need to cancel this waiting
- */
- WSREP_DEBUG("kill trx QUERY_EXEC for " TRX_ID_FMT,
- victim_trx->id);
-
- victim_trx->lock.was_chosen_as_deadlock_victim= TRUE;
-
- if (victim_trx->lock.wait_lock) {
- WSREP_DEBUG("victim has wait flag: %ld",
- thd_get_thread_id(thd));
- lock_t* wait_lock = victim_trx->lock.wait_lock;
-
- if (wait_lock) {
- WSREP_DEBUG("canceling wait lock");
- victim_trx->lock.was_chosen_as_deadlock_victim= TRUE;
- lock_cancel_waiting_and_release(wait_lock);
- }
-
- wsrep_thd_UNLOCK(thd);
- wsrep_thd_awake(thd, signal);
- } else {
- /* abort currently executing query */
- DBUG_PRINT("wsrep",("sending KILL_QUERY to: %lu",
- thd_get_thread_id(thd)));
- WSREP_DEBUG("kill query for: %ld",
- thd_get_thread_id(thd));
- /* Note that innobase_kill_query will take lock_mutex
- and trx_mutex */
- wsrep_thd_UNLOCK(thd);
- wsrep_thd_awake(thd, signal);
-
- /* for BF thd, we need to prevent him from committing */
- if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
- wsrep_abort_slave_trx(bf_seqno,
- wsrep_thd_trx_seqno(thd));
- }
- }
- break;
- case QUERY_IDLE:
- {
- WSREP_DEBUG("kill IDLE for " TRX_ID_FMT, victim_trx->id);
-
- if (wsrep_thd_exec_mode(thd) == REPL_RECV) {
- WSREP_DEBUG("kill BF IDLE, seqno: %lld",
- (long long)wsrep_thd_trx_seqno(thd));
- wsrep_thd_UNLOCK(thd);
- wsrep_abort_slave_trx(bf_seqno,
- wsrep_thd_trx_seqno(thd));
- DBUG_RETURN(0);
- }
- /* This will lock thd from proceeding after net_read() */
- wsrep_thd_set_conflict_state(thd, ABORTING);
-
- wsrep_lock_rollback();
-
- if (wsrep_aborting_thd_contains(thd)) {
- WSREP_WARN("duplicate thd aborter %lu",
- (ulong) thd_get_thread_id(thd));
- } else {
- wsrep_aborting_thd_enqueue(thd);
- DBUG_PRINT("wsrep",("enqueuing trx abort for %lu",
- thd_get_thread_id(thd)));
- WSREP_DEBUG("enqueuing trx abort for (%lu)",
- thd_get_thread_id(thd));
- }
-
- DBUG_PRINT("wsrep",("signalling wsrep rollbacker"));
- WSREP_DEBUG("signaling aborter");
- wsrep_unlock_rollback();
- wsrep_thd_UNLOCK(thd);
-
- break;
- }
- default:
- WSREP_WARN("bad wsrep query state: %d",
- wsrep_thd_query_state(thd));
- wsrep_thd_UNLOCK(thd);
- break;
- }
+ wsrep_enqueue_background_kill(item);
DBUG_RETURN(0);
}
@@ -19860,10 +19678,13 @@ wsrep_abort_transaction(
{
DBUG_ENTER("wsrep_innobase_abort_thd");
+ ut_a(bf_thd);
+ ut_a(victim_thd);
+
trx_t* victim_trx = thd_to_trx(victim_thd);
- trx_t* bf_trx = (bf_thd) ? thd_to_trx(bf_thd) : NULL;
+ trx_t* bf_trx = thd_to_trx(bf_thd);
- WSREP_DEBUG("abort transaction: BF: %s victim: %s victim conf: %d",
+ WSREP_DEBUG("Abort transaction: BF: %s victim: %s victim conf: %d",
wsrep_thd_query(bf_thd),
wsrep_thd_query(victim_thd),
wsrep_thd_conflict_state(victim_thd, FALSE));
@@ -19871,8 +19692,10 @@ wsrep_abort_transaction(
if (victim_trx) {
lock_mutex_enter();
trx_mutex_enter(victim_trx);
- int rcode = wsrep_innobase_kill_one_trx(bf_thd, bf_trx,
- victim_trx, signal);
+ int rcode = wsrep_innobase_kill_one_trx(bf_thd,
+ bf_trx,
+ victim_trx,
+ static_cast<bool>(signal));
lock_mutex_exit();
trx_mutex_exit(victim_trx);
wsrep_srv_conc_cancel_wait(victim_trx);
diff --git a/storage/innobase/include/ha_prototypes.h b/storage/innobase/include/ha_prototypes.h
index 693dcd15163..27d1137e808 100644
--- a/storage/innobase/include/ha_prototypes.h
+++ b/storage/innobase/include/ha_prototypes.h
@@ -238,7 +238,7 @@ int
wsrep_innobase_kill_one_trx(void * const thd_ptr,
const trx_t * const bf_trx,
trx_t *victim_trx,
- ibool signal);
+ bool signal);
int wsrep_innobase_mysql_sort(int mysql_type, uint charset_number,
unsigned char* str, unsigned int str_length,
unsigned int buf_length);
diff --git a/storage/innobase/lock/lock0lock.cc b/storage/innobase/lock/lock0lock.cc
index 006ecc28276..408f2e0c16a 100644
--- a/storage/innobase/lock/lock0lock.cc
+++ b/storage/innobase/lock/lock0lock.cc
@@ -1,7 +1,7 @@
/*****************************************************************************
Copyright (c) 1996, 2017, Oracle and/or its affiliates. All Rights Reserved.
-Copyright (c) 2014, 2019, MariaDB Corporation.
+Copyright (c) 2014, 2020, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -1150,7 +1150,7 @@ wsrep_kill_victim(
}
wsrep_innobase_kill_one_trx(trx->mysql_thd,
- trx, lock->trx, TRUE);
+ trx, lock->trx, true);
}
}
}