diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/handler.cc | 2 | ||||
-rw-r--r-- | sql/item_strfunc.cc | 118 | ||||
-rw-r--r-- | sql/log.cc | 127 | ||||
-rw-r--r-- | sql/log.h | 7 | ||||
-rw-r--r-- | sql/mysqld.cc | 2 | ||||
-rw-r--r-- | sql/service_wsrep.cc | 24 | ||||
-rw-r--r-- | sql/sql_class.cc | 4 | ||||
-rw-r--r-- | sql/sql_class.h | 9 | ||||
-rw-r--r-- | sql/sys_vars.cc | 10 | ||||
-rw-r--r-- | sql/wsrep_applier.cc | 40 | ||||
-rw-r--r-- | sql/wsrep_high_priority_service.cc | 5 | ||||
-rw-r--r-- | sql/wsrep_mysqld.cc | 174 | ||||
-rw-r--r-- | sql/wsrep_mysqld.h | 125 | ||||
-rw-r--r-- | sql/wsrep_server_service.cc | 12 | ||||
-rw-r--r-- | sql/wsrep_sst.cc | 18 | ||||
-rw-r--r-- | sql/wsrep_trans_observer.h | 30 | ||||
-rw-r--r-- | sql/wsrep_var.cc | 55 | ||||
-rw-r--r-- | sql/wsrep_var.h | 9 | ||||
-rw-r--r-- | sql/wsrep_xid.cc | 66 | ||||
-rw-r--r-- | sql/wsrep_xid.h | 7 |
20 files changed, 602 insertions, 242 deletions
diff --git a/sql/handler.cc b/sql/handler.cc index ac6b4f3f453..7d61252eea6 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -2057,7 +2057,7 @@ static my_xid wsrep_order_and_check_continuity(XID *list, int len) { #ifdef WITH_WSREP wsrep_sort_xid_array(list, len); - wsrep::gtid cur_position= wsrep_get_SE_checkpoint(); + wsrep::gtid cur_position= wsrep_get_SE_checkpoint<wsrep::gtid>(); long long cur_seqno= cur_position.seqno().get(); for (int i= 0; i < len; ++i) { diff --git a/sql/item_strfunc.cc b/sql/item_strfunc.cc index e689a71e431..82259cd6924 100644 --- a/sql/item_strfunc.cc +++ b/sql/item_strfunc.cc @@ -5234,28 +5234,33 @@ String *Item_temptable_rowid::val_str(String *str) str_value.set((char*)(table->file->ref), max_length, &my_charset_bin); return &str_value; } + #ifdef WITH_WSREP #include "wsrep_mysqld.h" +/* Format is %d-%d-%llu */ +#define WSREP_MAX_WSREP_SERVER_GTID_STR_LEN 10+1+10+1+20 String *Item_func_wsrep_last_written_gtid::val_str_ascii(String *str) { - wsrep::gtid gtid= current_thd->wsrep_cs().last_written_gtid(); - if (gtid_str.alloc(wsrep::gtid_c_str_len())) + if (gtid_str.alloc(WSREP_MAX_WSREP_SERVER_GTID_STR_LEN+1)) { - my_error(ER_OUTOFMEMORY, wsrep::gtid_c_str_len()); - null_value= true; - return NULL; + my_error(ER_OUTOFMEMORY, WSREP_MAX_WSREP_SERVER_GTID_STR_LEN); + null_value= TRUE; + return 0; } - ssize_t gtid_len= gtid_print_to_c_str(gtid, (char*) gtid_str.ptr(), - wsrep::gtid_c_str_len()); + ssize_t gtid_len= my_snprintf((char*)gtid_str.ptr(), + WSREP_MAX_WSREP_SERVER_GTID_STR_LEN+1, + "%u-%u-%llu", wsrep_gtid_server.domain_id, + wsrep_gtid_server.server_id, + current_thd->wsrep_last_written_gtid_seqno); if (gtid_len < 0) { my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0), func_name(), - "wsrep_gtid_print failed"); - null_value= true; - return NULL; + "wsrep_gtid_print failed"); + null_value= TRUE; + return 0; } gtid_str.length(gtid_len); return &gtid_str; @@ -5263,23 +5268,23 @@ String 
*Item_func_wsrep_last_written_gtid::val_str_ascii(String *str) String *Item_func_wsrep_last_seen_gtid::val_str_ascii(String *str) { - /* TODO: Should call Wsrep_server_state.instance().last_committed_gtid() - instead. */ - wsrep::gtid gtid= Wsrep_server_state::instance().provider().last_committed_gtid(); - if (gtid_str.alloc(wsrep::gtid_c_str_len())) + if (gtid_str.alloc(WSREP_MAX_WSREP_SERVER_GTID_STR_LEN+1)) { - my_error(ER_OUTOFMEMORY, wsrep::gtid_c_str_len()); - null_value= true; - return NULL; + my_error(ER_OUTOFMEMORY, WSREP_MAX_WSREP_SERVER_GTID_STR_LEN); + null_value= TRUE; + return 0; } - ssize_t gtid_len= wsrep::gtid_print_to_c_str(gtid, (char*) gtid_str.ptr(), - wsrep::gtid_c_str_len()); + ssize_t gtid_len= my_snprintf((char*)gtid_str.ptr(), + WSREP_MAX_WSREP_SERVER_GTID_STR_LEN+1, + "%u-%u-%llu", wsrep_gtid_server.domain_id, + wsrep_gtid_server.server_id, + wsrep_gtid_server.seqno()); if (gtid_len < 0) { my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0), func_name(), "wsrep_gtid_print failed"); - null_value= true; - return NULL; + null_value= TRUE; + return 0; } gtid_str.length(gtid_len); return &gtid_str; @@ -5287,49 +5292,52 @@ String *Item_func_wsrep_last_seen_gtid::val_str_ascii(String *str) longlong Item_func_wsrep_sync_wait_upto::val_int() { - int timeout= -1; - String* gtid_str= args[0]->val_str(&value); - if (gtid_str == NULL) - { - my_error(ER_WRONG_ARGUMENTS, MYF(0), func_name()); - return 0LL; - } - - if (arg_count == 2) - { - timeout= args[1]->val_int(); - } + String *gtid_str __attribute__((unused)) = args[0]->val_str(&value); + null_value=0; + uint timeout; + rpl_gtid *gtid_list; + uint32 count; + int ret= 1; - wsrep_gtid_t gtid; - int gtid_len= wsrep_gtid_scan(gtid_str->ptr(), gtid_str->length(), &gtid); - if (gtid_len < 0) + if (args[0]->null_value) { my_error(ER_WRONG_ARGUMENTS, MYF(0), func_name()); - return 0LL; + null_value= TRUE; + return 0; } - if (gtid.seqno == WSREP_SEQNO_UNDEFINED && - wsrep_uuid_compare(&gtid.uuid, 
&WSREP_UUID_UNDEFINED) == 0) + if (arg_count==2 && !args[1]->null_value) + timeout= (uint)(args[1]->val_real()); + else + timeout= (uint)-1; + + if (!(gtid_list= gtid_parse_string_to_list(gtid_str->ptr(), gtid_str->length(), + &count))) { - return 1LL; + my_error(ER_INCORRECT_GTID_STATE, MYF(0), func_name()); + null_value= TRUE; + return 0; } - - enum wsrep::provider::status status= - wsrep_sync_wait_upto(current_thd, &gtid, timeout); - - if (status) + if (count == 1) { - int err; - switch (status) { - case wsrep::provider::error_transaction_missing: - err= ER_WRONG_ARGUMENTS; - break; - default: - err= ER_LOCK_WAIT_TIMEOUT; + if (wsrep_check_gtid_seqno(gtid_list[0].domain_id, gtid_list[0].server_id, + gtid_list[0].seq_no)) + { + if (wsrep_gtid_server.wait_gtid_upto(gtid_list[0].seq_no, timeout)) + { + my_error(ER_LOCK_WAIT_TIMEOUT, MYF(0), func_name()); + ret= 0; + } } - my_error(err, MYF(0), func_name()); - return 0LL; } - return 1LL; + else + { + my_error(ER_WRONG_ARGUMENTS, MYF(0), func_name()); + null_value= TRUE; + ret= 0; + } + my_free(gtid_list); + return ret; } + #endif /* WITH_WSREP */ diff --git a/sql/log.cc b/sql/log.cc index bf19946210a..93b5d697d14 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -5048,55 +5048,6 @@ MYSQL_BIN_LOG::is_xidlist_idle_nolock() return true; } -#ifdef WITH_WSREP -inline bool -is_gtid_cached_internal(IO_CACHE *file) -{ - uchar data[EVENT_TYPE_OFFSET+1]; - bool result= false; - my_off_t write_pos= my_b_tell(file); - if (reinit_io_cache(file, READ_CACHE, 0, 0, 0)) - return false; - /* - In the cache we have gtid event if , below condition is true, - */ - my_b_read(file, data, sizeof(data)); - uint event_type= (uchar)data[EVENT_TYPE_OFFSET]; - if (event_type == GTID_LOG_EVENT) - result= true; - /* - Cleanup , Why because we have not read the full buffer - and this will cause next to next reinit_io_cache(called in write_cache) - to make cache empty. 
- */ - file->read_pos= file->read_end; - if (reinit_io_cache(file, WRITE_CACHE, write_pos, 0, 0)) - return false; - return result; -} -#endif - -#ifdef WITH_WSREP -inline bool -MYSQL_BIN_LOG::is_gtid_cached(THD *thd) -{ - binlog_cache_mngr *mngr= (binlog_cache_mngr *) thd_get_ha_data( - thd, binlog_hton); - if (!mngr) - return false; - binlog_cache_data *cache_trans= mngr->get_binlog_cache_data( - use_trans_cache(thd, true)); - binlog_cache_data *cache_stmt= mngr->get_binlog_cache_data( - use_trans_cache(thd, false)); - if (cache_trans && !cache_trans->empty() && - is_gtid_cached_internal(&cache_trans->cache_log)) - return true; - if (cache_stmt && !cache_stmt->empty() && - is_gtid_cached_internal(&cache_stmt->cache_log)) - return true; - return false; -} -#endif /** Create a new log file name. @@ -5719,31 +5670,47 @@ THD::binlog_start_trans_and_stmt() { DBUG_VOID_RETURN; } - /* Write Gtid - Get domain id only when gtid mode is set - If this event is replicate through a master then , - we will forward the same gtid another nodes - We have to do this only one time in mysql transaction. - Since this function is called multiple times , We will check for - ha_info->is_started() - */ + /* If this event replicates through a master-slave then we need to + inject manually GTID so it is preserved in the cluster. We are writing + directly to WSREP buffer and not in IO cache because in case of IO cache + GTID event will be duplicated in binlog. + We have to do this only one time in mysql transaction. + Since this function is called multiple times , We will check for + ha_info->is_started(). + */ Ha_trx_info *ha_info; ha_info= this->ha_data[binlog_hton->slot].ha_info + (mstmt_mode ? 
1 : 0); - if (!ha_info->is_started() && wsrep_gtid_mode - && this->variables.gtid_seq_no) + if (!ha_info->is_started() && + (this->variables.gtid_seq_no || this->variables.wsrep_gtid_seq_no) && + wsrep_on(this) && + (this->wsrep_cs().mode() == wsrep::client_state::m_local)) { - binlog_cache_mngr *const cache_mngr= - (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); - binlog_cache_data *cache_data= cache_mngr->get_binlog_cache_data(1); - IO_CACHE *file= &cache_data->cache_log; - Log_event_writer writer(file, cache_data); - Gtid_log_event gtid_event(this, this->variables.gtid_seq_no, - this->variables.gtid_domain_id, - true, LOG_EVENT_SUPPRESS_USE_F, - true, 0); - gtid_event.server_id= this->variables.server_id; - writer.write(&gtid_event); + uchar *buf= 0; + size_t len= 0; + IO_CACHE tmp_io_cache; + Log_event_writer writer(&tmp_io_cache, 0); + if(!open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX, + 128, MYF(MY_WME))) + { + uint64 seqno= this->variables.gtid_seq_no; + uint32 domain_id= this->variables.gtid_domain_id; + uint32 server_id= this->variables.server_id; + if (!this->variables.gtid_seq_no && this->variables.wsrep_gtid_seq_no) + { + seqno= this->variables.wsrep_gtid_seq_no; + domain_id= wsrep_gtid_server.domain_id; + server_id= wsrep_gtid_server.server_id; + } + Gtid_log_event gtid_event(this, seqno, domain_id, true, + LOG_EVENT_SUPPRESS_USE_F, true, 0); + gtid_event.server_id= server_id; + writer.write(&gtid_event); + wsrep_write_cache_buf(&tmp_io_cache, &buf, &len); + if (len > 0) this->wsrep_cs().append_data(wsrep::const_buffer(buf, len)); + if (buf) my_free(buf); + close_cached_file(&tmp_io_cache); + } } #endif if (mstmt_mode) @@ -6024,20 +5991,9 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, DBUG_ENTER("write_gtid_event"); DBUG_PRINT("enter", ("standalone: %d", standalone)); -#ifdef WITH_WSREP - if (WSREP(thd) && - (wsrep_thd_trx_seqno(thd) > 0) && - wsrep_gtid_mode && !thd->variables.gtid_seq_no) - { - domain_id= 
wsrep_gtid_domain_id; - } else { -#endif /* WITH_WSREP */ + seq_no= thd->variables.gtid_seq_no; domain_id= thd->variables.gtid_domain_id; -#ifdef WITH_WSREP - } -#endif /* WITH_WSREP */ local_server_id= thd->variables.server_id; - seq_no= thd->variables.gtid_seq_no; DBUG_ASSERT(local_server_id != 0); @@ -6084,8 +6040,11 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, DBUG_ASSERT(this == &mysql_bin_log); #ifdef WITH_WSREP - if (wsrep_gtid_mode && is_gtid_cached(thd)) - DBUG_RETURN(false); + if (wsrep_gtid_mode) + { + thd->variables.gtid_domain_id= global_system_variables.gtid_domain_id; + thd->variables.server_id= global_system_variables.server_id; + } #endif if (write_event(&gtid_event)) diff --git a/sql/log.h b/sql/log.h index bf1dbd30c6c..7ccfd606f21 100644 --- a/sql/log.h +++ b/sql/log.h @@ -561,13 +561,6 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG bool write_transaction_to_binlog_events(group_commit_entry *entry); void trx_group_commit_leader(group_commit_entry *leader); bool is_xidlist_idle_nolock(); -#ifdef WITH_WSREP - /* - When this mariadb node is slave and galera enabled. So in this case - we write the gtid in wsrep_run_commit itself. 
- */ - inline bool is_gtid_cached(THD *thd); -#endif public: /* A list of struct xid_count_per_binlog is used to keep track of how many diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 7fe98c756cd..e2489f7706f 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -5724,6 +5724,8 @@ int mysqld_main(int argc, char **argv) { wsrep_shutdown_replication(); } + /* Release threads if they are waiting in WSREP_SYNC_WAIT_UPTO_GTID */ + wsrep_gtid_server.signal_waiters(0, true); #endif close_connections(); diff --git a/sql/service_wsrep.cc b/sql/service_wsrep.cc index 36ea311f59f..68de55b5778 100644 --- a/sql/service_wsrep.cc +++ b/sql/service_wsrep.cc @@ -281,16 +281,34 @@ extern "C" int wsrep_thd_append_key(THD *thd, } ret= client_state.append_key(wsrep_key); } + /* + In case of `wsrep_gtid_mode` when WS will be replicated, we need to set + `server_id` for events that are going to be written in IO, and in case of + manual SET gtid_seq_no=X we are ignoring value. + */ + if (!ret && wsrep_gtid_mode && !thd->slave_thread && !wsrep_thd_is_applying(thd)) + { + thd->variables.server_id= wsrep_gtid_server.server_id; + thd->variables.gtid_seq_no= 0; + } return ret; } extern "C" void wsrep_commit_ordered(THD *thd) { if (wsrep_is_active(thd) && - thd->wsrep_trx().state() == wsrep::transaction::s_committing && - !wsrep_commit_will_write_binlog(thd)) + (thd->wsrep_trx().state() == wsrep::transaction::s_committing || + thd->wsrep_trx().state() == wsrep::transaction::s_ordered_commit)) { - thd->wsrep_cs().ordered_commit(); + wsrep_gtid_server.signal_waiters(thd->wsrep_current_gtid_seqno, false); + if (wsrep_thd_is_local(thd)) + { + thd->wsrep_last_written_gtid_seqno= thd->wsrep_current_gtid_seqno; + } + if (!wsrep_commit_will_write_binlog(thd)) + { + thd->wsrep_cs().ordered_commit(); + } } } diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 1fdf4f17447..b5054557c46 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -697,9 +697,10 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) 
wsrep_apply_format(0), wsrep_rbr_buf(NULL), wsrep_sync_wait_gtid(WSREP_GTID_UNDEFINED), + wsrep_last_written_gtid_seqno(0), + wsrep_current_gtid_seqno(0), wsrep_affected_rows(0), wsrep_has_ignored_error(false), - wsrep_replicate_GTID(false), wsrep_ignore_table(false), /* wsrep-lib */ @@ -1313,7 +1314,6 @@ void THD::init() wsrep_rbr_buf = NULL; wsrep_affected_rows = 0; m_wsrep_next_trx_id = WSREP_UNDEFINED_TRX_ID; - wsrep_replicate_GTID = false; #endif /* WITH_WSREP */ if (variables.sql_log_bin) diff --git a/sql/sql_class.h b/sql/sql_class.h index 5bbe2e386d0..0891cd214f8 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -629,6 +629,7 @@ typedef struct system_variables are based on the cluster size): */ ulong saved_auto_increment_increment, saved_auto_increment_offset; + ulonglong wsrep_gtid_seq_no; #endif /* WITH_WSREP */ uint eq_range_index_dive_limit; ulong column_compression_zlib_strategy; @@ -4875,17 +4876,13 @@ public: size_t wsrep_TOI_pre_query_len; wsrep_po_handle_t wsrep_po_handle; size_t wsrep_po_cnt; -#ifdef GTID_SUPPORT - my_bool wsrep_po_in_trans; - rpl_sid wsrep_po_sid; -#endif /* GTID_SUPPORT */ void *wsrep_apply_format; uchar* wsrep_rbr_buf; wsrep_gtid_t wsrep_sync_wait_gtid; - // wsrep_gtid_t wsrep_last_written_gtid; + uint64 wsrep_last_written_gtid_seqno; + uint64 wsrep_current_gtid_seqno; ulong wsrep_affected_rows; bool wsrep_has_ignored_error; - bool wsrep_replicate_GTID; /* When enabled, do not replicate/binlog updates from the current table that's diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index de476042a80..ab382304973 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -5792,9 +5792,17 @@ static Sys_var_uint Sys_wsrep_gtid_domain_id( "wsrep_gtid_domain_id", "When wsrep_gtid_mode is set, this value is " "used as gtid_domain_id for galera transactions and also copied to the " "joiner nodes during state transfer. 
It is ignored, otherwise.", - GLOBAL_VAR(wsrep_gtid_domain_id), CMD_LINE(REQUIRED_ARG), + GLOBAL_VAR(wsrep_gtid_server.domain_id), CMD_LINE(REQUIRED_ARG), VALID_RANGE(0, UINT_MAX32), DEFAULT(0), BLOCK_SIZE(1)); +static Sys_var_ulonglong Sys_wsrep_gtid_seq_no( + "wsrep_gtid_seq_no", + "Internal server usage, manually set WSREP GTID seqno.", + SESSION_ONLY(wsrep_gtid_seq_no), + NO_CMD_LINE, VALID_RANGE(0, ULONGLONG_MAX), DEFAULT(0), + BLOCK_SIZE(1), NO_MUTEX_GUARD, NOT_IN_BINLOG, + ON_CHECK(wsrep_gtid_seq_no_check)); + static Sys_var_mybool Sys_wsrep_gtid_mode( "wsrep_gtid_mode", "Automatically update the (joiner) node's " "wsrep_gtid_domain_id value with that of donor's (received during " diff --git a/sql/wsrep_applier.cc b/sql/wsrep_applier.cc index 1ab65df1ca3..25a4e22aeb4 100644 --- a/sql/wsrep_applier.cc +++ b/sql/wsrep_applier.cc @@ -131,6 +131,12 @@ int wsrep_apply_events(THD* thd, if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld", (long long) wsrep_thd_trx_seqno(thd)); + thd->variables.gtid_seq_no= 0; + if (wsrep_gtid_mode) + thd->variables.gtid_domain_id= wsrep_gtid_server.domain_id; + else + thd->variables.gtid_domain_id= global_system_variables.gtid_domain_id; + while (buf_len) { int exec_res; @@ -150,22 +156,38 @@ int wsrep_apply_events(THD* thd, case FORMAT_DESCRIPTION_EVENT: wsrep_set_apply_format(thd, (Format_description_log_event*)ev); continue; -#ifdef GTID_SUPPORT - case GTID_LOG_EVENT: - { - Gtid_log_event* gev= (Gtid_log_event*)ev; - if (gev->get_gno() == 0) + case GTID_EVENT: { - /* Skip GTID log event to make binlog to generate LTID on commit */ + Gtid_log_event *gtid_ev= (Gtid_log_event*)ev; + thd->variables.server_id= gtid_ev->server_id; + thd->variables.gtid_domain_id= gtid_ev->domain_id; + if ((gtid_ev->server_id == wsrep_gtid_server.server_id) && + (gtid_ev->domain_id == wsrep_gtid_server.domain_id)) + { + thd->variables.wsrep_gtid_seq_no= gtid_ev->seq_no; + } + else + { + thd->variables.gtid_seq_no= gtid_ev->seq_no; + } delete 
ev; - continue; } - } -#endif /* GTID_SUPPORT */ + continue; default: break; } + + if (!thd->variables.gtid_seq_no && wsrep_thd_is_toi(thd) && + (ev->get_type_code() == QUERY_EVENT)) + { + uint64 seqno= wsrep_gtid_server.seqno_inc(); + thd->wsrep_current_gtid_seqno= seqno; + if (mysql_bin_log.is_open() && wsrep_gtid_mode) + { + thd->variables.gtid_seq_no= seqno; + } + } /* Use the original server id for logging. */ thd->set_server_id(ev->server_id); thd->set_time(); // time the query diff --git a/sql/wsrep_high_priority_service.cc b/sql/wsrep_high_priority_service.cc index 06d398baf5f..d73b9cb09ce 100644 --- a/sql/wsrep_high_priority_service.cc +++ b/sql/wsrep_high_priority_service.cc @@ -396,7 +396,8 @@ int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta, thd->close_temporary_tables(); thd->lex->sql_command= SQLCOM_END; - wsrep_set_SE_checkpoint(client_state.toi_meta().gtid()); + wsrep_gtid_server.signal_waiters(thd->wsrep_current_gtid_seqno, false); + wsrep_set_SE_checkpoint(client_state.toi_meta().gtid(), wsrep_gtid_server.gtid()); must_exit_= check_exit_status(); @@ -448,7 +449,7 @@ int Wsrep_high_priority_service::log_dummy_write_set(const wsrep::ws_handle& ws_ cs.before_rollback(); cs.after_rollback(); } - wsrep_set_SE_checkpoint(ws_meta.gtid()); + wsrep_set_SE_checkpoint(ws_meta.gtid(), wsrep_gtid_server.gtid()); ret= ret || cs.provider().commit_order_leave(ws_handle, ws_meta, err); cs.after_applying(); } diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index 444a187ea57..4c1d683b03e 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -52,11 +52,7 @@ /* wsrep-lib */ Wsrep_server_state* Wsrep_server_state::m_instance; -my_bool wsrep_emulate_bin_log = FALSE; // activating parts of binlog interface -#ifdef GTID_SUPPORT -/* Sidno in global_sid_map corresponding to group uuid */ -rpl_sidno wsrep_sidno= -1; -#endif /* GTID_SUPPORT */ +my_bool wsrep_emulate_bin_log= FALSE; // activating parts of binlog interface my_bool 
wsrep_preordered_opt= FALSE; /* Streaming Replication */ @@ -106,10 +102,9 @@ ulong wsrep_max_ws_size; // Max allowed ws (RBR buffer) s ulong wsrep_max_ws_rows; // Max number of rows in ws ulong wsrep_forced_binlog_format; ulong wsrep_mysql_replication_bundle; -bool wsrep_gtid_mode; // Use wsrep_gtid_domain_id - // for galera transactions? -uint32 wsrep_gtid_domain_id; // gtid_domain_id for galera - // transactions + +bool wsrep_gtid_mode; // Enable WSREP native GTID support +Wsrep_gtid_server wsrep_gtid_server; /* Other configuration variables and their default values. */ my_bool wsrep_incremental_data_collection= 0; // Incremental data collection @@ -144,6 +139,7 @@ mysql_mutex_t LOCK_wsrep_replaying; mysql_cond_t COND_wsrep_replaying; mysql_mutex_t LOCK_wsrep_slave_threads; mysql_cond_t COND_wsrep_slave_threads; +mysql_mutex_t LOCK_wsrep_gtid_wait_upto; mysql_mutex_t LOCK_wsrep_cluster_config; mysql_mutex_t LOCK_wsrep_desync; mysql_mutex_t LOCK_wsrep_config_state; @@ -167,7 +163,8 @@ ulong my_bind_addr; PSI_mutex_key key_LOCK_wsrep_replaying, key_LOCK_wsrep_ready, key_LOCK_wsrep_sst, key_LOCK_wsrep_sst_thread, key_LOCK_wsrep_sst_init, - key_LOCK_wsrep_slave_threads, key_LOCK_wsrep_desync, + key_LOCK_wsrep_slave_threads, key_LOCK_wsrep_gtid_wait_upto, + key_LOCK_wsrep_desync, key_LOCK_wsrep_config_state, key_LOCK_wsrep_cluster_config, key_LOCK_wsrep_group_commit, key_LOCK_wsrep_SR_pool, @@ -179,7 +176,7 @@ PSI_mutex_key PSI_cond_key key_COND_wsrep_thd, key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst, key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread, - key_COND_wsrep_thd_queue, key_COND_wsrep_slave_threads, + key_COND_wsrep_thd_queue, key_COND_wsrep_slave_threads, key_COND_wsrep_gtid_wait_upto, key_COND_wsrep_joiner_monitor, key_COND_wsrep_donor_monitor; PSI_file_key key_file_wsrep_gra_log; @@ -193,6 +190,7 @@ static PSI_mutex_info wsrep_mutexes[]= { &key_LOCK_wsrep_sst, "LOCK_wsrep_sst", PSI_FLAG_GLOBAL}, { &key_LOCK_wsrep_replaying, 
"LOCK_wsrep_replaying", PSI_FLAG_GLOBAL}, { &key_LOCK_wsrep_slave_threads, "LOCK_wsrep_slave_threads", PSI_FLAG_GLOBAL}, + { &key_LOCK_wsrep_gtid_wait_upto, "LOCK_wsrep_gtid_wait_upto", PSI_FLAG_GLOBAL}, { &key_LOCK_wsrep_cluster_config, "LOCK_wsrep_cluster_config", PSI_FLAG_GLOBAL}, { &key_LOCK_wsrep_desync, "LOCK_wsrep_desync", PSI_FLAG_GLOBAL}, { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL}, @@ -212,6 +210,7 @@ static PSI_cond_info wsrep_conds[]= { &key_COND_wsrep_thd, "THD::COND_wsrep_thd", 0}, { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL}, { &key_COND_wsrep_slave_threads, "COND_wsrep_wsrep_slave_threads", PSI_FLAG_GLOBAL}, + { &key_COND_wsrep_gtid_wait_upto, "COND_wsrep_gtid_wait_upto", PSI_FLAG_GLOBAL}, { &key_COND_wsrep_joiner_monitor, "COND_wsrep_joiner_monitor", PSI_FLAG_GLOBAL}, { &key_COND_wsrep_donor_monitor, "COND_wsrep_donor_monitor", PSI_FLAG_GLOBAL} }; @@ -302,6 +301,58 @@ static void wsrep_log_cb(wsrep::log::level level, const char *msg) } } +void wsrep_init_gtid() +{ + wsrep_server_gtid_t stored_gtid= wsrep_get_SE_checkpoint<wsrep_server_gtid_t>(); + if (stored_gtid.server_id == 0) + { + rpl_gtid wsrep_last_gtid; + stored_gtid.domain_id= wsrep_gtid_server.domain_id; + if (mysql_bin_log.is_open() && + mysql_bin_log.lookup_domain_in_binlog_state(stored_gtid.domain_id, + &wsrep_last_gtid)) + { + stored_gtid.server_id= wsrep_last_gtid.server_id; + stored_gtid.seqno= wsrep_last_gtid.seq_no; + } + else + { + stored_gtid.server_id= global_system_variables.server_id; + stored_gtid.seqno= 0; + } + } + wsrep_gtid_server.gtid(stored_gtid); +} + +bool wsrep_get_binlog_gtid_seqno(wsrep_server_gtid_t& gtid) +{ + rpl_gtid binlog_gtid; + int ret= 0; + if (mysql_bin_log.is_open() && + mysql_bin_log.find_in_binlog_state(gtid.domain_id, + gtid.server_id, + &binlog_gtid)) + { + gtid.domain_id= binlog_gtid.domain_id; + gtid.server_id= binlog_gtid.server_id; + gtid.seqno= binlog_gtid.seq_no; + ret= 1; + } + return 
ret; +} + +bool wsrep_check_gtid_seqno(const uint32& domain, const uint32& server, + uint64& seqno) +{ + if (domain == wsrep_gtid_server.domain_id && + server == wsrep_gtid_server.server_id) + { + if (wsrep_gtid_server.seqno_committed() < seqno) return 1; + return 0; + } + return 0; +} + void wsrep_init_sidno(const wsrep::id& uuid) { /* @@ -692,6 +743,16 @@ int wsrep_init_server() void wsrep_init_globals() { wsrep_init_sidno(Wsrep_server_state::instance().connected_gtid().id()); + wsrep_init_gtid(); + /* Recover last written wsrep gtid */ + if (wsrep_new_cluster) + { + wsrep_server_gtid_t gtid= {wsrep_gtid_server.domain_id, + wsrep_gtid_server.server_id, 0}; + wsrep_get_binlog_gtid_seqno(gtid); + wsrep_gtid_server.seqno(gtid.seqno); + } + wsrep_new_cluster= 0; wsrep_init_schema(); if (WSREP_ON) { @@ -793,6 +854,7 @@ void wsrep_thr_init() mysql_cond_init(key_COND_wsrep_replaying, &COND_wsrep_replaying, NULL); mysql_mutex_init(key_LOCK_wsrep_slave_threads, &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wsrep_slave_threads, &COND_wsrep_slave_threads, NULL); + mysql_mutex_init(key_LOCK_wsrep_gtid_wait_upto, &LOCK_wsrep_gtid_wait_upto, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_wsrep_cluster_config, &LOCK_wsrep_cluster_config, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_wsrep_desync, &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_wsrep_config_state, &LOCK_wsrep_config_state, MY_MUTEX_INIT_FAST); @@ -903,6 +965,7 @@ void wsrep_thr_deinit() mysql_cond_destroy(&COND_wsrep_sst_init); mysql_mutex_destroy(&LOCK_wsrep_replaying); mysql_cond_destroy(&COND_wsrep_replaying); + mysql_mutex_destroy(&LOCK_wsrep_gtid_wait_upto); mysql_mutex_destroy(&LOCK_wsrep_slave_threads); mysql_cond_destroy(&COND_wsrep_slave_threads); mysql_mutex_destroy(&LOCK_wsrep_cluster_config); @@ -939,10 +1002,20 @@ void wsrep_recover() uuid_str, (long long)local_seqno); return; } - wsrep::gtid gtid= wsrep_get_SE_checkpoint(); + wsrep::gtid gtid= 
wsrep_get_SE_checkpoint<wsrep::gtid>(); std::ostringstream oss; oss << gtid; - WSREP_INFO("Recovered position: %s", oss.str().c_str()); + if (wsrep_gtid_mode) + { + wsrep_server_gtid_t server_gtid= wsrep_get_SE_checkpoint<wsrep_server_gtid_t>(); + WSREP_INFO("Recovered position: %s,%d-%d-%llu", oss.str().c_str(), server_gtid.domain_id, + server_gtid.server_id, server_gtid.seqno); + } + else + { + WSREP_INFO("Recovered position: %s", oss.str().c_str()); + } + } @@ -1012,7 +1085,6 @@ bool wsrep_start_replication() } bool const bootstrap(TRUE == wsrep_new_cluster); - wsrep_new_cluster= FALSE; WSREP_INFO("Start replication"); @@ -1445,16 +1517,36 @@ int wsrep_to_buf_helper( if (!ret && writer.write(&gtid_ev)) ret= 1; } #endif /* GTID_SUPPORT */ - if (wsrep_gtid_mode && thd->variables.gtid_seq_no) + /* + * Check if this is applier thread, slave_thread or + * we have set manually WSREP GTID seqno. Add GTID event. + */ + if (thd->slave_thread || wsrep_thd_is_applying(thd) || + thd->variables.wsrep_gtid_seq_no) { - Gtid_log_event gtid_event(thd, thd->variables.gtid_seq_no, - thd->variables.gtid_domain_id, - true, LOG_EVENT_SUPPRESS_USE_F, - true, 0); - gtid_event.server_id= thd->variables.server_id; + uint64 seqno= thd->variables.gtid_seq_no; + uint32 domain_id= thd->variables.gtid_domain_id; + uint32 server_id= thd->variables.server_id; + if (!thd->variables.gtid_seq_no && thd->variables.wsrep_gtid_seq_no) + { + seqno= thd->variables.wsrep_gtid_seq_no; + domain_id= wsrep_gtid_server.domain_id; + server_id= wsrep_gtid_server.server_id; + } + Gtid_log_event gtid_event(thd, seqno, domain_id, true, + LOG_EVENT_SUPPRESS_USE_F, true, 0); + gtid_event.server_id= server_id; if (!gtid_event.is_valid()) ret= 0; ret= writer.write(&gtid_event); } + /* + It's local DDL so in case of possible gtid seqno (SET gtid_seq_no=X) + manipulation, seqno value will be ignored. 
+ */ + else + { + thd->variables.gtid_seq_no= 0; + } /* if there is prepare query, add event for it */ if (!ret && thd->wsrep_TOI_pre_query) @@ -1468,6 +1560,9 @@ int wsrep_to_buf_helper( /* continue to append the actual query */ Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0); + /* WSREP GTID mode, we need to change server_id */ + if (wsrep_gtid_mode && !thd->variables.gtid_seq_no) + ev.server_id= wsrep_gtid_server.server_id; ev.checksum_alg= current_binlog_check_alg; if (!ret && writer.write(&ev)) ret= 1; if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, buf_len)) ret= 1; @@ -1947,6 +2042,28 @@ static int wsrep_TOI_begin(THD *thd, const char *db, const char *table, rc= -1; } else { + if (!thd->variables.gtid_seq_no) + { + uint64 seqno= 0; + if (thd->variables.wsrep_gtid_seq_no && + thd->variables.wsrep_gtid_seq_no > wsrep_gtid_server.seqno()) + { + seqno= thd->variables.wsrep_gtid_seq_no; + wsrep_gtid_server.seqno(thd->variables.wsrep_gtid_seq_no); + } + else + { + seqno= wsrep_gtid_server.seqno_inc(); + } + thd->variables.wsrep_gtid_seq_no= 0; + thd->wsrep_current_gtid_seqno= seqno; + if (mysql_bin_log.is_open() && wsrep_gtid_mode) + { + thd->variables.gtid_seq_no= seqno; + thd->variables.gtid_domain_id= wsrep_gtid_server.domain_id; + thd->variables.server_id= wsrep_gtid_server.server_id; + } + } ++wsrep_to_isolation; rc= 0; } @@ -1965,15 +2082,22 @@ static void wsrep_TOI_end(THD *thd) { WSREP_DEBUG("TO END: %lld: %s", client_state.toi_meta().seqno().get(), WSREP_QUERY(thd)); + wsrep_gtid_server.signal_waiters(thd->wsrep_current_gtid_seqno, false); + if (wsrep_thd_is_local_toi(thd)) { - wsrep_set_SE_checkpoint(client_state.toi_meta().gtid()); wsrep::mutable_buffer err; + + thd->wsrep_last_written_gtid_seqno= thd->wsrep_current_gtid_seqno; + wsrep_set_SE_checkpoint(client_state.toi_meta().gtid(), wsrep_gtid_server.gtid()); + if (thd->is_error() && !wsrep_must_ignore_error(thd)) { - wsrep_store_error(thd, err); + wsrep_store_error(thd, err); 
} + int const ret= client_state.leave_toi_local(err); + if (!ret) { WSREP_DEBUG("TO END: %lld", client_state.toi_meta().seqno().get()); @@ -2666,12 +2790,6 @@ void* start_wsrep_THD(void *arg) statistic_increment(thread_created, &LOCK_status); - if (wsrep_gtid_mode) - { - /* Adjust domain_id. */ - thd->variables.gtid_domain_id= wsrep_gtid_domain_id; - } - thd->real_id=pthread_self(); // Keep purify happy my_net_init(&thd->net,(st_vio*) 0, thd, MYF(0)); diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index 5ad68fbe423..222b265248f 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -38,6 +38,7 @@ typedef struct st_mysql_show_var SHOW_VAR; #include "wsrep/streaming_context.hpp" #include "wsrep_api.h" #include <vector> +#include <map> #include "wsrep_server_state.h" #define WSREP_UNDEFINED_TRX_ID ULONGLONG_MAX @@ -97,7 +98,6 @@ extern ulong wsrep_running_applier_threads; extern ulong wsrep_running_rollbacker_threads; extern bool wsrep_new_cluster; extern bool wsrep_gtid_mode; -extern uint32 wsrep_gtid_domain_id; enum enum_wsrep_reject_types { WSREP_REJECT_NONE, /* nothing rejected */ @@ -303,6 +303,7 @@ extern mysql_mutex_t LOCK_wsrep_replaying; extern mysql_cond_t COND_wsrep_replaying; extern mysql_mutex_t LOCK_wsrep_slave_threads; extern mysql_cond_t COND_wsrep_slave_threads; +extern mysql_mutex_t LOCK_wsrep_gtid_wait_upto; extern mysql_mutex_t LOCK_wsrep_cluster_config; extern mysql_mutex_t LOCK_wsrep_desync; extern mysql_mutex_t LOCK_wsrep_SR_pool; @@ -316,9 +317,6 @@ extern mysql_cond_t COND_wsrep_donor_monitor; extern my_bool wsrep_emulate_bin_log; extern int wsrep_to_isolation; -#ifdef GTID_SUPPORT -extern rpl_sidno wsrep_sidno; -#endif /* GTID_SUPPORT */ extern my_bool wsrep_preordered_opt; #ifdef HAVE_PSI_INTERFACE @@ -337,6 +335,8 @@ extern PSI_mutex_key key_LOCK_wsrep_replaying; extern PSI_cond_key key_COND_wsrep_replaying; extern PSI_mutex_key key_LOCK_wsrep_slave_threads; extern PSI_cond_key key_COND_wsrep_slave_threads; +extern PSI_mutex_key 
key_LOCK_wsrep_gtid_wait_upto; +extern PSI_cond_key key_COND_wsrep_gtid_wait_upto; extern PSI_mutex_key key_LOCK_wsrep_cluster_config; extern PSI_mutex_key key_LOCK_wsrep_desync; extern PSI_mutex_key key_LOCK_wsrep_SR_pool; @@ -387,7 +387,122 @@ class Log_event; int wsrep_ignored_error_code(Log_event* ev, int error); int wsrep_must_ignore_error(THD* thd); -bool wsrep_replicate_GTID(THD* thd); +struct wsrep_server_gtid_t +{ + uint32 domain_id; + uint32 server_id; + uint64 seqno; +}; +class Wsrep_gtid_server +{ +public: + uint32 domain_id; + uint32 server_id; + Wsrep_gtid_server() + : m_force_signal(false) + , m_seqno(0) + , m_committed_seqno(0) + { } + void gtid(const wsrep_server_gtid_t& gtid) + { + domain_id= gtid.domain_id; + server_id= gtid.server_id; + m_seqno= gtid.seqno; + } + wsrep_server_gtid_t gtid() + { + wsrep_server_gtid_t gtid; + gtid.domain_id= domain_id; + gtid.server_id= server_id; + gtid.seqno= m_seqno; + return gtid; + } + void seqno(const uint64 seqno) { m_seqno= seqno; } + uint64 seqno() const { return m_seqno; } + uint64 seqno_committed() const { return m_committed_seqno; } + uint64 seqno_inc() + { + m_seqno++; + return m_seqno; + } + const wsrep_server_gtid_t& undefined() + { + return m_undefined; + } + int wait_gtid_upto(const uint64_t seqno, uint timeout) + { + int wait_result; + struct timespec wait_time; + int ret= 0; + mysql_cond_t wait_cond; + mysql_cond_init(key_COND_wsrep_gtid_wait_upto, &wait_cond, NULL); + set_timespec(wait_time, timeout); + mysql_mutex_lock(&LOCK_wsrep_gtid_wait_upto); + std::multimap<uint64, mysql_cond_t*>::iterator it; + try + { + it= m_wait_map.insert(std::make_pair(seqno, &wait_cond)); + } + catch (std::bad_alloc& e) + { + return 0; + } + while ((m_committed_seqno < seqno) && !m_force_signal) + { + wait_result= mysql_cond_timedwait(&wait_cond, + &LOCK_wsrep_gtid_wait_upto, + &wait_time); + if (wait_result == ETIMEDOUT || wait_result == ETIME) + { + ret= 1; + break; + } + } + m_wait_map.erase(it); + 
mysql_mutex_unlock(&LOCK_wsrep_gtid_wait_upto); + mysql_cond_destroy(&wait_cond); + return ret; + } + void signal_waiters(uint64 seqno, bool signal_all) + { + if (!signal_all && (m_committed_seqno >= seqno)) + { + return; + } + mysql_mutex_lock(&LOCK_wsrep_gtid_wait_upto); + m_force_signal= true; + std::multimap<uint64, mysql_cond_t*>::iterator it_end; + std::multimap<uint64, mysql_cond_t*>::iterator it_begin; + if (signal_all) + { + it_end= m_wait_map.end(); + } + else + { + it_end= m_wait_map.upper_bound(seqno); + } + for (it_begin = m_wait_map.begin(); it_begin != it_end; ++it_begin) + { + mysql_cond_signal(it_begin->second); + } + m_force_signal= false; + mysql_mutex_unlock(&LOCK_wsrep_gtid_wait_upto); + if (m_committed_seqno < seqno) + { + m_committed_seqno= seqno; + } + } +private: + const wsrep_server_gtid_t m_undefined= {0,0,0}; + std::multimap<uint64, mysql_cond_t*> m_wait_map; + bool m_force_signal; + Atomic_counter<uint64_t> m_seqno; + Atomic_counter<uint64_t> m_committed_seqno; +}; +extern Wsrep_gtid_server wsrep_gtid_server; +void wsrep_init_gtid(); +bool wsrep_check_gtid_seqno(const uint32&, const uint32&, uint64&); +bool wsrep_get_binlog_gtid_seqno(wsrep_server_gtid_t&); typedef struct wsrep_key_arr { diff --git a/sql/wsrep_server_service.cc b/sql/wsrep_server_service.cc index d0a9b54ac1b..6d737463f07 100644 --- a/sql/wsrep_server_service.cc +++ b/sql/wsrep_server_service.cc @@ -153,7 +153,7 @@ void Wsrep_server_service::bootstrap() wsrep::log_info() << "Bootstrapping a new cluster, setting initial position to " << wsrep::gtid::undefined(); - wsrep_set_SE_checkpoint(wsrep::gtid::undefined()); + wsrep_set_SE_checkpoint(wsrep::gtid::undefined(), wsrep_gtid_server.undefined()); } void Wsrep_server_service::log_message(enum wsrep::log::level level, @@ -212,7 +212,7 @@ void Wsrep_server_service::log_view( if (prev_view.state_id().id() != view.state_id().id()) { WSREP_DEBUG("New cluster UUID was generated, resetting position info"); - 
wsrep_set_SE_checkpoint(wsrep::gtid::undefined()); + wsrep_set_SE_checkpoint(wsrep::gtid::undefined(), wsrep_gtid_server.undefined()); checkpoint_was_reset= true; } @@ -263,9 +263,9 @@ void Wsrep_server_service::log_view( Wsrep_server_state::instance().provider().last_committed_gtid().seqno(); if (checkpoint_was_reset || last_committed != view.state_id().seqno()) { - wsrep_set_SE_checkpoint(view.state_id()); + wsrep_set_SE_checkpoint(view.state_id(), wsrep_gtid_server.gtid()); } - DBUG_ASSERT(wsrep_get_SE_checkpoint().id() == view.state_id().id()); + DBUG_ASSERT(wsrep_get_SE_checkpoint<wsrep::gtid>().id() == view.state_id().id()); } else { @@ -299,13 +299,13 @@ wsrep::view Wsrep_server_service::get_view(wsrep::client_service& c, wsrep::gtid Wsrep_server_service::get_position(wsrep::client_service&) { - return wsrep_get_SE_checkpoint(); + return wsrep_get_SE_checkpoint<wsrep::gtid>(); } void Wsrep_server_service::set_position(wsrep::client_service&, const wsrep::gtid& gtid) { - wsrep_set_SE_checkpoint(gtid); + wsrep_set_SE_checkpoint(gtid, wsrep_gtid_server.gtid()); } void Wsrep_server_service::log_state_change( diff --git a/sql/wsrep_sst.cc b/sql/wsrep_sst.cc index 02f7d4b6760..d478ea486cd 100644 --- a/sql/wsrep_sst.cc +++ b/sql/wsrep_sst.cc @@ -351,8 +351,8 @@ void wsrep_sst_received (THD* thd, wsrep::seqno(seqno)); if (!wsrep_before_SE()) { - wsrep_set_SE_checkpoint(wsrep::gtid::undefined()); - wsrep_set_SE_checkpoint(sst_gtid); + wsrep_set_SE_checkpoint(wsrep::gtid::undefined(), wsrep_gtid_server.undefined()); + wsrep_set_SE_checkpoint(sst_gtid, wsrep_gtid_server.gtid()); } wsrep_verify_SE_checkpoint(uuid, seqno); @@ -594,7 +594,7 @@ static void* sst_joiner_thread (void* a) err= EINVAL; goto err; } else { - wsrep_gtid_domain_id= (uint32) domain_id; + wsrep_gtid_server.domain_id= (uint32) domain_id; } } } @@ -1365,13 +1365,15 @@ static int sst_donate_mysqldump (const char* addr, WSREP_SST_OPT_LPORT " '%u' " WSREP_SST_OPT_SOCKET " '%s' " "%s" - WSREP_SST_OPT_GTID 
" '%s:%lld' " + WSREP_SST_OPT_GTID " '%s:%lld,%d-%d-%llu' " WSREP_SST_OPT_GTID_DOMAIN_ID " '%d'" "%s", addr, port, mysqld_port, mysqld_unix_port, wsrep_defaults_file, uuid_oss.str().c_str(), gtid.seqno().get(), - wsrep_gtid_domain_id, + wsrep_gtid_server.domain_id, wsrep_gtid_server.server_id, + wsrep_gtid_server.seqno(), + wsrep_gtid_server.domain_id, bypass ? " " WSREP_SST_OPT_BYPASS : ""); if (ret < 0 || size_t(ret) >= cmd_len) @@ -1538,7 +1540,7 @@ static int sst_flush_tables(THD* thd) */ char content[100]; snprintf(content, sizeof(content), "%s:%lld %d\n", wsrep_cluster_state_uuid, - (long long)wsrep_locked_seqno, wsrep_gtid_domain_id); + (long long)wsrep_locked_seqno, wsrep_gtid_server.domain_id); err= sst_create_file(flush_success, content); const char base_name[]= "tables_flushed"; @@ -1563,7 +1565,7 @@ static int sst_flush_tables(THD* thd) fprintf(file, "%s:%lld %u\n", uuid_oss.str().c_str(), server_state.pause_seqno().get(), - wsrep_gtid_domain_id); + wsrep_gtid_server.domain_id); fsync(fileno(file)); fclose(file); if (rename(tmp_name, real_name) == -1) @@ -1783,7 +1785,7 @@ static int sst_donate_other (const char* method, "%s", method, addr, mysqld_unix_port, mysql_real_data_home, wsrep_defaults_file, - uuid_oss.str().c_str(), gtid.seqno().get(), wsrep_gtid_domain_id, + uuid_oss.str().c_str(), gtid.seqno().get(), wsrep_gtid_server.domain_id, binlog_opt_val, binlog_index_opt_val, bypass ? 
" " WSREP_SST_OPT_BYPASS : ""); diff --git a/sql/wsrep_trans_observer.h b/sql/wsrep_trans_observer.h index b8ce7eb42d0..006d412e6aa 100644 --- a/sql/wsrep_trans_observer.h +++ b/sql/wsrep_trans_observer.h @@ -233,7 +233,8 @@ static inline int wsrep_before_prepare(THD* thd, bool all) { DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined()); wsrep_xid_init(&thd->wsrep_xid, - thd->wsrep_trx().ws_meta().gtid()); + thd->wsrep_trx().ws_meta().gtid(), + wsrep_gtid_server.gtid()); } DBUG_RETURN(ret); } @@ -273,8 +274,33 @@ static inline int wsrep_before_commit(THD* thd, bool all) if ((ret= thd->wsrep_cs().before_commit()) == 0) { DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined()); + if (!thd->variables.gtid_seq_no && + (thd->wsrep_trx().ws_meta().flags() & wsrep::provider::flag::commit)) + { + uint64 seqno= 0; + if (thd->variables.wsrep_gtid_seq_no && + thd->variables.wsrep_gtid_seq_no > wsrep_gtid_server.seqno()) + { + seqno= thd->variables.wsrep_gtid_seq_no; + wsrep_gtid_server.seqno(thd->variables.wsrep_gtid_seq_no); + } + else + { + seqno= wsrep_gtid_server.seqno_inc(); + } + thd->variables.wsrep_gtid_seq_no= 0; + thd->wsrep_current_gtid_seqno= seqno; + if (mysql_bin_log.is_open() && wsrep_gtid_mode) + { + thd->variables.gtid_seq_no= seqno; + thd->variables.gtid_domain_id= wsrep_gtid_server.domain_id; + thd->variables.server_id= wsrep_gtid_server.server_id; + } + } + wsrep_xid_init(&thd->wsrep_xid, - thd->wsrep_trx().ws_meta().gtid()); + thd->wsrep_trx().ws_meta().gtid(), + wsrep_gtid_server.gtid()); wsrep_register_for_group_commit(thd); } DBUG_RETURN(ret); diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc index 5f76f650b34..daf12f94fb9 100644 --- a/sql/wsrep_var.cc +++ b/sql/wsrep_var.cc @@ -37,7 +37,10 @@ int wsrep_init_vars() wsrep_node_name = my_strdup("", MYF(MY_WME)); wsrep_node_address = my_strdup("", MYF(MY_WME)); wsrep_node_incoming_address= my_strdup(WSREP_NODE_INCOMING_AUTO, MYF(MY_WME)); - wsrep_start_position = 
my_strdup(WSREP_START_POSITION_ZERO, MYF(MY_WME)); + if (wsrep_gtid_mode) + wsrep_start_position = my_strdup(WSREP_START_POSITION_ZERO_GTID, MYF(MY_WME)); + else + wsrep_start_position = my_strdup(WSREP_START_POSITION_ZERO, MYF(MY_WME)); return 0; } @@ -103,6 +106,13 @@ bool wsrep_sync_wait_update (sys_var* self, THD* thd, enum_var_type var_type) return false; } +template<typename T> +static T parse_value(char** startptr, char** endptr) +{ + T val= strtoll(*startptr, *&endptr, 10); + *startptr= *endptr; + return val; +} /* Verify the format of the given UUID:seqno. @@ -136,8 +146,25 @@ bool wsrep_start_position_verify (const char* start_str) return true; char* endptr; + char* startptr= (char *)start_str + uuid_len + 1; wsrep_seqno_t const seqno __attribute__((unused)) // to avoid GCC warnings - (strtoll(&start_str[uuid_len + 1], &endptr, 10)); + (parse_value<uint64_t>(&startptr, &endptr)); + + // Start parsing native GTID part + if (*startptr == ',') + { + startptr++; + uint32_t domain __attribute__((unused)) + (parse_value<uint32_t>(&startptr, &endptr)); + if (*endptr != '-') return true; + startptr++; + uint32_t server __attribute__((unused)) + (parse_value<uint32_t>(&startptr, &endptr)); + if (*endptr != '-') return true; + startptr++; + uint64_t seq __attribute__((unused)) + (parse_value<uint64_t>(&startptr, &endptr)); + } // Remaining string was seqno. 
if (*endptr == '\0') return false; @@ -150,9 +177,22 @@ static bool wsrep_set_local_position(THD* thd, const char* const value, size_t length, bool const sst) { + char* endptr; + char* startptr; wsrep_uuid_t uuid; size_t const uuid_len= wsrep_uuid_scan(value, length, &uuid); - wsrep_seqno_t const seqno= strtoll(value + uuid_len + 1, NULL, 10); + startptr= (char *)value + uuid_len + 1; + wsrep_seqno_t const seqno= parse_value<uint64_t>(&startptr, &endptr); + + if (*startptr == ',') + { + startptr++; + wsrep_gtid_server.domain_id= parse_value<uint32_t>(&startptr, &endptr); + startptr++; + wsrep_gtid_server.server_id= parse_value<uint32_t>(&startptr, &endptr); + startptr++; + wsrep_gtid_server.seqno(parse_value<uint64_t>(&startptr, &endptr)); + } if (sst) { wsrep_sst_received (thd, uuid, seqno, NULL, 0); @@ -436,6 +476,15 @@ bool wsrep_debug_update(sys_var *self, THD* thd, enum_var_type type) return false; } +bool +wsrep_gtid_seq_no_check(sys_var *self, THD *thd, set_var *var) +{ + ulonglong new_wsrep_gtid_seq_no= var->save_result.ulonglong_value; + if (wsrep_gtid_mode && new_wsrep_gtid_seq_no > wsrep_gtid_server.seqno()) + return false; + return true; +} + static int wsrep_cluster_address_verify (const char* cluster_address_str) { /* There is no predefined address format, it depends on provider. 
*/ diff --git a/sql/wsrep_var.h b/sql/wsrep_var.h index 481df02f2d5..810ed4f3dd7 100644 --- a/sql/wsrep_var.h +++ b/sql/wsrep_var.h @@ -20,9 +20,10 @@ #ifdef WITH_WSREP -#define WSREP_CLUSTER_NAME "my_wsrep_cluster" -#define WSREP_NODE_INCOMING_AUTO "AUTO" -#define WSREP_START_POSITION_ZERO "00000000-0000-0000-0000-000000000000:-1" +#define WSREP_CLUSTER_NAME "my_wsrep_cluster" +#define WSREP_NODE_INCOMING_AUTO "AUTO" +#define WSREP_START_POSITION_ZERO "00000000-0000-0000-0000-000000000000:-1" +#define WSREP_START_POSITION_ZERO_GTID "00000000-0000-0000-0000-000000000000:-1,0-0-0" // MySQL variables funcs @@ -102,6 +103,8 @@ extern bool wsrep_reject_queries_update UPDATE_ARGS; extern bool wsrep_debug_update UPDATE_ARGS; +extern bool wsrep_gtid_seq_no_check CHECK_ARGS; + #else /* WITH_WSREP */ #define wsrep_provider_init(X) diff --git a/sql/wsrep_xid.cc b/sql/wsrep_xid.cc index d8f6e013820..c84071e13d0 100644 --- a/sql/wsrep_xid.cc +++ b/sql/wsrep_xid.cc @@ -33,20 +33,24 @@ #define WSREP_XID_VERSION_OFFSET WSREP_XID_PREFIX_LEN #define WSREP_XID_VERSION_1 'd' #define WSREP_XID_VERSION_2 'e' +#define WSREP_XID_VERSION_3 'f' #define WSREP_XID_UUID_OFFSET 8 #define WSREP_XID_SEQNO_OFFSET (WSREP_XID_UUID_OFFSET + sizeof(wsrep_uuid_t)) -#define WSREP_XID_GTRID_LEN (WSREP_XID_SEQNO_OFFSET + sizeof(wsrep_seqno_t)) +#define WSREP_XID_GTRID_LEN_V_1_2 (WSREP_XID_SEQNO_OFFSET + sizeof(wsrep_seqno_t)) +#define WSREP_XID_RPL_GTID_OFFSET (WSREP_XID_SEQNO_OFFSET + sizeof(wsrep_seqno_t)) +#define WSREP_XID_GTRID_LEN_V_3 (WSREP_XID_RPL_GTID_OFFSET + sizeof(wsrep_server_gtid_t)) -void wsrep_xid_init(XID* xid, const wsrep::gtid& wsgtid) +void wsrep_xid_init(XID* xid, const wsrep::gtid& wsgtid, const wsrep_server_gtid_t& gtid) { xid->formatID= 1; - xid->gtrid_length= WSREP_XID_GTRID_LEN; + xid->gtrid_length= WSREP_XID_GTRID_LEN_V_3; xid->bqual_length= 0; memset(xid->data, 0, sizeof(xid->data)); memcpy(xid->data, WSREP_XID_PREFIX, WSREP_XID_PREFIX_LEN); - 
xid->data[WSREP_XID_VERSION_OFFSET]= WSREP_XID_VERSION_2; + xid->data[WSREP_XID_VERSION_OFFSET]= WSREP_XID_VERSION_3; memcpy(xid->data + WSREP_XID_UUID_OFFSET, wsgtid.id().data(),sizeof(wsrep::id)); int8store(xid->data + WSREP_XID_SEQNO_OFFSET, wsgtid.seqno().get()); + memcpy(xid->data + WSREP_XID_RPL_GTID_OFFSET, &gtid, sizeof(wsrep_server_gtid_t)); } extern "C" @@ -54,11 +58,13 @@ int wsrep_is_wsrep_xid(const void* xid_ptr) { const XID* xid= static_cast<const XID*>(xid_ptr); return (xid->formatID == 1 && - xid->gtrid_length == WSREP_XID_GTRID_LEN && xid->bqual_length == 0 && - !memcmp(xid->data, WSREP_XID_PREFIX, WSREP_XID_PREFIX_LEN) && - (xid->data[WSREP_XID_VERSION_OFFSET] == WSREP_XID_VERSION_1 || - xid->data[WSREP_XID_VERSION_OFFSET] == WSREP_XID_VERSION_2)); + !memcmp(xid->data, WSREP_XID_PREFIX, WSREP_XID_PREFIX_LEN) && + (((xid->data[WSREP_XID_VERSION_OFFSET] == WSREP_XID_VERSION_1 || + xid->data[WSREP_XID_VERSION_OFFSET] == WSREP_XID_VERSION_2) && + xid->gtrid_length == WSREP_XID_GTRID_LEN_V_1_2) || + (xid->data[WSREP_XID_VERSION_OFFSET] == WSREP_XID_VERSION_3 && + xid->gtrid_length == WSREP_XID_GTRID_LEN_V_3))); } const unsigned char* wsrep_xid_uuid(const xid_t* xid) @@ -90,6 +96,7 @@ long long wsrep_xid_seqno(const xid_t* xid) memcpy(&ret, xid->data + WSREP_XID_SEQNO_OFFSET, sizeof ret); break; case WSREP_XID_VERSION_2: + case WSREP_XID_VERSION_3: ret= sint8korr(xid->data + WSREP_XID_SEQNO_OFFSET); break; default: @@ -127,10 +134,10 @@ bool wsrep_set_SE_checkpoint(XID& xid) &xid); } -bool wsrep_set_SE_checkpoint(const wsrep::gtid& wsgtid) +bool wsrep_set_SE_checkpoint(const wsrep::gtid& wsgtid, const wsrep_server_gtid_t& gtid) { XID xid; - wsrep_xid_init(&xid, wsgtid); + wsrep_xid_init(&xid, wsgtid, gtid); return wsrep_set_SE_checkpoint(xid); } @@ -158,30 +165,61 @@ bool wsrep_get_SE_checkpoint(XID& xid) &xid); } -wsrep::gtid wsrep_get_SE_checkpoint() +static bool wsrep_get_SE_checkpoint_common(XID& xid) { - XID xid; xid.null(); if 
(wsrep_get_SE_checkpoint(xid)) { - return wsrep::gtid(); + return FALSE; } if (xid.is_null()) { - return wsrep::gtid(); + return FALSE; } if (!wsrep_is_wsrep_xid(&xid)) { WSREP_WARN("Read non-wsrep XID from storage engines."); + return FALSE; + } + + return TRUE; +} + +template<> +wsrep::gtid wsrep_get_SE_checkpoint() +{ + XID xid; + + if (!wsrep_get_SE_checkpoint_common(xid)) + { return wsrep::gtid(); } return wsrep::gtid(wsrep_xid_uuid(xid),wsrep_xid_seqno(xid)); } +template<> +wsrep_server_gtid_t wsrep_get_SE_checkpoint() +{ + XID xid; + wsrep_server_gtid_t gtid= {0,0,0}; + + if (!wsrep_get_SE_checkpoint_common(xid)) + { + return gtid; + } + + if (xid.data[WSREP_XID_VERSION_OFFSET] == WSREP_XID_VERSION_3) + { + memcpy(&gtid, &xid.data[WSREP_XID_RPL_GTID_OFFSET], sizeof(wsrep_server_gtid_t)); + } + + return gtid; +} + /* Sort order for XIDs. Wsrep XIDs are sorted according to seqno in ascending order. Non-wsrep XIDs are considered diff --git a/sql/wsrep_xid.h b/sql/wsrep_xid.h index a1b9afc1817..45ba6ffee6b 100644 --- a/sql/wsrep_xid.h +++ b/sql/wsrep_xid.h @@ -20,15 +20,16 @@ #ifdef WITH_WSREP +#include "wsrep_mysqld.h" #include "wsrep/gtid.hpp" #include "handler.h" // XID typedef -void wsrep_xid_init(xid_t*, const wsrep::gtid&); +void wsrep_xid_init(xid_t*, const wsrep::gtid&, const wsrep_server_gtid_t&); const wsrep::id& wsrep_xid_uuid(const XID&); wsrep::seqno wsrep_xid_seqno(const XID&); -wsrep::gtid wsrep_get_SE_checkpoint(); -bool wsrep_set_SE_checkpoint(const wsrep::gtid& gtid); +template<typename T> T wsrep_get_SE_checkpoint(); +bool wsrep_set_SE_checkpoint(const wsrep::gtid& gtid, const wsrep_server_gtid_t&); //void wsrep_get_SE_checkpoint(XID&); /* uncomment if needed */ //void wsrep_set_SE_checkpoint(XID&); /* uncomment if needed */ |