summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mysql-test/suite/galera_sr/disabled.def1
-rw-r--r--mysql-test/suite/galera_sr/r/galera_sr_ddl_master.result2
-rw-r--r--mysql-test/suite/galera_sr/t/galera_sr_ddl_master.test7
-rw-r--r--sql/wsrep_schema.cc20
-rw-r--r--sql/wsrep_server_service.cc29
-rw-r--r--sql/wsrep_server_service.h10
-rw-r--r--sql/wsrep_thd.cc111
7 files changed, 62 insertions, 118 deletions
diff --git a/mysql-test/suite/galera_sr/disabled.def b/mysql-test/suite/galera_sr/disabled.def
index 44354706143..ab22c746cd0 100644
--- a/mysql-test/suite/galera_sr/disabled.def
+++ b/mysql-test/suite/galera_sr/disabled.def
@@ -1,4 +1,3 @@
galera_sr_table_contents : missing file
GCF-437 : test relies on InnoDB redo log size limitation
-galera_sr_ddl_master : MDEV-20780 Galera test failure on galera_sr.galera_sr_ddl_master
GCF-1043A : MDEV-21170 Galera test failure on galera_sr.GCF-1043A
diff --git a/mysql-test/suite/galera_sr/r/galera_sr_ddl_master.result b/mysql-test/suite/galera_sr/r/galera_sr_ddl_master.result
index 93f94222862..5858a9c6eb8 100644
--- a/mysql-test/suite/galera_sr/r/galera_sr_ddl_master.result
+++ b/mysql-test/suite/galera_sr/r/galera_sr_ddl_master.result
@@ -48,8 +48,6 @@ SELECT COUNT(*) as expect_0 FROM mysql.wsrep_streaming_log;
expect_0
0
connection node_2;
-SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
-set global wsrep_sync_wait=15;
SELECT COUNT(*) as expect_6 FROM t1;
expect_6
6
diff --git a/mysql-test/suite/galera_sr/t/galera_sr_ddl_master.test b/mysql-test/suite/galera_sr/t/galera_sr_ddl_master.test
index bf1105e908d..3c42cb2a0a2 100644
--- a/mysql-test/suite/galera_sr/t/galera_sr_ddl_master.test
+++ b/mysql-test/suite/galera_sr/t/galera_sr_ddl_master.test
@@ -59,15 +59,8 @@ SELECT * FROM t1;
SELECT COUNT(*) as expect_0 FROM mysql.wsrep_streaming_log;
--connection node_2
-SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
-set global wsrep_sync_wait=15;
---let $wait_condition = SELECT COUNT(*) = 6 FROM t1;
---source include/wait_condition.inc
SELECT COUNT(*) as expect_6 FROM t1;
SELECT * FROM t1;
---let $wait_condition = SELECT COUNT(*) = 0 FROM mysql.wsrep_streaming_log;
---source include/wait_condition.inc
-
SELECT COUNT(*) as expect_0 FROM mysql.wsrep_streaming_log;
DROP TABLE t1;
diff --git a/sql/wsrep_schema.cc b/sql/wsrep_schema.cc
index 066ea124fb7..619a535f916 100644
--- a/sql/wsrep_schema.cc
+++ b/sql/wsrep_schema.cc
@@ -1049,37 +1049,23 @@ int Wsrep_schema::remove_fragments(THD* thd,
Wsrep_schema_impl::wsrep_off wsrep_off(thd);
Wsrep_schema_impl::binlog_off binlog_off(thd);
- /*
- Open SR table for write.
- Adopted from Rpl_info_table_access::open_table()
- */
- uint flags= (MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK |
- MYSQL_LOCK_IGNORE_GLOBAL_READ_ONLY |
- MYSQL_OPEN_IGNORE_FLUSH |
- MYSQL_LOCK_IGNORE_TIMEOUT);
Query_tables_list query_tables_list_backup;
Open_tables_backup open_tables_backup;
thd->lex->reset_n_backup_query_tables_list(&query_tables_list_backup);
thd->reset_n_backup_open_tables_state(&open_tables_backup);
- TABLE_LIST tables;
- LEX_CSTRING schema_str= { wsrep_schema_str.c_str(), wsrep_schema_str.length() };
- LEX_CSTRING table_str= { sr_table_str.c_str(), sr_table_str.length() };
- tables.init_one_table(&schema_str,
- &table_str, 0, TL_WRITE);
- if (!open_n_lock_single_table(thd, &tables, tables.lock_type, flags))
+ TABLE* frag_table= 0;
+ if (Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(), &frag_table))
{
- WSREP_DEBUG("Failed to open SR table for access");
ret= 1;
}
else
{
- tables.table->use_all_columns();
for (std::vector<wsrep::seqno>::const_iterator i= fragments.begin();
i != fragments.end(); ++i)
{
if (remove_fragment(thd,
- tables.table,
+ frag_table,
server_id,
transaction_id, *i))
{
diff --git a/sql/wsrep_server_service.cc b/sql/wsrep_server_service.cc
index bfb85e3d0ab..aa288e67420 100644
--- a/sql/wsrep_server_service.cc
+++ b/sql/wsrep_server_service.cc
@@ -42,15 +42,13 @@ static void init_service_thd(THD* thd, char* thread_stack)
thd->reset_for_next_command(true);
}
-wsrep::storage_service* Wsrep_server_service::storage_service(
- wsrep::client_service& client_service)
+Wsrep_storage_service*
+wsrep_create_storage_service(THD* orig_THD, const char* ctx)
{
- Wsrep_client_service& cs=
- static_cast<Wsrep_client_service&>(client_service);
- THD* thd= new THD(next_thread_id(), true);
- init_service_thd(thd, cs.m_thd->thread_stack);
- WSREP_DEBUG("Created storage service with thread id %llu",
- thd->thread_id);
+ THD* thd= new THD(true, true);
+ init_service_thd(thd, orig_THD->thread_stack);
+ WSREP_DEBUG("Created storage service in %s context with thread id %llu",
+ ctx, thd->thread_id);
/* Use variables from the current thd attached to client_service.
This is because we need to be able to BF abort storage access
operations. */
@@ -59,16 +57,19 @@ wsrep::storage_service* Wsrep_server_service::storage_service(
}
wsrep::storage_service* Wsrep_server_service::storage_service(
+ wsrep::client_service& client_service)
+{
+ Wsrep_client_service& cs=
+ static_cast<Wsrep_client_service&>(client_service);
+ return wsrep_create_storage_service(cs.m_thd, "local");
+}
+
+wsrep::storage_service* Wsrep_server_service::storage_service(
wsrep::high_priority_service& high_priority_service)
{
Wsrep_high_priority_service& hps=
static_cast<Wsrep_high_priority_service&>(high_priority_service);
- THD* thd= new THD(next_thread_id(), true);
- init_service_thd(thd, hps.m_thd->thread_stack);
- WSREP_DEBUG("Created high priority storage service with thread id %llu",
- thd->thread_id);
- wsrep_assign_from_threadvars(thd);
- return new Wsrep_storage_service(thd);
+ return wsrep_create_storage_service(hps.m_thd, "high priority");
}
void Wsrep_server_service::release_storage_service(
diff --git a/sql/wsrep_server_service.h b/sql/wsrep_server_service.h
index 6336fe2c473..4017c9b2d58 100644
--- a/sql/wsrep_server_service.h
+++ b/sql/wsrep_server_service.h
@@ -87,4 +87,14 @@ class Wsrep_applier_service;
Wsrep_applier_service*
wsrep_create_streaming_applier(THD *orig_thd, const char *ctx);
+/**
+ Helper method to create new storage service.
+
+ @param orig_thd Original thd context to copy operation context from.
+ @param ctx Context string for debug logging.
+*/
+class Wsrep_storage_service;
+Wsrep_storage_service*
+wsrep_create_storage_service(THD *orig_thd, const char *ctx);
+
#endif /* WSREP_SERVER_SERVICE */
diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc
index 8b535b41a43..7f1818def73 100644
--- a/sql/wsrep_thd.cc
+++ b/sql/wsrep_thd.cc
@@ -136,99 +136,60 @@ void wsrep_create_appliers(long threads)
}
}
-static void wsrep_rollback_streaming_aborted_by_toi(THD *thd)
+static void wsrep_remove_streaming_fragments(THD* thd, const char* ctx)
{
- WSREP_INFO("wsrep_rollback_streaming_aborted_by_toi");
- /* Set thd->event_scheduler.data temporarily to NULL to avoid
- callbacks to threadpool wait_begin() during rollback. */
- auto saved_esd= thd->event_scheduler.data;
- thd->event_scheduler.data= 0;
- if (thd->wsrep_cs().mode() == wsrep::client_state::m_high_priority)
- {
- DBUG_ASSERT(!saved_esd);
- DBUG_ASSERT(thd->wsrep_applier_service);
- thd->wsrep_applier_service->rollback(wsrep::ws_handle(),
- wsrep::ws_meta());
- thd->wsrep_applier_service->after_apply();
- /* Will free THD */
- Wsrep_server_state::instance().server_service().
- release_high_priority_service(thd->wsrep_applier_service);
- }
- else
- {
- mysql_mutex_lock(&thd->LOCK_thd_data);
- /* prepare THD for rollback processing */
- thd->reset_for_next_command(true);
- thd->lex->sql_command= SQLCOM_ROLLBACK;
- mysql_mutex_unlock(&thd->LOCK_thd_data);
- /* Perform a client rollback, restore globals and signal
- the victim only when all the resources have been
- released */
- thd->wsrep_cs().client_service().bf_rollback();
- wsrep_reset_threadvars(thd);
- /* Assign saved event_scheduler.data back before letting
- client to continue. */
- thd->event_scheduler.data= saved_esd;
- thd->wsrep_cs().sync_rollback_complete();
- }
+ wsrep::transaction_id transaction_id(thd->wsrep_trx().id());
+ Wsrep_storage_service* storage_service= wsrep_create_storage_service(thd, ctx);
+ storage_service->store_globals();
+ storage_service->adopt_transaction(thd->wsrep_trx());
+ storage_service->remove_fragments();
+ storage_service->commit(wsrep::ws_handle(transaction_id, 0),
+ wsrep::ws_meta());
+ Wsrep_server_state::instance().server_service()
+ .release_storage_service(storage_service);
+ wsrep_store_threadvars(thd);
}
-static void wsrep_rollback_high_priority(THD *thd)
+static void wsrep_rollback_high_priority(THD *thd, THD *rollbacker)
{
- WSREP_INFO("rollbacker aborting SR thd: (%lld %llu)",
- thd->thread_id, (long long)thd->real_id);
+ WSREP_DEBUG("Rollbacker aborting SR applier thd (%llu %lu)",
+ thd->thread_id, thd->real_id);
+ char* orig_thread_stack= thd->thread_stack;
+ thd->thread_stack= rollbacker->thread_stack;
DBUG_ASSERT(thd->wsrep_cs().mode() == Wsrep_client_state::m_high_priority);
/* Must be streaming and must have been removed from the
server state streaming appliers map. */
DBUG_ASSERT(thd->wsrep_trx().is_streaming());
DBUG_ASSERT(!Wsrep_server_state::instance().find_streaming_applier(
- thd->wsrep_trx().server_id(),
- thd->wsrep_trx().id()));
+ thd->wsrep_trx().server_id(),
+ thd->wsrep_trx().id()));
DBUG_ASSERT(thd->wsrep_applier_service);
/* Fragment removal should happen before rollback to make
the transaction non-observable in SR table after the rollback
completes. For correctness the order does not matter here,
but currently it is mandated by checks in some MTR tests. */
- wsrep::transaction_id transaction_id(thd->wsrep_trx().id());
- Wsrep_storage_service* storage_service=
- static_cast<Wsrep_storage_service*>(
- Wsrep_server_state::instance().server_service().storage_service(
- *thd->wsrep_applier_service));
- storage_service->store_globals();
- storage_service->adopt_transaction(thd->wsrep_trx());
- storage_service->remove_fragments();
- storage_service->commit(wsrep::ws_handle(transaction_id, 0),
- wsrep::ws_meta());
- Wsrep_server_state::instance().server_service().release_storage_service(storage_service);
- wsrep_store_threadvars(thd);
+ wsrep_remove_streaming_fragments(thd, "high priority");
thd->wsrep_applier_service->rollback(wsrep::ws_handle(),
wsrep::ws_meta());
thd->wsrep_applier_service->after_apply();
+ thd->thread_stack= orig_thread_stack;
+ WSREP_DEBUG("rollbacker aborted thd: (%llu %lu)",
+ thd->thread_id, thd->real_id);
/* Will free THD */
Wsrep_server_state::instance().server_service()
.release_high_priority_service(thd->wsrep_applier_service);
}
-static void wsrep_rollback_local(THD *thd)
+static void wsrep_rollback_local(THD *thd, THD *rollbacker)
{
- WSREP_INFO("Wsrep_rollback_local");
+ WSREP_DEBUG("Rollbacker aborting local thd (%llu %lu)",
+ thd->thread_id, thd->real_id);
+ char* orig_thread_stack= thd->thread_stack;
+ thd->thread_stack= rollbacker->thread_stack;
if (thd->wsrep_trx().is_streaming())
{
- wsrep::transaction_id transaction_id(thd->wsrep_trx().id());
- Wsrep_storage_service* storage_service=
- static_cast<Wsrep_storage_service*>(
- Wsrep_server_state::instance().server_service().
- storage_service(thd->wsrep_cs().client_service()));
-
- storage_service->store_globals();
- storage_service->adopt_transaction(thd->wsrep_trx());
- storage_service->remove_fragments();
- storage_service->commit(wsrep::ws_handle(transaction_id, 0),
- wsrep::ws_meta());
- Wsrep_server_state::instance().server_service().
- release_storage_service(storage_service);
- wsrep_store_threadvars(thd);
+ wsrep_remove_streaming_fragments(thd, "local");
}
/* Set thd->event_scheduler.data temporarily to NULL to avoid
callbacks to threadpool wait_begin() during rollback. */
@@ -247,9 +208,10 @@ static void wsrep_rollback_local(THD *thd)
/* Assign saved event_scheduler.data back before letting
client to continue. */
thd->event_scheduler.data= saved_esd;
+ thd->thread_stack= orig_thread_stack;
thd->wsrep_cs().sync_rollback_complete();
- WSREP_DEBUG("rollbacker aborted thd: (%llu %llu)",
- thd->thread_id, (long long)thd->real_id);
+ WSREP_DEBUG("rollbacker aborted thd: (%llu %lu)",
+ thd->thread_id, thd->real_id);
}
static void wsrep_rollback_process(THD *rollbacker,
@@ -286,18 +248,13 @@ static void wsrep_rollback_process(THD *rollbacker,
/* Rollback methods below may free thd pointer. Do not try
to access it after method returns. */
- if (thd->wsrep_trx().is_streaming() &&
- thd->wsrep_trx().bf_aborted_in_total_order())
- {
- wsrep_rollback_streaming_aborted_by_toi(thd);
- }
- else if (wsrep_thd_is_applying(thd))
+ if (wsrep_thd_is_applying(thd))
{
- wsrep_rollback_high_priority(thd);
+ wsrep_rollback_high_priority(thd, rollbacker);
}
else
{
- wsrep_rollback_local(thd);
+ wsrep_rollback_local(thd, rollbacker);
}
wsrep_store_threadvars(rollbacker);
thd_proc_info(rollbacker, "wsrep aborter idle");