diff options
author | mkaruza <mario.karuza@galeracluster.com> | 2019-04-01 13:23:05 +0200 |
---|---|---|
committer | Jan Lindström <jan.lindstrom@mariadb.com> | 2020-01-29 15:06:06 +0200 |
commit | 41bc736871078cf9a8f9888ed1a28249ee85549c (patch) | |
tree | 0fad0441d41364ad77ad10f7eeb68a2e32ea9bc7 /sql | |
parent | 5defdc382bbf606b83e556c4f0d29dcd7954ebbc (diff) | |
download | mariadb-git-41bc736871078cf9a8f9888ed1a28249ee85549c.tar.gz |
Galera GTID support
Support for galera GTID consistency thru cluster. All nodes in cluster
should have same GTID for replicated events which are originating from cluster.
Cluster originating commands need to contain sequential WSREP GTID seqno
Ignore manual setting of gtid_seq_no=X.
In master-slave scenario where master is non galera node replicated GTID is
replicated and is preserved in all nodes.
To have this - domain_id, server_id and seqnos should be same on all nodes.
Node which bootstraps the cluster, to achieve this, sends domain_id and
server_id to other nodes and this combination is used to write GTID for events
that are replicated inside cluster.
Cluster nodes that are executing non replicated events are going to have different
GTID than replicated ones, difference will be visible in domain part of gtid.
With wsrep_gtid_domain_id you can set domain_id for WSREP cluster.
Functions WSREP_LAST_WRITTEN_GTID, WSREP_LAST_SEEN_GTID and
WSREP_SYNC_WAIT_UPTO_GTID now works with "native" GTID format.
Fixed galera tests to reflect this chances.
Add variable to manually update WSREP GTID seqno in cluster
Add variable to manipulate and change WSREP GTID seqno. Next command
originating from cluster and on same thread will have set seqno and
cluster should change their internal counter to it's value.
Behavior is same as using @@gtid_seq_no for non WSREP transaction.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/handler.cc | 2 | ||||
-rw-r--r-- | sql/item_strfunc.cc | 118 | ||||
-rw-r--r-- | sql/log.cc | 127 | ||||
-rw-r--r-- | sql/log.h | 7 | ||||
-rw-r--r-- | sql/mysqld.cc | 2 | ||||
-rw-r--r-- | sql/service_wsrep.cc | 24 | ||||
-rw-r--r-- | sql/sql_class.cc | 4 | ||||
-rw-r--r-- | sql/sql_class.h | 9 | ||||
-rw-r--r-- | sql/sys_vars.cc | 10 | ||||
-rw-r--r-- | sql/wsrep_applier.cc | 40 | ||||
-rw-r--r-- | sql/wsrep_high_priority_service.cc | 5 | ||||
-rw-r--r-- | sql/wsrep_mysqld.cc | 174 | ||||
-rw-r--r-- | sql/wsrep_mysqld.h | 125 | ||||
-rw-r--r-- | sql/wsrep_server_service.cc | 12 | ||||
-rw-r--r-- | sql/wsrep_sst.cc | 18 | ||||
-rw-r--r-- | sql/wsrep_trans_observer.h | 30 | ||||
-rw-r--r-- | sql/wsrep_var.cc | 55 | ||||
-rw-r--r-- | sql/wsrep_var.h | 9 | ||||
-rw-r--r-- | sql/wsrep_xid.cc | 66 | ||||
-rw-r--r-- | sql/wsrep_xid.h | 7 |
20 files changed, 602 insertions, 242 deletions
diff --git a/sql/handler.cc b/sql/handler.cc index ac6b4f3f453..7d61252eea6 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -2057,7 +2057,7 @@ static my_xid wsrep_order_and_check_continuity(XID *list, int len) { #ifdef WITH_WSREP wsrep_sort_xid_array(list, len); - wsrep::gtid cur_position= wsrep_get_SE_checkpoint(); + wsrep::gtid cur_position= wsrep_get_SE_checkpoint<wsrep::gtid>(); long long cur_seqno= cur_position.seqno().get(); for (int i= 0; i < len; ++i) { diff --git a/sql/item_strfunc.cc b/sql/item_strfunc.cc index e689a71e431..82259cd6924 100644 --- a/sql/item_strfunc.cc +++ b/sql/item_strfunc.cc @@ -5234,28 +5234,33 @@ String *Item_temptable_rowid::val_str(String *str) str_value.set((char*)(table->file->ref), max_length, &my_charset_bin); return &str_value; } + #ifdef WITH_WSREP #include "wsrep_mysqld.h" +/* Format is %d-%d-%llu */ +#define WSREP_MAX_WSREP_SERVER_GTID_STR_LEN 10+1+10+1+20 String *Item_func_wsrep_last_written_gtid::val_str_ascii(String *str) { - wsrep::gtid gtid= current_thd->wsrep_cs().last_written_gtid(); - if (gtid_str.alloc(wsrep::gtid_c_str_len())) + if (gtid_str.alloc(WSREP_MAX_WSREP_SERVER_GTID_STR_LEN+1)) { - my_error(ER_OUTOFMEMORY, wsrep::gtid_c_str_len()); - null_value= true; - return NULL; + my_error(ER_OUTOFMEMORY, WSREP_MAX_WSREP_SERVER_GTID_STR_LEN); + null_value= TRUE; + return 0; } - ssize_t gtid_len= gtid_print_to_c_str(gtid, (char*) gtid_str.ptr(), - wsrep::gtid_c_str_len()); + ssize_t gtid_len= my_snprintf((char*)gtid_str.ptr(), + WSREP_MAX_WSREP_SERVER_GTID_STR_LEN+1, + "%u-%u-%llu", wsrep_gtid_server.domain_id, + wsrep_gtid_server.server_id, + current_thd->wsrep_last_written_gtid_seqno); if (gtid_len < 0) { my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0), func_name(), - "wsrep_gtid_print failed"); - null_value= true; - return NULL; + "wsrep_gtid_print failed"); + null_value= TRUE; + return 0; } gtid_str.length(gtid_len); return >id_str; @@ -5263,23 +5268,23 @@ String *Item_func_wsrep_last_written_gtid::val_str_ascii(String *str) String *Item_func_wsrep_last_seen_gtid::val_str_ascii(String *str) { - /* TODO: Should call Wsrep_server_state.instance().last_committed_gtid() - instead. */ - wsrep::gtid gtid= Wsrep_server_state::instance().provider().last_committed_gtid(); - if (gtid_str.alloc(wsrep::gtid_c_str_len())) + if (gtid_str.alloc(WSREP_MAX_WSREP_SERVER_GTID_STR_LEN+1)) { - my_error(ER_OUTOFMEMORY, wsrep::gtid_c_str_len()); - null_value= true; - return NULL; + my_error(ER_OUTOFMEMORY, WSREP_MAX_WSREP_SERVER_GTID_STR_LEN); + null_value= TRUE; + return 0; } - ssize_t gtid_len= wsrep::gtid_print_to_c_str(gtid, (char*) gtid_str.ptr(), - wsrep::gtid_c_str_len()); + ssize_t gtid_len= my_snprintf((char*)gtid_str.ptr(), + WSREP_MAX_WSREP_SERVER_GTID_STR_LEN+1, + "%u-%u-%llu", wsrep_gtid_server.domain_id, + wsrep_gtid_server.server_id, + wsrep_gtid_server.seqno()); if (gtid_len < 0) { my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0), func_name(), "wsrep_gtid_print failed"); - null_value= true; - return NULL; + null_value= TRUE; + return 0; } gtid_str.length(gtid_len); return >id_str; @@ -5287,49 +5292,52 @@ String *Item_func_wsrep_last_seen_gtid::val_str_ascii(String *str) longlong Item_func_wsrep_sync_wait_upto::val_int() { - int timeout= -1; - String* gtid_str= args[0]->val_str(&value); - if (gtid_str == NULL) - { - my_error(ER_WRONG_ARGUMENTS, MYF(0), func_name()); - return 0LL; - } - - if (arg_count == 2) - { - timeout= args[1]->val_int(); - } + String *gtid_str __attribute__((unused)) = args[0]->val_str(&value); + null_value=0; + uint timeout; + rpl_gtid *gtid_list; + uint32 count; + int ret= 1; - wsrep_gtid_t gtid; - int gtid_len= wsrep_gtid_scan(gtid_str->ptr(), gtid_str->length(), >id); - if (gtid_len < 0) + if (args[0]->null_value) { my_error(ER_WRONG_ARGUMENTS, MYF(0), func_name()); - return 0LL; + null_value= TRUE; + return 0; } - if (gtid.seqno == WSREP_SEQNO_UNDEFINED && - wsrep_uuid_compare(>id.uuid, &WSREP_UUID_UNDEFINED) == 0) + if (arg_count==2 && !args[1]->null_value) + timeout= (uint)(args[1]->val_real()); + else + timeout= (uint)-1; + + if (!(gtid_list= gtid_parse_string_to_list(gtid_str->ptr(), gtid_str->length(), + &count))) { - return 1LL; + my_error(ER_INCORRECT_GTID_STATE, MYF(0), func_name()); + null_value= TRUE; + return 0; } - - enum wsrep::provider::status status= - wsrep_sync_wait_upto(current_thd, >id, timeout); - - if (status) + if (count == 1) { - int err; - switch (status) { - case wsrep::provider::error_transaction_missing: - err= ER_WRONG_ARGUMENTS; - break; - default: - err= ER_LOCK_WAIT_TIMEOUT; + if (wsrep_check_gtid_seqno(gtid_list[0].domain_id, gtid_list[0].server_id, + gtid_list[0].seq_no)) + { + if (wsrep_gtid_server.wait_gtid_upto(gtid_list[0].seq_no, timeout)) + { + my_error(ER_LOCK_WAIT_TIMEOUT, MYF(0), func_name()); + ret= 0; + } } - my_error(err, MYF(0), func_name()); - return 0LL; } - return 1LL; + else + { + my_error(ER_WRONG_ARGUMENTS, MYF(0), func_name()); + null_value= TRUE; + ret= 0; + } + my_free(gtid_list); + return ret; } + #endif /* WITH_WSREP */ diff --git a/sql/log.cc b/sql/log.cc index bf19946210a..93b5d697d14 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -5048,55 +5048,6 @@ MYSQL_BIN_LOG::is_xidlist_idle_nolock() return true; } -#ifdef WITH_WSREP -inline bool -is_gtid_cached_internal(IO_CACHE *file) -{ - uchar data[EVENT_TYPE_OFFSET+1]; - bool result= false; - my_off_t write_pos= my_b_tell(file); - if (reinit_io_cache(file, READ_CACHE, 0, 0, 0)) - return false; - /* - In the cache we have gtid event if , below condition is true, - */ - my_b_read(file, data, sizeof(data)); - uint event_type= (uchar)data[EVENT_TYPE_OFFSET]; - if (event_type == GTID_LOG_EVENT) - result= true; - /* - Cleanup , Why because we have not read the full buffer - and this will cause next to next reinit_io_cache(called in write_cache) - to make cache empty. - */ - file->read_pos= file->read_end; - if (reinit_io_cache(file, WRITE_CACHE, write_pos, 0, 0)) - return false; - return result; -} -#endif - -#ifdef WITH_WSREP -inline bool -MYSQL_BIN_LOG::is_gtid_cached(THD *thd) -{ - binlog_cache_mngr *mngr= (binlog_cache_mngr *) thd_get_ha_data( - thd, binlog_hton); - if (!mngr) - return false; - binlog_cache_data *cache_trans= mngr->get_binlog_cache_data( - use_trans_cache(thd, true)); - binlog_cache_data *cache_stmt= mngr->get_binlog_cache_data( - use_trans_cache(thd, false)); - if (cache_trans && !cache_trans->empty() && - is_gtid_cached_internal(&cache_trans->cache_log)) - return true; - if (cache_stmt && !cache_stmt->empty() && - is_gtid_cached_internal(&cache_stmt->cache_log)) - return true; - return false; -} -#endif /** Create a new log file name. @@ -5719,31 +5670,47 @@ THD::binlog_start_trans_and_stmt() { DBUG_VOID_RETURN; } - /* Write Gtid - Get domain id only when gtid mode is set - If this event is replicate through a master then , - we will forward the same gtid another nodes - We have to do this only one time in mysql transaction. - Since this function is called multiple times , We will check for - ha_info->is_started() - */ + /* If this event replicates through a master-slave then we need to + inject manually GTID so it is preserved in the cluster. We are writing + directly to WSREP buffer and not in IO cache because in case of IO cache + GTID event will be duplicated in binlog. + We have to do this only one time in mysql transaction. + Since this function is called multiple times , We will check for + ha_info->is_started(). + */ Ha_trx_info *ha_info; ha_info= this->ha_data[binlog_hton->slot].ha_info + (mstmt_mode ? 1 : 0); - if (!ha_info->is_started() && wsrep_gtid_mode - && this->variables.gtid_seq_no) + if (!ha_info->is_started() && + (this->variables.gtid_seq_no || this->variables.wsrep_gtid_seq_no) && + wsrep_on(this) && + (this->wsrep_cs().mode() == wsrep::client_state::m_local)) { - binlog_cache_mngr *const cache_mngr= - (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); - binlog_cache_data *cache_data= cache_mngr->get_binlog_cache_data(1); - IO_CACHE *file= &cache_data->cache_log; - Log_event_writer writer(file, cache_data); - Gtid_log_event gtid_event(this, this->variables.gtid_seq_no, - this->variables.gtid_domain_id, - true, LOG_EVENT_SUPPRESS_USE_F, - true, 0); - gtid_event.server_id= this->variables.server_id; - writer.write(>id_event); + uchar *buf= 0; + size_t len= 0; + IO_CACHE tmp_io_cache; + Log_event_writer writer(&tmp_io_cache, 0); + if(!open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX, + 128, MYF(MY_WME))) + { + uint64 seqno= this->variables.gtid_seq_no; + uint32 domain_id= this->variables.gtid_domain_id; + uint32 server_id= this->variables.server_id; + if (!this->variables.gtid_seq_no && this->variables.wsrep_gtid_seq_no) + { + seqno= this->variables.wsrep_gtid_seq_no; + domain_id= wsrep_gtid_server.domain_id; + server_id= wsrep_gtid_server.server_id; + } + Gtid_log_event gtid_event(this, seqno, domain_id, true, + LOG_EVENT_SUPPRESS_USE_F, true, 0); + gtid_event.server_id= server_id; + writer.write(>id_event); + wsrep_write_cache_buf(&tmp_io_cache, &buf, &len); + if (len > 0) this->wsrep_cs().append_data(wsrep::const_buffer(buf, len)); + if (buf) my_free(buf); + close_cached_file(&tmp_io_cache); + } } #endif if (mstmt_mode) @@ -6024,20 +5991,9 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, DBUG_ENTER("write_gtid_event"); DBUG_PRINT("enter", ("standalone: %d", standalone)); -#ifdef WITH_WSREP - if (WSREP(thd) && - (wsrep_thd_trx_seqno(thd) > 0) && - wsrep_gtid_mode && !thd->variables.gtid_seq_no) - { - domain_id= wsrep_gtid_domain_id; - } else { -#endif /* WITH_WSREP */ + seq_no= thd->variables.gtid_seq_no; domain_id= thd->variables.gtid_domain_id; -#ifdef WITH_WSREP - } -#endif /* WITH_WSREP */ local_server_id= thd->variables.server_id; - seq_no= thd->variables.gtid_seq_no; DBUG_ASSERT(local_server_id != 0); @@ -6084,8 +6040,11 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, DBUG_ASSERT(this == &mysql_bin_log); #ifdef WITH_WSREP - if (wsrep_gtid_mode && is_gtid_cached(thd)) - DBUG_RETURN(false); + if (wsrep_gtid_mode) + { + thd->variables.gtid_domain_id= global_system_variables.gtid_domain_id; + thd->variables.server_id= global_system_variables.server_id; + } #endif if (write_event(>id_event)) diff --git a/sql/log.h b/sql/log.h index bf1dbd30c6c..7ccfd606f21 100644 --- a/sql/log.h +++ b/sql/log.h @@ -561,13 +561,6 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG bool write_transaction_to_binlog_events(group_commit_entry *entry); void trx_group_commit_leader(group_commit_entry *leader); bool is_xidlist_idle_nolock(); -#ifdef WITH_WSREP - /* - When this mariadb node is slave and galera enabled. So in this case - we write the gtid in wsrep_run_commit itself. - */ - inline bool is_gtid_cached(THD *thd); -#endif public: /* A list of struct xid_count_per_binlog is used to keep track of how many diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 7fe98c756cd..e2489f7706f 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -5724,6 +5724,8 @@ int mysqld_main(int argc, char **argv) { wsrep_shutdown_replication(); } + /* Release threads if they are waiting in WSREP_SYNC_WAIT_UPTO_GTID */ + wsrep_gtid_server.signal_waiters(0, true); #endif close_connections(); diff --git a/sql/service_wsrep.cc b/sql/service_wsrep.cc index 36ea311f59f..68de55b5778 100644 --- a/sql/service_wsrep.cc +++ b/sql/service_wsrep.cc @@ -281,16 +281,34 @@ extern "C" int wsrep_thd_append_key(THD *thd, } ret= client_state.append_key(wsrep_key); } + /* + In case of `wsrep_gtid_mode` when WS will be replicated, we need to set + `server_id` for events that are going to be written in IO, and in case of + manual SET gtid_seq_no=X we are ignoring value. + */ + if (!ret && wsrep_gtid_mode && !thd->slave_thread && !wsrep_thd_is_applying(thd)) + { + thd->variables.server_id= wsrep_gtid_server.server_id; + thd->variables.gtid_seq_no= 0; + } return ret; } extern "C" void wsrep_commit_ordered(THD *thd) { if (wsrep_is_active(thd) && - thd->wsrep_trx().state() == wsrep::transaction::s_committing && - !wsrep_commit_will_write_binlog(thd)) + (thd->wsrep_trx().state() == wsrep::transaction::s_committing || + thd->wsrep_trx().state() == wsrep::transaction::s_ordered_commit)) { - thd->wsrep_cs().ordered_commit(); + wsrep_gtid_server.signal_waiters(thd->wsrep_current_gtid_seqno, false); + if (wsrep_thd_is_local(thd)) + { + thd->wsrep_last_written_gtid_seqno= thd->wsrep_current_gtid_seqno; + } + if (!wsrep_commit_will_write_binlog(thd)) + { + thd->wsrep_cs().ordered_commit(); + } } } diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 1fdf4f17447..b5054557c46 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -697,9 +697,10 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) wsrep_apply_format(0), wsrep_rbr_buf(NULL), wsrep_sync_wait_gtid(WSREP_GTID_UNDEFINED), + wsrep_last_written_gtid_seqno(0), + wsrep_current_gtid_seqno(0), wsrep_affected_rows(0), wsrep_has_ignored_error(false), - wsrep_replicate_GTID(false), wsrep_ignore_table(false), /* wsrep-lib */ @@ -1313,7 +1314,6 @@ void THD::init() wsrep_rbr_buf = NULL; wsrep_affected_rows = 0; m_wsrep_next_trx_id = WSREP_UNDEFINED_TRX_ID; - wsrep_replicate_GTID = false; #endif /* WITH_WSREP */ if (variables.sql_log_bin) diff --git a/sql/sql_class.h b/sql/sql_class.h index 5bbe2e386d0..0891cd214f8 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -629,6 +629,7 @@ typedef struct system_variables are based on the cluster size): */ ulong saved_auto_increment_increment, saved_auto_increment_offset; + ulonglong wsrep_gtid_seq_no; #endif /* WITH_WSREP */ uint eq_range_index_dive_limit; ulong column_compression_zlib_strategy; @@ -4875,17 +4876,13 @@ public: size_t wsrep_TOI_pre_query_len; wsrep_po_handle_t wsrep_po_handle; size_t wsrep_po_cnt; -#ifdef GTID_SUPPORT - my_bool wsrep_po_in_trans; - rpl_sid wsrep_po_sid; -#endif /* GTID_SUPPORT */ void *wsrep_apply_format; uchar* wsrep_rbr_buf; wsrep_gtid_t wsrep_sync_wait_gtid; - // wsrep_gtid_t wsrep_last_written_gtid; + uint64 wsrep_last_written_gtid_seqno; + uint64 wsrep_current_gtid_seqno; ulong wsrep_affected_rows; bool wsrep_has_ignored_error; - bool wsrep_replicate_GTID; /* When enabled, do not replicate/binlog updates from the current table that's diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index de476042a80..ab382304973 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -5792,9 +5792,17 @@ static Sys_var_uint Sys_wsrep_gtid_domain_id( "wsrep_gtid_domain_id", "When wsrep_gtid_mode is set, this value is " "used as gtid_domain_id for galera transactions and also copied to the " "joiner nodes during state transfer. It is ignored, otherwise.", - GLOBAL_VAR(wsrep_gtid_domain_id), CMD_LINE(REQUIRED_ARG), + GLOBAL_VAR(wsrep_gtid_server.domain_id), CMD_LINE(REQUIRED_ARG), VALID_RANGE(0, UINT_MAX32), DEFAULT(0), BLOCK_SIZE(1)); +static Sys_var_ulonglong Sys_wsrep_gtid_seq_no( + "wsrep_gtid_seq_no", + "Internal server usage, manually set WSREP GTID seqno.", + SESSION_ONLY(wsrep_gtid_seq_no), + NO_CMD_LINE, VALID_RANGE(0, ULONGLONG_MAX), DEFAULT(0), + BLOCK_SIZE(1), NO_MUTEX_GUARD, NOT_IN_BINLOG, + ON_CHECK(wsrep_gtid_seq_no_check)); + static Sys_var_mybool Sys_wsrep_gtid_mode( "wsrep_gtid_mode", "Automatically update the (joiner) node's " "wsrep_gtid_domain_id value with that of donor's (received during " diff --git a/sql/wsrep_applier.cc b/sql/wsrep_applier.cc index 1ab65df1ca3..25a4e22aeb4 100644 --- a/sql/wsrep_applier.cc +++ b/sql/wsrep_applier.cc @@ -131,6 +131,12 @@ int wsrep_apply_events(THD* thd, if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld", (long long) wsrep_thd_trx_seqno(thd)); + thd->variables.gtid_seq_no= 0; + if (wsrep_gtid_mode) + thd->variables.gtid_domain_id= wsrep_gtid_server.domain_id; + else + thd->variables.gtid_domain_id= global_system_variables.gtid_domain_id; + while (buf_len) { int exec_res; @@ -150,22 +156,38 @@ int wsrep_apply_events(THD* thd, case FORMAT_DESCRIPTION_EVENT: wsrep_set_apply_format(thd, (Format_description_log_event*)ev); continue; -#ifdef GTID_SUPPORT - case GTID_LOG_EVENT: - { - Gtid_log_event* gev= (Gtid_log_event*)ev; - if (gev->get_gno() == 0) + case GTID_EVENT: { - /* Skip GTID log event to make binlog to generate LTID on commit */ + Gtid_log_event *gtid_ev= (Gtid_log_event*)ev; + thd->variables.server_id= gtid_ev->server_id; + thd->variables.gtid_domain_id= gtid_ev->domain_id; + if ((gtid_ev->server_id == wsrep_gtid_server.server_id) && + (gtid_ev->domain_id == wsrep_gtid_server.domain_id)) + { + thd->variables.wsrep_gtid_seq_no= gtid_ev->seq_no; + } + else + { + thd->variables.gtid_seq_no= gtid_ev->seq_no; + } delete ev; - continue; } - } -#endif /* GTID_SUPPORT */ + continue; default: break; } + + if (!thd->variables.gtid_seq_no && wsrep_thd_is_toi(thd) && + (ev->get_type_code() == QUERY_EVENT)) + { + uint64 seqno= wsrep_gtid_server.seqno_inc(); + thd->wsrep_current_gtid_seqno= seqno; + if (mysql_bin_log.is_open() && wsrep_gtid_mode) + { + thd->variables.gtid_seq_no= seqno; + } + } /* Use the original server id for logging. */ thd->set_server_id(ev->server_id); thd->set_time(); // time the query diff --git a/sql/wsrep_high_priority_service.cc b/sql/wsrep_high_priority_service.cc index 06d398baf5f..d73b9cb09ce 100644 --- a/sql/wsrep_high_priority_service.cc +++ b/sql/wsrep_high_priority_service.cc @@ -396,7 +396,8 @@ int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta, thd->close_temporary_tables(); thd->lex->sql_command= SQLCOM_END; - wsrep_set_SE_checkpoint(client_state.toi_meta().gtid()); + wsrep_gtid_server.signal_waiters(thd->wsrep_current_gtid_seqno, false); + wsrep_set_SE_checkpoint(client_state.toi_meta().gtid(), wsrep_gtid_server.gtid()); must_exit_= check_exit_status(); @@ -448,7 +449,7 @@ int Wsrep_high_priority_service::log_dummy_write_set(const wsrep::ws_handle& ws_ cs.before_rollback(); cs.after_rollback(); } - wsrep_set_SE_checkpoint(ws_meta.gtid()); + wsrep_set_SE_checkpoint(ws_meta.gtid(), wsrep_gtid_server.gtid()); ret= ret || cs.provider().commit_order_leave(ws_handle, ws_meta, err); cs.after_applying(); } diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index 444a187ea57..4c1d683b03e 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -52,11 +52,7 @@ /* wsrep-lib */ Wsrep_server_state* Wsrep_server_state::m_instance; -my_bool wsrep_emulate_bin_log = FALSE; // activating parts of binlog interface -#ifdef GTID_SUPPORT -/* Sidno in global_sid_map corresponding to group uuid */ -rpl_sidno wsrep_sidno= -1; -#endif /* GTID_SUPPORT */ +my_bool wsrep_emulate_bin_log= FALSE; // activating parts of binlog interface my_bool wsrep_preordered_opt= FALSE; /* Streaming Replication */ @@ -106,10 +102,9 @@ ulong wsrep_max_ws_size; // Max allowed ws (RBR buffer) s ulong wsrep_max_ws_rows; // Max number of rows in ws ulong wsrep_forced_binlog_format; ulong wsrep_mysql_replication_bundle; -bool wsrep_gtid_mode; // Use wsrep_gtid_domain_id - // for galera transactions? -uint32 wsrep_gtid_domain_id; // gtid_domain_id for galera - // transactions + +bool wsrep_gtid_mode; // Enable WSREP native GTID support +Wsrep_gtid_server wsrep_gtid_server; /* Other configuration variables and their default values. */ my_bool wsrep_incremental_data_collection= 0; // Incremental data collection @@ -144,6 +139,7 @@ mysql_mutex_t LOCK_wsrep_replaying; mysql_cond_t COND_wsrep_replaying; mysql_mutex_t LOCK_wsrep_slave_threads; mysql_cond_t COND_wsrep_slave_threads; +mysql_mutex_t LOCK_wsrep_gtid_wait_upto; mysql_mutex_t LOCK_wsrep_cluster_config; mysql_mutex_t LOCK_wsrep_desync; mysql_mutex_t LOCK_wsrep_config_state; @@ -167,7 +163,8 @@ ulong my_bind_addr; PSI_mutex_key key_LOCK_wsrep_replaying, key_LOCK_wsrep_ready, key_LOCK_wsrep_sst, key_LOCK_wsrep_sst_thread, key_LOCK_wsrep_sst_init, - key_LOCK_wsrep_slave_threads, key_LOCK_wsrep_desync, + key_LOCK_wsrep_slave_threads, key_LOCK_wsrep_gtid_wait_upto, + key_LOCK_wsrep_desync, key_LOCK_wsrep_config_state, key_LOCK_wsrep_cluster_config, key_LOCK_wsrep_group_commit, key_LOCK_wsrep_SR_pool, @@ -179,7 +176,7 @@ PSI_mutex_key PSI_cond_key key_COND_wsrep_thd, key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst, key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread, - key_COND_wsrep_thd_queue, key_COND_wsrep_slave_threads, + key_COND_wsrep_thd_queue, key_COND_wsrep_slave_threads, key_COND_wsrep_gtid_wait_upto, key_COND_wsrep_joiner_monitor, key_COND_wsrep_donor_monitor; PSI_file_key key_file_wsrep_gra_log; @@ -193,6 +190,7 @@ static PSI_mutex_info wsrep_mutexes[]= { &key_LOCK_wsrep_sst, "LOCK_wsrep_sst", PSI_FLAG_GLOBAL}, { &key_LOCK_wsrep_replaying, "LOCK_wsrep_replaying", PSI_FLAG_GLOBAL}, { &key_LOCK_wsrep_slave_threads, "LOCK_wsrep_slave_threads", PSI_FLAG_GLOBAL}, + { &key_LOCK_wsrep_gtid_wait_upto, "LOCK_wsrep_gtid_wait_upto", PSI_FLAG_GLOBAL}, { &key_LOCK_wsrep_cluster_config, "LOCK_wsrep_cluster_config", PSI_FLAG_GLOBAL}, { &key_LOCK_wsrep_desync, "LOCK_wsrep_desync", PSI_FLAG_GLOBAL}, { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL}, @@ -212,6 +210,7 @@ static PSI_cond_info wsrep_conds[]= { &key_COND_wsrep_thd, "THD::COND_wsrep_thd", 0}, { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL}, { &key_COND_wsrep_slave_threads, "COND_wsrep_wsrep_slave_threads", PSI_FLAG_GLOBAL}, + { &key_COND_wsrep_gtid_wait_upto, "COND_wsrep_gtid_wait_upto", PSI_FLAG_GLOBAL}, { &key_COND_wsrep_joiner_monitor, "COND_wsrep_joiner_monitor", PSI_FLAG_GLOBAL}, { &key_COND_wsrep_donor_monitor, "COND_wsrep_donor_monitor", PSI_FLAG_GLOBAL} }; @@ -302,6 +301,58 @@ static void wsrep_log_cb(wsrep::log::level level, const char *msg) } } +void wsrep_init_gtid() +{ + wsrep_server_gtid_t stored_gtid= wsrep_get_SE_checkpoint<wsrep_server_gtid_t>(); + if (stored_gtid.server_id == 0) + { + rpl_gtid wsrep_last_gtid; + stored_gtid.domain_id= wsrep_gtid_server.domain_id; + if (mysql_bin_log.is_open() && + mysql_bin_log.lookup_domain_in_binlog_state(stored_gtid.domain_id, + &wsrep_last_gtid)) + { + stored_gtid.server_id= wsrep_last_gtid.server_id; + stored_gtid.seqno= wsrep_last_gtid.seq_no; + } + else + { + stored_gtid.server_id= global_system_variables.server_id; + stored_gtid.seqno= 0; + } + } + wsrep_gtid_server.gtid(stored_gtid); +} + +bool wsrep_get_binlog_gtid_seqno(wsrep_server_gtid_t& gtid) +{ + rpl_gtid binlog_gtid; + int ret= 0; + if (mysql_bin_log.is_open() && + mysql_bin_log.find_in_binlog_state(gtid.domain_id, + gtid.server_id, + &binlog_gtid)) + { + gtid.domain_id= binlog_gtid.domain_id; + gtid.server_id= binlog_gtid.server_id; + gtid.seqno= binlog_gtid.seq_no; + ret= 1; + } + return ret; +} + +bool wsrep_check_gtid_seqno(const uint32& domain, const uint32& server, + uint64& seqno) +{ + if (domain == wsrep_gtid_server.domain_id && + server == wsrep_gtid_server.server_id) + { + if (wsrep_gtid_server.seqno_committed() < seqno) return 1; + return 0; + } + return 0; +} + void wsrep_init_sidno(const wsrep::id& uuid) { /* @@ -692,6 +743,16 @@ int wsrep_init_server() void wsrep_init_globals() { wsrep_init_sidno(Wsrep_server_state::instance().connected_gtid().id()); + wsrep_init_gtid(); + /* Recover last written wsrep gtid */ + if (wsrep_new_cluster) + { + wsrep_server_gtid_t gtid= {wsrep_gtid_server.domain_id, + wsrep_gtid_server.server_id, 0}; + wsrep_get_binlog_gtid_seqno(gtid); + wsrep_gtid_server.seqno(gtid.seqno); + } + wsrep_new_cluster= 0; wsrep_init_schema(); if (WSREP_ON) { @@ -793,6 +854,7 @@ void wsrep_thr_init() mysql_cond_init(key_COND_wsrep_replaying, &COND_wsrep_replaying, NULL); mysql_mutex_init(key_LOCK_wsrep_slave_threads, &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wsrep_slave_threads, &COND_wsrep_slave_threads, NULL); + mysql_mutex_init(key_LOCK_wsrep_gtid_wait_upto, &LOCK_wsrep_gtid_wait_upto, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_wsrep_cluster_config, &LOCK_wsrep_cluster_config, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_wsrep_desync, &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_wsrep_config_state, &LOCK_wsrep_config_state, MY_MUTEX_INIT_FAST); @@ -903,6 +965,7 @@ void wsrep_thr_deinit() mysql_cond_destroy(&COND_wsrep_sst_init); mysql_mutex_destroy(&LOCK_wsrep_replaying); mysql_cond_destroy(&COND_wsrep_replaying); + mysql_mutex_destroy(&LOCK_wsrep_gtid_wait_upto); mysql_mutex_destroy(&LOCK_wsrep_slave_threads); mysql_cond_destroy(&COND_wsrep_slave_threads); mysql_mutex_destroy(&LOCK_wsrep_cluster_config); @@ -939,10 +1002,20 @@ void wsrep_recover() uuid_str, (long long)local_seqno); return; } - wsrep::gtid gtid= wsrep_get_SE_checkpoint(); + wsrep::gtid gtid= wsrep_get_SE_checkpoint<wsrep::gtid>(); std::ostringstream oss; oss << gtid; - WSREP_INFO("Recovered position: %s", oss.str().c_str()); + if (wsrep_gtid_mode) + { + wsrep_server_gtid_t server_gtid= wsrep_get_SE_checkpoint<wsrep_server_gtid_t>(); + WSREP_INFO("Recovered position: %s,%d-%d-%llu", oss.str().c_str(), server_gtid.domain_id, + server_gtid.server_id, server_gtid.seqno); + } + else + { + WSREP_INFO("Recovered position: %s", oss.str().c_str()); + } + } @@ -1012,7 +1085,6 @@ bool wsrep_start_replication() } bool const bootstrap(TRUE == wsrep_new_cluster); - wsrep_new_cluster= FALSE; WSREP_INFO("Start replication"); @@ -1445,16 +1517,36 @@ int wsrep_to_buf_helper( if (!ret && writer.write(>id_ev)) ret= 1; } #endif /* GTID_SUPPORT */ - if (wsrep_gtid_mode && thd->variables.gtid_seq_no) + /* + * Check if this is applier thread, slave_thread or + * we have set manually WSREP GTID seqno. Add GTID event. + */ + if (thd->slave_thread || wsrep_thd_is_applying(thd) || + thd->variables.wsrep_gtid_seq_no) { - Gtid_log_event gtid_event(thd, thd->variables.gtid_seq_no, - thd->variables.gtid_domain_id, - true, LOG_EVENT_SUPPRESS_USE_F, - true, 0); - gtid_event.server_id= thd->variables.server_id; + uint64 seqno= thd->variables.gtid_seq_no; + uint32 domain_id= thd->variables.gtid_domain_id; + uint32 server_id= thd->variables.server_id; + if (!thd->variables.gtid_seq_no && thd->variables.wsrep_gtid_seq_no) + { + seqno= thd->variables.wsrep_gtid_seq_no; + domain_id= wsrep_gtid_server.domain_id; + server_id= wsrep_gtid_server.server_id; + } + Gtid_log_event gtid_event(thd, seqno, domain_id, true, + LOG_EVENT_SUPPRESS_USE_F, true, 0); + gtid_event.server_id= server_id; if (!gtid_event.is_valid()) ret= 0; ret= writer.write(>id_event); } + /* + It's local DDL so in case of possible gtid seqno (SET gtid_seq_no=X) + manipulation, seqno value will be ignored. + */ + else + { + thd->variables.gtid_seq_no= 0; + } /* if there is prepare query, add event for it */ if (!ret && thd->wsrep_TOI_pre_query) @@ -1468,6 +1560,9 @@ int wsrep_to_buf_helper( /* continue to append the actual query */ Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0); + /* WSREP GTID mode, we need to change server_id */ + if (wsrep_gtid_mode && !thd->variables.gtid_seq_no) + ev.server_id= wsrep_gtid_server.server_id; ev.checksum_alg= current_binlog_check_alg; if (!ret && writer.write(&ev)) ret= 1; if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, buf_len)) ret= 1; @@ -1947,6 +2042,28 @@ static int wsrep_TOI_begin(THD *thd, const char *db, const char *table, rc= -1; } else { + if (!thd->variables.gtid_seq_no) + { + uint64 seqno= 0; + if (thd->variables.wsrep_gtid_seq_no && + thd->variables.wsrep_gtid_seq_no > wsrep_gtid_server.seqno()) + { + seqno= thd->variables.wsrep_gtid_seq_no; + wsrep_gtid_server.seqno(thd->variables.wsrep_gtid_seq_no); + } + else + { + seqno= wsrep_gtid_server.seqno_inc(); + } + thd->variables.wsrep_gtid_seq_no= 0; + thd->wsrep_current_gtid_seqno= seqno; + if (mysql_bin_log.is_open() && wsrep_gtid_mode) + { + thd->variables.gtid_seq_no= seqno; + thd->variables.gtid_domain_id= wsrep_gtid_server.domain_id; + thd->variables.server_id= wsrep_gtid_server.server_id; + } + } ++wsrep_to_isolation; rc= 0; } @@ -1965,15 +2082,22 @@ static void wsrep_TOI_end(THD *thd) { WSREP_DEBUG("TO END: %lld: %s", client_state.toi_meta().seqno().get(), WSREP_QUERY(thd)); + wsrep_gtid_server.signal_waiters(thd->wsrep_current_gtid_seqno, false); + if (wsrep_thd_is_local_toi(thd)) { - wsrep_set_SE_checkpoint(client_state.toi_meta().gtid()); wsrep::mutable_buffer err; + + thd->wsrep_last_written_gtid_seqno= thd->wsrep_current_gtid_seqno; + wsrep_set_SE_checkpoint(client_state.toi_meta().gtid(), wsrep_gtid_server.gtid()); + if (thd->is_error() && !wsrep_must_ignore_error(thd)) { - wsrep_store_error(thd, err); + wsrep_store_error(thd, err); } + int const ret= client_state.leave_toi_local(err); + if (!ret) { WSREP_DEBUG("TO END: %lld", client_state.toi_meta().seqno().get()); @@ -2666,12 +2790,6 @@ void* start_wsrep_THD(void *arg) statistic_increment(thread_created, &LOCK_status); - if (wsrep_gtid_mode) - { - /* Adjust domain_id. */ - thd->variables.gtid_domain_id= wsrep_gtid_domain_id; - } - thd->real_id=pthread_self(); // Keep purify happy my_net_init(&thd->net,(st_vio*) 0, thd, MYF(0)); diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index 5ad68fbe423..222b265248f 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -38,6 +38,7 @@ typedef struct st_mysql_show_var SHOW_VAR; #include "wsrep/streaming_context.hpp" #include "wsrep_api.h" #include <vector> +#include <map> #include "wsrep_server_state.h" #define WSREP_UNDEFINED_TRX_ID ULONGLONG_MAX @@ -97,7 +98,6 @@ extern ulong wsrep_running_applier_threads; extern ulong wsrep_running_rollbacker_threads; extern bool wsrep_new_cluster; extern bool wsrep_gtid_mode; -extern uint32 wsrep_gtid_domain_id; enum enum_wsrep_reject_types { WSREP_REJECT_NONE, /* nothing rejected */ @@ -303,6 +303,7 @@ extern mysql_mutex_t LOCK_wsrep_replaying; extern mysql_cond_t COND_wsrep_replaying; extern mysql_mutex_t LOCK_wsrep_slave_threads; extern mysql_cond_t COND_wsrep_slave_threads; +extern mysql_mutex_t LOCK_wsrep_gtid_wait_upto; extern mysql_mutex_t LOCK_wsrep_cluster_config; extern mysql_mutex_t LOCK_wsrep_desync; extern mysql_mutex_t LOCK_wsrep_SR_pool; @@ -316,9 +317,6 @@ extern mysql_cond_t COND_wsrep_donor_monitor; extern my_bool wsrep_emulate_bin_log; extern int wsrep_to_isolation; -#ifdef GTID_SUPPORT -extern rpl_sidno wsrep_sidno; -#endif /* GTID_SUPPORT */ extern my_bool wsrep_preordered_opt; #ifdef HAVE_PSI_INTERFACE @@ -337,6 +335,8 @@ extern PSI_mutex_key key_LOCK_wsrep_replaying; extern PSI_cond_key key_COND_wsrep_replaying; extern PSI_mutex_key key_LOCK_wsrep_slave_threads; extern PSI_cond_key key_COND_wsrep_slave_threads; +extern PSI_mutex_key key_LOCK_wsrep_gtid_wait_upto; +extern PSI_cond_key key_COND_wsrep_gtid_wait_upto; extern PSI_mutex_key key_LOCK_wsrep_cluster_config; extern PSI_mutex_key key_LOCK_wsrep_desync; extern PSI_mutex_key key_LOCK_wsrep_SR_pool; @@ -387,7 +387,122 @@ class Log_event; int wsrep_ignored_error_code(Log_event* ev, int error); int wsrep_must_ignore_error(THD* thd); -bool wsrep_replicate_GTID(THD* thd); +struct wsrep_server_gtid_t +{ + uint32 domain_id; + uint32 server_id; + uint64 seqno; +}; +class Wsrep_gtid_server +{ +public: + uint32 domain_id; + uint32 server_id; + Wsrep_gtid_server() + : m_force_signal(false) + , m_seqno(0) + , m_committed_seqno(0) + { } + void gtid(const wsrep_server_gtid_t& gtid) + { + domain_id= gtid.domain_id; + server_id= gtid.server_id; + m_seqno= gtid.seqno; + } + wsrep_server_gtid_t gtid() + { + wsrep_server_gtid_t gtid; + gtid.domain_id= domain_id; + gtid.server_id= server_id; + gtid.seqno= m_seqno; + return gtid; + } + void seqno(const uint64 seqno) { m_seqno= seqno; } + uint64 seqno() const { return m_seqno; } + uint64 seqno_committed() const { return m_committed_seqno; } + uint64 seqno_inc() + { + m_seqno++; + return m_seqno; + } + const wsrep_server_gtid_t& undefined() + { + return m_undefined; + } + int wait_gtid_upto(const uint64_t seqno, uint timeout) + { + int wait_result; + struct timespec wait_time; + int ret= 0; + mysql_cond_t wait_cond; + mysql_cond_init(key_COND_wsrep_gtid_wait_upto, &wait_cond, NULL); + set_timespec(wait_time, timeout); + mysql_mutex_lock(&LOCK_wsrep_gtid_wait_upto); + std::multimap<uint64, mysql_cond_t*>::iterator it; + try + { + it= m_wait_map.insert(std::make_pair(seqno, &wait_cond)); + } + catch (std::bad_alloc& e) + { + return 0; + } + while ((m_committed_seqno < seqno) && !m_force_signal) + { + wait_result= mysql_cond_timedwait(&wait_cond, + &LOCK_wsrep_gtid_wait_upto, + &wait_time); + if (wait_result == ETIMEDOUT || wait_result == ETIME) + { + ret= 1; + break; + } + } + m_wait_map.erase(it); + mysql_mutex_unlock(&LOCK_wsrep_gtid_wait_upto); + mysql_cond_destroy(&wait_cond); + return ret; + } + void signal_waiters(uint64 seqno, bool signal_all) + { + if (!signal_all && (m_committed_seqno >= seqno)) + { + return; + } + mysql_mutex_lock(&LOCK_wsrep_gtid_wait_upto); + m_force_signal= true; + std::multimap<uint64, mysql_cond_t*>::iterator it_end; + std::multimap<uint64, mysql_cond_t*>::iterator it_begin; + if (signal_all) + { + it_end= m_wait_map.end(); + } + else + { + it_end= m_wait_map.upper_bound(seqno); + } + for (it_begin = m_wait_map.begin(); it_begin != it_end; ++it_begin) + { + mysql_cond_signal(it_begin->second); + } + m_force_signal= false; + mysql_mutex_unlock(&LOCK_wsrep_gtid_wait_upto); + if (m_committed_seqno < seqno) + { + m_committed_seqno= seqno; + } + } +private: + const wsrep_server_gtid_t m_undefined= {0,0,0}; + std::multimap<uint64, mysql_cond_t*> m_wait_map; + bool m_force_signal; + Atomic_counter<uint64_t> m_seqno; + Atomic_counter<uint64_t> m_committed_seqno; +}; +extern Wsrep_gtid_server wsrep_gtid_server; +void wsrep_init_gtid(); +bool wsrep_check_gtid_seqno(const uint32&, const uint32&, uint64&); +bool wsrep_get_binlog_gtid_seqno(wsrep_server_gtid_t&); typedef struct wsrep_key_arr { diff --git a/sql/wsrep_server_service.cc b/sql/wsrep_server_service.cc index d0a9b54ac1b..6d737463f07 100644 --- a/sql/wsrep_server_service.cc +++ b/sql/wsrep_server_service.cc @@ -153,7 +153,7 @@ void Wsrep_server_service::bootstrap() wsrep::log_info() << "Bootstrapping a new cluster, setting initial position to " << wsrep::gtid::undefined(); - wsrep_set_SE_checkpoint(wsrep::gtid::undefined()); + wsrep_set_SE_checkpoint(wsrep::gtid::undefined(), wsrep_gtid_server.undefined()); } void Wsrep_server_service::log_message(enum wsrep::log::level level, @@ -212,7 +212,7 @@ void Wsrep_server_service::log_view( if (prev_view.state_id().id() != view.state_id().id()) { WSREP_DEBUG("New cluster UUID was generated, resetting position info"); - wsrep_set_SE_checkpoint(wsrep::gtid::undefined()); + wsrep_set_SE_checkpoint(wsrep::gtid::undefined(), wsrep_gtid_server.undefined()); checkpoint_was_reset= true; } @@ -263,9 +263,9 @@ void Wsrep_server_service::log_view( Wsrep_server_state::instance().provider().last_committed_gtid().seqno(); if (checkpoint_was_reset || last_committed != view.state_id().seqno()) { - wsrep_set_SE_checkpoint(view.state_id()); + wsrep_set_SE_checkpoint(view.state_id(), wsrep_gtid_server.gtid()); } - DBUG_ASSERT(wsrep_get_SE_checkpoint().id() == view.state_id().id()); + DBUG_ASSERT(wsrep_get_SE_checkpoint<wsrep::gtid>().id() == view.state_id().id()); } else { @@ -299,13 +299,13 @@ wsrep::view Wsrep_server_service::get_view(wsrep::client_service& c, wsrep::gtid Wsrep_server_service::get_position(wsrep::client_service&) { - return wsrep_get_SE_checkpoint(); + return wsrep_get_SE_checkpoint<wsrep::gtid>(); } void Wsrep_server_service::set_position(wsrep::client_service&, const wsrep::gtid& gtid) { - wsrep_set_SE_checkpoint(gtid); + wsrep_set_SE_checkpoint(gtid, wsrep_gtid_server.gtid()); } void Wsrep_server_service::log_state_change( diff --git a/sql/wsrep_sst.cc b/sql/wsrep_sst.cc index 02f7d4b6760..d478ea486cd 100644 --- a/sql/wsrep_sst.cc +++ b/sql/wsrep_sst.cc @@ -351,8 +351,8 @@ void wsrep_sst_received (THD* thd, wsrep::seqno(seqno)); if (!wsrep_before_SE()) { - wsrep_set_SE_checkpoint(wsrep::gtid::undefined()); - wsrep_set_SE_checkpoint(sst_gtid); + wsrep_set_SE_checkpoint(wsrep::gtid::undefined(), wsrep_gtid_server.undefined()); + wsrep_set_SE_checkpoint(sst_gtid, wsrep_gtid_server.gtid()); } wsrep_verify_SE_checkpoint(uuid, seqno); @@ -594,7 +594,7 @@ static void* sst_joiner_thread (void* a) err= EINVAL; goto err; } else { - wsrep_gtid_domain_id= (uint32) domain_id; + wsrep_gtid_server.domain_id= (uint32) domain_id; } } } @@ -1365,13 +1365,15 @@ static int sst_donate_mysqldump (const char* addr, WSREP_SST_OPT_LPORT " '%u' " WSREP_SST_OPT_SOCKET " '%s' " "%s" - WSREP_SST_OPT_GTID " '%s:%lld' " + WSREP_SST_OPT_GTID " '%s:%lld,%d-%d-%llu' " WSREP_SST_OPT_GTID_DOMAIN_ID " '%d'" "%s", addr, port, mysqld_port, mysqld_unix_port, wsrep_defaults_file, uuid_oss.str().c_str(), gtid.seqno().get(), - wsrep_gtid_domain_id, + wsrep_gtid_server.domain_id, wsrep_gtid_server.server_id, + wsrep_gtid_server.seqno(), + wsrep_gtid_server.domain_id, bypass ? " " WSREP_SST_OPT_BYPASS : ""); if (ret < 0 || size_t(ret) >= cmd_len) @@ -1538,7 +1540,7 @@ static int sst_flush_tables(THD* thd) */ char content[100]; snprintf(content, sizeof(content), "%s:%lld %d\n", wsrep_cluster_state_uuid, - (long long)wsrep_locked_seqno, wsrep_gtid_domain_id); + (long long)wsrep_locked_seqno, wsrep_gtid_server.domain_id); err= sst_create_file(flush_success, content); const char base_name[]= "tables_flushed"; @@ -1563,7 +1565,7 @@ static int sst_flush_tables(THD* thd) fprintf(file, "%s:%lld %u\n", uuid_oss.str().c_str(), server_state.pause_seqno().get(), - wsrep_gtid_domain_id); + wsrep_gtid_server.domain_id); fsync(fileno(file)); fclose(file); if (rename(tmp_name, real_name) == -1) @@ -1783,7 +1785,7 @@ static int sst_donate_other (const char* method, "%s", method, addr, mysqld_unix_port, mysql_real_data_home, wsrep_defaults_file, - uuid_oss.str().c_str(), gtid.seqno().get(), wsrep_gtid_domain_id, + uuid_oss.str().c_str(), gtid.seqno().get(), wsrep_gtid_server.domain_id, binlog_opt_val, binlog_index_opt_val, bypass ? " " WSREP_SST_OPT_BYPASS : ""); diff --git a/sql/wsrep_trans_observer.h b/sql/wsrep_trans_observer.h index b8ce7eb42d0..006d412e6aa 100644 --- a/sql/wsrep_trans_observer.h +++ b/sql/wsrep_trans_observer.h @@ -233,7 +233,8 @@ static inline int wsrep_before_prepare(THD* thd, bool all) { DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined()); wsrep_xid_init(&thd->wsrep_xid, - thd->wsrep_trx().ws_meta().gtid()); + thd->wsrep_trx().ws_meta().gtid(), + wsrep_gtid_server.gtid()); } DBUG_RETURN(ret); } @@ -273,8 +274,33 @@ static inline int wsrep_before_commit(THD* thd, bool all) if ((ret= thd->wsrep_cs().before_commit()) == 0) { DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined()); + if (!thd->variables.gtid_seq_no && + (thd->wsrep_trx().ws_meta().flags() & wsrep::provider::flag::commit)) + { + uint64 seqno= 0; + if (thd->variables.wsrep_gtid_seq_no && + thd->variables.wsrep_gtid_seq_no > wsrep_gtid_server.seqno()) + { + seqno= thd->variables.wsrep_gtid_seq_no; + wsrep_gtid_server.seqno(thd->variables.wsrep_gtid_seq_no); + } + else + { + seqno= wsrep_gtid_server.seqno_inc(); + } + thd->variables.wsrep_gtid_seq_no= 0; + thd->wsrep_current_gtid_seqno= seqno; + if (mysql_bin_log.is_open() && wsrep_gtid_mode) + { + thd->variables.gtid_seq_no= seqno; + thd->variables.gtid_domain_id= wsrep_gtid_server.domain_id; + thd->variables.server_id= wsrep_gtid_server.server_id; + } + } + wsrep_xid_init(&thd->wsrep_xid, - thd->wsrep_trx().ws_meta().gtid()); + thd->wsrep_trx().ws_meta().gtid(), + wsrep_gtid_server.gtid()); wsrep_register_for_group_commit(thd); } DBUG_RETURN(ret); diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc index 5f76f650b34..daf12f94fb9 100644 --- a/sql/wsrep_var.cc +++ b/sql/wsrep_var.cc @@ -37,7 +37,10 @@ int wsrep_init_vars() wsrep_node_name = my_strdup("", MYF(MY_WME)); wsrep_node_address = my_strdup("", MYF(MY_WME)); wsrep_node_incoming_address= my_strdup(WSREP_NODE_INCOMING_AUTO, MYF(MY_WME)); - wsrep_start_position = my_strdup(WSREP_START_POSITION_ZERO, MYF(MY_WME)); + if (wsrep_gtid_mode) + wsrep_start_position = my_strdup(WSREP_START_POSITION_ZERO_GTID, MYF(MY_WME)); + else + wsrep_start_position = my_strdup(WSREP_START_POSITION_ZERO, MYF(MY_WME)); return 0; } @@ -103,6 +106,13 @@ bool wsrep_sync_wait_update (sys_var* self, THD* thd, enum_var_type var_type) return false; } +template<typename T> +static T parse_value(char** startptr, char** endptr) +{ + T val= strtoll(*startptr, *&endptr, 10); + *startptr= *endptr; + return val; +} /* Verify the format of the given UUID:seqno. @@ -136,8 +146,25 @@ bool wsrep_start_position_verify (const char* start_str) return true; char* endptr; + char* startptr= (char *)start_str + uuid_len + 1; wsrep_seqno_t const seqno __attribute__((unused)) // to avoid GCC warnings - (strtoll(&start_str[uuid_len + 1], &endptr, 10)); + (parse_value<uint64_t>(&startptr, &endptr)); + + // Start parsing native GTID part + if (*startptr == ',') + { + startptr++; + uint32_t domain __attribute__((unused)) + (parse_value<uint32_t>(&startptr, &endptr)); + if (*endptr != '-') return true; + startptr++; + uint32_t server __attribute__((unused)) + (parse_value<uint32_t>(&startptr, &endptr)); + if (*endptr != '-') return true; + startptr++; + uint64_t seq __attribute__((unused)) + (parse_value<uint64_t>(&startptr, &endptr)); + } // Remaining string was seqno. if (*endptr == '\0') return false; @@ -150,9 +177,22 @@ static bool wsrep_set_local_position(THD* thd, const char* const value, size_t length, bool const sst) { + char* endptr; + char* startptr; wsrep_uuid_t uuid; size_t const uuid_len= wsrep_uuid_scan(value, length, &uuid); - wsrep_seqno_t const seqno= strtoll(value + uuid_len + 1, NULL, 10); + startptr= (char *)value + uuid_len + 1; + wsrep_seqno_t const seqno= parse_value<uint64_t>(&startptr, &endptr); + + if (*startptr == ',') + { + startptr++; + wsrep_gtid_server.domain_id= parse_value<uint32_t>(&startptr, &endptr); + startptr++; + wsrep_gtid_server.server_id= parse_value<uint32_t>(&startptr, &endptr); + startptr++; + wsrep_gtid_server.seqno(parse_value<uint64_t>(&startptr, &endptr)); + } if (sst) { wsrep_sst_received (thd, uuid, seqno, NULL, 0); @@ -436,6 +476,15 @@ bool wsrep_debug_update(sys_var *self, THD* thd, enum_var_type type) return false; } +bool +wsrep_gtid_seq_no_check(sys_var *self, THD *thd, set_var *var) +{ + ulonglong new_wsrep_gtid_seq_no= var->save_result.ulonglong_value; + if (wsrep_gtid_mode && new_wsrep_gtid_seq_no > wsrep_gtid_server.seqno()) + return false; + return true; +} + static int wsrep_cluster_address_verify (const char* cluster_address_str) { /* There is no predefined address format, it depends on provider. */ diff --git a/sql/wsrep_var.h b/sql/wsrep_var.h index 481df02f2d5..810ed4f3dd7 100644 --- a/sql/wsrep_var.h +++ b/sql/wsrep_var.h @@ -20,9 +20,10 @@ #ifdef WITH_WSREP -#define WSREP_CLUSTER_NAME "my_wsrep_cluster" -#define WSREP_NODE_INCOMING_AUTO "AUTO" -#define WSREP_START_POSITION_ZERO "00000000-0000-0000-0000-000000000000:-1" +#define WSREP_CLUSTER_NAME "my_wsrep_cluster" +#define WSREP_NODE_INCOMING_AUTO "AUTO" +#define WSREP_START_POSITION_ZERO "00000000-0000-0000-0000-000000000000:-1" +#define WSREP_START_POSITION_ZERO_GTID "00000000-0000-0000-0000-000000000000:-1,0-0-0" // MySQL variables funcs @@ -102,6 +103,8 @@ extern bool wsrep_reject_queries_update UPDATE_ARGS; extern bool wsrep_debug_update UPDATE_ARGS; +extern bool wsrep_gtid_seq_no_check CHECK_ARGS; + #else /* WITH_WSREP */ #define wsrep_provider_init(X) diff --git a/sql/wsrep_xid.cc b/sql/wsrep_xid.cc index d8f6e013820..c84071e13d0 100644 --- a/sql/wsrep_xid.cc +++ b/sql/wsrep_xid.cc @@ -33,20 +33,24 @@ #define WSREP_XID_VERSION_OFFSET WSREP_XID_PREFIX_LEN #define WSREP_XID_VERSION_1 'd' #define WSREP_XID_VERSION_2 'e' +#define WSREP_XID_VERSION_3 'f' #define WSREP_XID_UUID_OFFSET 8 #define WSREP_XID_SEQNO_OFFSET (WSREP_XID_UUID_OFFSET + sizeof(wsrep_uuid_t)) -#define WSREP_XID_GTRID_LEN (WSREP_XID_SEQNO_OFFSET + sizeof(wsrep_seqno_t)) +#define WSREP_XID_GTRID_LEN_V_1_2 (WSREP_XID_SEQNO_OFFSET + sizeof(wsrep_seqno_t)) +#define WSREP_XID_RPL_GTID_OFFSET (WSREP_XID_SEQNO_OFFSET + sizeof(wsrep_seqno_t)) +#define WSREP_XID_GTRID_LEN_V_3 (WSREP_XID_RPL_GTID_OFFSET + sizeof(wsrep_server_gtid_t)) -void wsrep_xid_init(XID* xid, const wsrep::gtid& wsgtid) +void wsrep_xid_init(XID* xid, const wsrep::gtid& wsgtid, const wsrep_server_gtid_t& gtid) { xid->formatID= 1; - xid->gtrid_length= WSREP_XID_GTRID_LEN; + xid->gtrid_length= WSREP_XID_GTRID_LEN_V_3; xid->bqual_length= 0; memset(xid->data, 0, sizeof(xid->data)); memcpy(xid->data, WSREP_XID_PREFIX, WSREP_XID_PREFIX_LEN); - xid->data[WSREP_XID_VERSION_OFFSET]= WSREP_XID_VERSION_2; + xid->data[WSREP_XID_VERSION_OFFSET]= WSREP_XID_VERSION_3; memcpy(xid->data + WSREP_XID_UUID_OFFSET, wsgtid.id().data(),sizeof(wsrep::id)); int8store(xid->data + WSREP_XID_SEQNO_OFFSET, wsgtid.seqno().get()); + memcpy(xid->data + WSREP_XID_RPL_GTID_OFFSET, >id, sizeof(wsrep_server_gtid_t)); } extern "C" @@ -54,11 +58,13 @@ int wsrep_is_wsrep_xid(const void* xid_ptr) { const XID* xid= static_cast<const XID*>(xid_ptr); return (xid->formatID == 1 && - xid->gtrid_length == WSREP_XID_GTRID_LEN && xid->bqual_length == 0 && - !memcmp(xid->data, WSREP_XID_PREFIX, WSREP_XID_PREFIX_LEN) && - (xid->data[WSREP_XID_VERSION_OFFSET] == WSREP_XID_VERSION_1 || - xid->data[WSREP_XID_VERSION_OFFSET] == WSREP_XID_VERSION_2)); + !memcmp(xid->data, WSREP_XID_PREFIX, WSREP_XID_PREFIX_LEN) && + (((xid->data[WSREP_XID_VERSION_OFFSET] == WSREP_XID_VERSION_1 || + xid->data[WSREP_XID_VERSION_OFFSET] == WSREP_XID_VERSION_2) && + xid->gtrid_length == WSREP_XID_GTRID_LEN_V_1_2) || + (xid->data[WSREP_XID_VERSION_OFFSET] == WSREP_XID_VERSION_3 && + xid->gtrid_length == WSREP_XID_GTRID_LEN_V_3))); } const unsigned char* wsrep_xid_uuid(const xid_t* xid) @@ -90,6 +96,7 @@ long long wsrep_xid_seqno(const xid_t* xid) memcpy(&ret, xid->data + WSREP_XID_SEQNO_OFFSET, sizeof ret); break; case WSREP_XID_VERSION_2: + case WSREP_XID_VERSION_3: ret= sint8korr(xid->data + WSREP_XID_SEQNO_OFFSET); break; default: @@ -127,10 +134,10 @@ bool wsrep_set_SE_checkpoint(XID& xid) &xid); } -bool wsrep_set_SE_checkpoint(const wsrep::gtid& wsgtid) +bool wsrep_set_SE_checkpoint(const wsrep::gtid& wsgtid, const wsrep_server_gtid_t& gtid) { XID xid; - wsrep_xid_init(&xid, wsgtid); + wsrep_xid_init(&xid, wsgtid, gtid); return wsrep_set_SE_checkpoint(xid); } @@ -158,30 +165,61 @@ bool wsrep_get_SE_checkpoint(XID& xid) &xid); } -wsrep::gtid wsrep_get_SE_checkpoint() +static bool wsrep_get_SE_checkpoint_common(XID& xid) { - XID xid; xid.null(); if (wsrep_get_SE_checkpoint(xid)) { - return wsrep::gtid(); + return FALSE; } if (xid.is_null()) { - return wsrep::gtid(); + return FALSE; } if (!wsrep_is_wsrep_xid(&xid)) { WSREP_WARN("Read non-wsrep XID from storage engines."); + return FALSE; + } + + return TRUE; +} + +template<> +wsrep::gtid wsrep_get_SE_checkpoint() +{ + XID xid; + + if (!wsrep_get_SE_checkpoint_common(xid)) + { return wsrep::gtid(); } return wsrep::gtid(wsrep_xid_uuid(xid),wsrep_xid_seqno(xid)); } +template<> +wsrep_server_gtid_t wsrep_get_SE_checkpoint() +{ + XID xid; + wsrep_server_gtid_t gtid= {0,0,0}; + + if (!wsrep_get_SE_checkpoint_common(xid)) + { + return gtid; + } + + if (xid.data[WSREP_XID_VERSION_OFFSET] == WSREP_XID_VERSION_3) + { + memcpy(>id, &xid.data[WSREP_XID_RPL_GTID_OFFSET], sizeof(wsrep_server_gtid_t)); + } + + return gtid; +} + /* Sort order for XIDs. Wsrep XIDs are sorted according to seqno in ascending order. Non-wsrep XIDs are considered diff --git a/sql/wsrep_xid.h b/sql/wsrep_xid.h index a1b9afc1817..45ba6ffee6b 100644 --- a/sql/wsrep_xid.h +++ b/sql/wsrep_xid.h @@ -20,15 +20,16 @@ #ifdef WITH_WSREP +#include "wsrep_mysqld.h" #include "wsrep/gtid.hpp" #include "handler.h" // XID typedef -void wsrep_xid_init(xid_t*, const wsrep::gtid&); +void wsrep_xid_init(xid_t*, const wsrep::gtid&, const wsrep_server_gtid_t&); const wsrep::id& wsrep_xid_uuid(const XID&); wsrep::seqno wsrep_xid_seqno(const XID&); -wsrep::gtid wsrep_get_SE_checkpoint(); -bool wsrep_set_SE_checkpoint(const wsrep::gtid& gtid); +template<typename T> T wsrep_get_SE_checkpoint(); +bool wsrep_set_SE_checkpoint(const wsrep::gtid& gtid, const wsrep_server_gtid_t&); //void wsrep_get_SE_checkpoint(XID&); /* uncomment if needed */ //void wsrep_set_SE_checkpoint(XID&); /* uncomment if needed */ |