summaryrefslogtreecommitdiff
path: root/sql/wsrep_mysqld.cc
diff options
context:
space:
mode:
authormkaruza <mario.karuza@galeracluster.com>2019-04-01 13:23:05 +0200
committerJan Lindström <jan.lindstrom@mariadb.com>2020-01-29 15:06:06 +0200
commit41bc736871078cf9a8f9888ed1a28249ee85549c (patch)
tree0fad0441d41364ad77ad10f7eeb68a2e32ea9bc7 /sql/wsrep_mysqld.cc
parent5defdc382bbf606b83e556c4f0d29dcd7954ebbc (diff)
downloadmariadb-git-41bc736871078cf9a8f9888ed1a28249ee85549c.tar.gz
Galera GTID support
Support for Galera GTID consistency throughout the cluster. All nodes in the cluster should have the same GTID for replicated events which originate from the cluster. Cluster-originating commands need to contain a sequential WSREP GTID seqno; manual setting of gtid_seq_no=X is ignored. In a master-slave scenario where the master is a non-Galera node, the GTID is replicated and preserved on all nodes. To achieve this, domain_id, server_id and seqnos should be the same on all nodes. The node which bootstraps the cluster sends its domain_id and server_id to the other nodes, and this combination is used to write the GTID for events that are replicated inside the cluster. Cluster nodes that execute non-replicated events are going to have a different GTID than replicated ones; the difference will be visible in the domain part of the GTID. With wsrep_gtid_domain_id you can set the domain_id for the WSREP cluster. Functions WSREP_LAST_WRITTEN_GTID, WSREP_LAST_SEEN_GTID and WSREP_SYNC_WAIT_UPTO_GTID now work with the "native" GTID format. Fixed galera tests to reflect these changes. Add variable to manually update WSREP GTID seqno in cluster. Add variable to manipulate and change the WSREP GTID seqno. The next command originating from the cluster on the same thread will have the set seqno, and the cluster should change its internal counter to that value. Behavior is the same as using @@gtid_seq_no for a non-WSREP transaction.
Diffstat (limited to 'sql/wsrep_mysqld.cc')
-rw-r--r--sql/wsrep_mysqld.cc174
1 files changed, 146 insertions, 28 deletions
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
index 444a187ea57..4c1d683b03e 100644
--- a/sql/wsrep_mysqld.cc
+++ b/sql/wsrep_mysqld.cc
@@ -52,11 +52,7 @@
/* wsrep-lib */
Wsrep_server_state* Wsrep_server_state::m_instance;
-my_bool wsrep_emulate_bin_log = FALSE; // activating parts of binlog interface
-#ifdef GTID_SUPPORT
-/* Sidno in global_sid_map corresponding to group uuid */
-rpl_sidno wsrep_sidno= -1;
-#endif /* GTID_SUPPORT */
+my_bool wsrep_emulate_bin_log= FALSE; // activating parts of binlog interface
my_bool wsrep_preordered_opt= FALSE;
/* Streaming Replication */
@@ -106,10 +102,9 @@ ulong wsrep_max_ws_size; // Max allowed ws (RBR buffer) s
ulong wsrep_max_ws_rows; // Max number of rows in ws
ulong wsrep_forced_binlog_format;
ulong wsrep_mysql_replication_bundle;
-bool wsrep_gtid_mode; // Use wsrep_gtid_domain_id
- // for galera transactions?
-uint32 wsrep_gtid_domain_id; // gtid_domain_id for galera
- // transactions
+
+bool wsrep_gtid_mode; // Enable WSREP native GTID support
+Wsrep_gtid_server wsrep_gtid_server;
/* Other configuration variables and their default values. */
my_bool wsrep_incremental_data_collection= 0; // Incremental data collection
@@ -144,6 +139,7 @@ mysql_mutex_t LOCK_wsrep_replaying;
mysql_cond_t COND_wsrep_replaying;
mysql_mutex_t LOCK_wsrep_slave_threads;
mysql_cond_t COND_wsrep_slave_threads;
+mysql_mutex_t LOCK_wsrep_gtid_wait_upto;
mysql_mutex_t LOCK_wsrep_cluster_config;
mysql_mutex_t LOCK_wsrep_desync;
mysql_mutex_t LOCK_wsrep_config_state;
@@ -167,7 +163,8 @@ ulong my_bind_addr;
PSI_mutex_key
key_LOCK_wsrep_replaying, key_LOCK_wsrep_ready, key_LOCK_wsrep_sst,
key_LOCK_wsrep_sst_thread, key_LOCK_wsrep_sst_init,
- key_LOCK_wsrep_slave_threads, key_LOCK_wsrep_desync,
+ key_LOCK_wsrep_slave_threads, key_LOCK_wsrep_gtid_wait_upto,
+ key_LOCK_wsrep_desync,
key_LOCK_wsrep_config_state, key_LOCK_wsrep_cluster_config,
key_LOCK_wsrep_group_commit,
key_LOCK_wsrep_SR_pool,
@@ -179,7 +176,7 @@ PSI_mutex_key
PSI_cond_key key_COND_wsrep_thd,
key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst,
key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread,
- key_COND_wsrep_thd_queue, key_COND_wsrep_slave_threads,
+ key_COND_wsrep_thd_queue, key_COND_wsrep_slave_threads, key_COND_wsrep_gtid_wait_upto,
key_COND_wsrep_joiner_monitor, key_COND_wsrep_donor_monitor;
PSI_file_key key_file_wsrep_gra_log;
@@ -193,6 +190,7 @@ static PSI_mutex_info wsrep_mutexes[]=
{ &key_LOCK_wsrep_sst, "LOCK_wsrep_sst", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_replaying, "LOCK_wsrep_replaying", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_slave_threads, "LOCK_wsrep_slave_threads", PSI_FLAG_GLOBAL},
+ { &key_LOCK_wsrep_gtid_wait_upto, "LOCK_wsrep_gtid_wait_upto", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_cluster_config, "LOCK_wsrep_cluster_config", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_desync, "LOCK_wsrep_desync", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL},
@@ -212,6 +210,7 @@ static PSI_cond_info wsrep_conds[]=
{ &key_COND_wsrep_thd, "THD::COND_wsrep_thd", 0},
{ &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL},
{ &key_COND_wsrep_slave_threads, "COND_wsrep_wsrep_slave_threads", PSI_FLAG_GLOBAL},
+ { &key_COND_wsrep_gtid_wait_upto, "COND_wsrep_gtid_wait_upto", PSI_FLAG_GLOBAL},
{ &key_COND_wsrep_joiner_monitor, "COND_wsrep_joiner_monitor", PSI_FLAG_GLOBAL},
{ &key_COND_wsrep_donor_monitor, "COND_wsrep_donor_monitor", PSI_FLAG_GLOBAL}
};
@@ -302,6 +301,58 @@ static void wsrep_log_cb(wsrep::log::level level, const char *msg)
}
}
+/*
+  Initialize wsrep_gtid_server from the storage engine checkpoint.
+
+  If the checkpoint has a non-zero server_id it is used as is. Otherwise
+  the GTID is rebuilt for wsrep_gtid_server.domain_id: server_id and seqno
+  are taken from the binlog state when the binlog is open and the domain
+  lookup succeeds, else from the global server_id with seqno 0.
+*/
+void wsrep_init_gtid()
+{
+  wsrep_server_gtid_t checkpoint= wsrep_get_SE_checkpoint<wsrep_server_gtid_t>();
+  if (checkpoint.server_id != 0)
+  {
+    wsrep_gtid_server.gtid(checkpoint);
+    return;
+  }
+
+  rpl_gtid binlog_gtid;
+  checkpoint.domain_id= wsrep_gtid_server.domain_id;
+  /* The lookup is attempted only when the binlog is open. */
+  const bool found_in_binlog=
+    mysql_bin_log.is_open() &&
+    mysql_bin_log.lookup_domain_in_binlog_state(checkpoint.domain_id,
+                                                &binlog_gtid);
+  if (found_in_binlog)
+  {
+    checkpoint.server_id= binlog_gtid.server_id;
+    checkpoint.seqno= binlog_gtid.seq_no;
+  }
+  else
+  {
+    checkpoint.server_id= global_system_variables.server_id;
+    checkpoint.seqno= 0;
+  }
+  wsrep_gtid_server.gtid(checkpoint);
+}
+
+/*
+  Look up the (domain_id, server_id) pair of gtid in the binlog state.
+
+  @param gtid  in: domain_id and server_id to search for;
+               out: domain_id, server_id and seqno of the entry found.
+
+  @return true if the binlog is open and the lookup succeeded, false
+          otherwise (in which case this function assigns nothing to gtid).
+*/
+bool wsrep_get_binlog_gtid_seqno(wsrep_server_gtid_t& gtid)
+{
+  rpl_gtid found;
+  if (!mysql_bin_log.is_open() ||
+      !mysql_bin_log.find_in_binlog_state(gtid.domain_id, gtid.server_id,
+                                          &found))
+  {
+    return false;
+  }
+
+  gtid.domain_id= found.domain_id;
+  gtid.server_id= found.server_id;
+  gtid.seqno= found.seq_no;
+  return true;
+}
+
+/*
+  Check a GTID against the WSREP GTID server state.
+
+  @return true only when domain and server both equal the values held by
+          wsrep_gtid_server and seqno is greater than
+          wsrep_gtid_server.seqno_committed(); false in every other case.
+*/
+bool wsrep_check_gtid_seqno(const uint32& domain, const uint32& server,
+                            uint64& seqno)
+{
+  /* seqno_committed() is consulted only if domain and server match. */
+  return domain == wsrep_gtid_server.domain_id &&
+         server == wsrep_gtid_server.server_id &&
+         wsrep_gtid_server.seqno_committed() < seqno;
+}
+
void wsrep_init_sidno(const wsrep::id& uuid)
{
/*
@@ -692,6 +743,16 @@ int wsrep_init_server()
void wsrep_init_globals()
{
wsrep_init_sidno(Wsrep_server_state::instance().connected_gtid().id());
+ wsrep_init_gtid();
+ /* Recover last written wsrep gtid */
+ if (wsrep_new_cluster)
+ {
+ wsrep_server_gtid_t gtid= {wsrep_gtid_server.domain_id,
+ wsrep_gtid_server.server_id, 0};
+ wsrep_get_binlog_gtid_seqno(gtid);
+ wsrep_gtid_server.seqno(gtid.seqno);
+ }
+ wsrep_new_cluster= 0;
wsrep_init_schema();
if (WSREP_ON)
{
@@ -793,6 +854,7 @@ void wsrep_thr_init()
mysql_cond_init(key_COND_wsrep_replaying, &COND_wsrep_replaying, NULL);
mysql_mutex_init(key_LOCK_wsrep_slave_threads, &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wsrep_slave_threads, &COND_wsrep_slave_threads, NULL);
+ mysql_mutex_init(key_LOCK_wsrep_gtid_wait_upto, &LOCK_wsrep_gtid_wait_upto, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_wsrep_cluster_config, &LOCK_wsrep_cluster_config, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_wsrep_desync, &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_wsrep_config_state, &LOCK_wsrep_config_state, MY_MUTEX_INIT_FAST);
@@ -903,6 +965,7 @@ void wsrep_thr_deinit()
mysql_cond_destroy(&COND_wsrep_sst_init);
mysql_mutex_destroy(&LOCK_wsrep_replaying);
mysql_cond_destroy(&COND_wsrep_replaying);
+ mysql_mutex_destroy(&LOCK_wsrep_gtid_wait_upto);
mysql_mutex_destroy(&LOCK_wsrep_slave_threads);
mysql_cond_destroy(&COND_wsrep_slave_threads);
mysql_mutex_destroy(&LOCK_wsrep_cluster_config);
@@ -939,10 +1002,20 @@ void wsrep_recover()
uuid_str, (long long)local_seqno);
return;
}
- wsrep::gtid gtid= wsrep_get_SE_checkpoint();
+ wsrep::gtid gtid= wsrep_get_SE_checkpoint<wsrep::gtid>();
std::ostringstream oss;
oss << gtid;
- WSREP_INFO("Recovered position: %s", oss.str().c_str());
+ if (wsrep_gtid_mode)
+ {
+ wsrep_server_gtid_t server_gtid= wsrep_get_SE_checkpoint<wsrep_server_gtid_t>();
+ WSREP_INFO("Recovered position: %s,%d-%d-%llu", oss.str().c_str(), server_gtid.domain_id,
+ server_gtid.server_id, server_gtid.seqno);
+ }
+ else
+ {
+ WSREP_INFO("Recovered position: %s", oss.str().c_str());
+ }
+
}
@@ -1012,7 +1085,6 @@ bool wsrep_start_replication()
}
bool const bootstrap(TRUE == wsrep_new_cluster);
- wsrep_new_cluster= FALSE;
WSREP_INFO("Start replication");
@@ -1445,16 +1517,36 @@ int wsrep_to_buf_helper(
if (!ret && writer.write(&gtid_ev)) ret= 1;
}
#endif /* GTID_SUPPORT */
- if (wsrep_gtid_mode && thd->variables.gtid_seq_no)
+ /*
+ * Check if this is applier thread, slave_thread or
+ * we have set manually WSREP GTID seqno. Add GTID event.
+ */
+ if (thd->slave_thread || wsrep_thd_is_applying(thd) ||
+ thd->variables.wsrep_gtid_seq_no)
{
- Gtid_log_event gtid_event(thd, thd->variables.gtid_seq_no,
- thd->variables.gtid_domain_id,
- true, LOG_EVENT_SUPPRESS_USE_F,
- true, 0);
- gtid_event.server_id= thd->variables.server_id;
+ uint64 seqno= thd->variables.gtid_seq_no;
+ uint32 domain_id= thd->variables.gtid_domain_id;
+ uint32 server_id= thd->variables.server_id;
+ if (!thd->variables.gtid_seq_no && thd->variables.wsrep_gtid_seq_no)
+ {
+ seqno= thd->variables.wsrep_gtid_seq_no;
+ domain_id= wsrep_gtid_server.domain_id;
+ server_id= wsrep_gtid_server.server_id;
+ }
+ Gtid_log_event gtid_event(thd, seqno, domain_id, true,
+ LOG_EVENT_SUPPRESS_USE_F, true, 0);
+ gtid_event.server_id= server_id;
if (!gtid_event.is_valid()) ret= 0;
ret= writer.write(&gtid_event);
}
+ /*
+ It's local DDL so in case of possible gtid seqno (SET gtid_seq_no=X)
+ manipulation, seqno value will be ignored.
+ */
+ else
+ {
+ thd->variables.gtid_seq_no= 0;
+ }
/* if there is prepare query, add event for it */
if (!ret && thd->wsrep_TOI_pre_query)
@@ -1468,6 +1560,9 @@ int wsrep_to_buf_helper(
/* continue to append the actual query */
Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0);
+ /* WSREP GTID mode, we need to change server_id */
+ if (wsrep_gtid_mode && !thd->variables.gtid_seq_no)
+ ev.server_id= wsrep_gtid_server.server_id;
ev.checksum_alg= current_binlog_check_alg;
if (!ret && writer.write(&ev)) ret= 1;
if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, buf_len)) ret= 1;
@@ -1947,6 +2042,28 @@ static int wsrep_TOI_begin(THD *thd, const char *db, const char *table,
rc= -1;
}
else {
+ if (!thd->variables.gtid_seq_no)
+ {
+ uint64 seqno= 0;
+ if (thd->variables.wsrep_gtid_seq_no &&
+ thd->variables.wsrep_gtid_seq_no > wsrep_gtid_server.seqno())
+ {
+ seqno= thd->variables.wsrep_gtid_seq_no;
+ wsrep_gtid_server.seqno(thd->variables.wsrep_gtid_seq_no);
+ }
+ else
+ {
+ seqno= wsrep_gtid_server.seqno_inc();
+ }
+ thd->variables.wsrep_gtid_seq_no= 0;
+ thd->wsrep_current_gtid_seqno= seqno;
+ if (mysql_bin_log.is_open() && wsrep_gtid_mode)
+ {
+ thd->variables.gtid_seq_no= seqno;
+ thd->variables.gtid_domain_id= wsrep_gtid_server.domain_id;
+ thd->variables.server_id= wsrep_gtid_server.server_id;
+ }
+ }
++wsrep_to_isolation;
rc= 0;
}
@@ -1965,15 +2082,22 @@ static void wsrep_TOI_end(THD *thd) {
WSREP_DEBUG("TO END: %lld: %s", client_state.toi_meta().seqno().get(),
WSREP_QUERY(thd));
+ wsrep_gtid_server.signal_waiters(thd->wsrep_current_gtid_seqno, false);
+
if (wsrep_thd_is_local_toi(thd))
{
- wsrep_set_SE_checkpoint(client_state.toi_meta().gtid());
wsrep::mutable_buffer err;
+
+ thd->wsrep_last_written_gtid_seqno= thd->wsrep_current_gtid_seqno;
+ wsrep_set_SE_checkpoint(client_state.toi_meta().gtid(), wsrep_gtid_server.gtid());
+
if (thd->is_error() && !wsrep_must_ignore_error(thd))
{
- wsrep_store_error(thd, err);
+ wsrep_store_error(thd, err);
}
+
int const ret= client_state.leave_toi_local(err);
+
if (!ret)
{
WSREP_DEBUG("TO END: %lld", client_state.toi_meta().seqno().get());
@@ -2666,12 +2790,6 @@ void* start_wsrep_THD(void *arg)
statistic_increment(thread_created, &LOCK_status);
- if (wsrep_gtid_mode)
- {
- /* Adjust domain_id. */
- thd->variables.gtid_domain_id= wsrep_gtid_domain_id;
- }
-
thd->real_id=pthread_self(); // Keep purify happy
my_net_init(&thd->net,(st_vio*) 0, thd, MYF(0));