diff options
Diffstat (limited to 'sql/wsrep_mysqld.cc')
-rw-r--r-- | sql/wsrep_mysqld.cc | 2438 |
1 files changed, 1213 insertions, 1225 deletions
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index 58b30a1e77f..73f201f12ca 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -14,7 +14,12 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */ #include "sql_plugin.h" /* wsrep_plugins_pre_init() */ +#include "my_global.h" +#include "wsrep_server_state.h" + +#include "mariadb.h" #include <mysqld.h> +#include <transaction.h> #include <sql_class.h> #include <sql_parse.h> #include <sql_base.h> /* find_temporary_table() */ @@ -33,26 +38,32 @@ #include "wsrep_var.h" #include "wsrep_binlog.h" #include "wsrep_applier.h" +#include "wsrep_schema.h" #include "wsrep_xid.h" +#include "wsrep_trans_observer.h" +#include "mysql/service_wsrep.h" #include <cstdio> #include <cstdlib> +#include <string> #include "log_event.h" #include <slave.h> -wsrep_t *wsrep = NULL; -/* - wsrep_emulate_bin_log is a flag to tell that binlog has not been configured. - wsrep needs to get binlog events from transaction cache even when binlog is - not enabled, wsrep_emulate_bin_log opens needed code paths to make this - possible -*/ -my_bool wsrep_emulate_bin_log = FALSE; // activating parts of binlog interface +#include <sstream> + +/* wsrep-lib */ +Wsrep_server_state* Wsrep_server_state::m_instance; + +my_bool wsrep_emulate_bin_log = FALSE; // activating parts of binlog interface #ifdef GTID_SUPPORT /* Sidno in global_sid_map corresponding to group uuid */ rpl_sidno wsrep_sidno= -1; #endif /* GTID_SUPPORT */ my_bool wsrep_preordered_opt= FALSE; +/* Streaming Replication */ +const char *wsrep_fragment_units[]= { "bytes", "rows", "statements", NullS }; +const char *wsrep_SR_store_types[]= { "none", "table", NullS }; + /* * Begin configuration options */ @@ -82,7 +93,7 @@ my_bool wsrep_certify_nonPK; // Certify, even when no primary my_bool wsrep_recovery; // Recovery my_bool wsrep_replicate_myisam; // Enable MyISAM replication my_bool wsrep_log_conflicts; -my_bool wsrep_load_data_splitting; // Commit load data every 10K intervals +my_bool wsrep_load_data_splitting= 0; // Commit load data every 10K intervals my_bool wsrep_slave_UK_checks; // Slave thread does UK checks my_bool wsrep_slave_FK_checks; // Slave thread does FK checks my_bool wsrep_restart_slave; // Should mysql slave thread be @@ -107,7 +118,13 @@ my_bool wsrep_restart_slave_activated= 0; // Node has dropped, and slave bool wsrep_new_cluster= false; // Bootstrap the cluster? int wsrep_slave_count_change= 0; // No. of appliers to stop/start int wsrep_to_isolation= 0; // No. of active TO isolation threads -long wsrep_max_protocol_version= 3; // Maximum protocol version to use +long wsrep_max_protocol_version= 4; // Maximum protocol version to use +long int wsrep_protocol_version= wsrep_max_protocol_version; +ulong wsrep_trx_fragment_unit= WSREP_FRAG_BYTES; + // unit for fragment size +ulong wsrep_SR_store_type= WSREP_SR_STORE_TABLE; +uint wsrep_ignore_apply_errors= 0; + /* * End configuration options @@ -123,29 +140,33 @@ mysql_mutex_t LOCK_wsrep_sst; mysql_cond_t COND_wsrep_sst; mysql_mutex_t LOCK_wsrep_sst_init; mysql_cond_t COND_wsrep_sst_init; -mysql_mutex_t LOCK_wsrep_rollback; -mysql_cond_t COND_wsrep_rollback; -wsrep_aborting_thd_t wsrep_aborting_thd= NULL; mysql_mutex_t LOCK_wsrep_replaying; mysql_cond_t COND_wsrep_replaying; mysql_mutex_t LOCK_wsrep_slave_threads; mysql_mutex_t LOCK_wsrep_desync; mysql_mutex_t LOCK_wsrep_config_state; +mysql_mutex_t LOCK_wsrep_SR_pool; +mysql_mutex_t LOCK_wsrep_SR_store; int wsrep_replaying= 0; -ulong wsrep_running_threads = 0; // # of currently running wsrep threads +ulong wsrep_running_threads= 0; // # of currently running wsrep threads ulong my_bind_addr; #ifdef HAVE_PSI_INTERFACE -PSI_mutex_key key_LOCK_wsrep_rollback, +PSI_mutex_key key_LOCK_wsrep_replaying, key_LOCK_wsrep_ready, key_LOCK_wsrep_sst, key_LOCK_wsrep_sst_thread, key_LOCK_wsrep_sst_init, key_LOCK_wsrep_slave_threads, key_LOCK_wsrep_desync, - key_LOCK_wsrep_config_state; + key_LOCK_wsrep_config_state, + key_LOCK_wsrep_SR_pool, + key_LOCK_wsrep_SR_store, + key_LOCK_wsrep_thd_queue; -PSI_cond_key key_COND_wsrep_rollback, +PSI_cond_key key_COND_wsrep_thd, key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst, - key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread; + key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread, + key_COND_wsrep_thd_queue; + PSI_file_key key_file_wsrep_gra_log; @@ -156,11 +177,12 @@ static PSI_mutex_info wsrep_mutexes[]= { &key_LOCK_wsrep_sst_thread, "wsrep_sst_thread", 0}, { &key_LOCK_wsrep_sst_init, "LOCK_wsrep_sst_init", PSI_FLAG_GLOBAL}, { &key_LOCK_wsrep_sst, "LOCK_wsrep_sst", PSI_FLAG_GLOBAL}, - { &key_LOCK_wsrep_rollback, "LOCK_wsrep_rollback", PSI_FLAG_GLOBAL}, { &key_LOCK_wsrep_replaying, "LOCK_wsrep_replaying", PSI_FLAG_GLOBAL}, { &key_LOCK_wsrep_slave_threads, "LOCK_wsrep_slave_threads", PSI_FLAG_GLOBAL}, { &key_LOCK_wsrep_desync, "LOCK_wsrep_desync", PSI_FLAG_GLOBAL}, - { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL} + { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL}, + { &key_LOCK_wsrep_SR_pool, "LOCK_wsrep_SR_pool", PSI_FLAG_GLOBAL}, + { &key_LOCK_wsrep_SR_store, "LOCK_wsrep_SR_store", PSI_FLAG_GLOBAL} }; static PSI_cond_info wsrep_conds[]= @@ -169,7 +191,7 @@ static PSI_cond_info wsrep_conds[]= { &key_COND_wsrep_sst, "COND_wsrep_sst", PSI_FLAG_GLOBAL}, { &key_COND_wsrep_sst_init, "COND_wsrep_sst_init", PSI_FLAG_GLOBAL}, { &key_COND_wsrep_sst_thread, "wsrep_sst_thread", 0}, - { &key_COND_wsrep_rollback, "COND_wsrep_rollback", PSI_FLAG_GLOBAL}, + { &key_COND_wsrep_thd, "THD::COND_wsrep_thd", 0}, { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL} }; @@ -179,310 +201,219 @@ static PSI_file_info wsrep_files[]= }; #endif -my_bool wsrep_inited = 0; // initialized ? +my_bool wsrep_inited= 0; // initialized ? -static wsrep_uuid_t cluster_uuid = WSREP_UUID_UNDEFINED; +static wsrep_uuid_t node_uuid= WSREP_UUID_UNDEFINED; +static wsrep_uuid_t cluster_uuid= WSREP_UUID_UNDEFINED; static char cluster_uuid_str[40]= { 0, }; -static const char* cluster_status_str[WSREP_VIEW_MAX] = -{ - "Primary", - "non-Primary", - "Disconnected" -}; static char provider_name[256]= { 0, }; static char provider_version[256]= { 0, }; static char provider_vendor[256]= { 0, }; /* - * wsrep status variables + * Wsrep status variables. LOCK_status must be locked When modifying + * these variables, */ -my_bool wsrep_connected = FALSE; -my_bool wsrep_ready = FALSE; // node can accept queries -const char* wsrep_cluster_state_uuid = cluster_uuid_str; -long long wsrep_cluster_conf_id = WSREP_SEQNO_UNDEFINED; -const char* wsrep_cluster_status = cluster_status_str[WSREP_VIEW_DISCONNECTED]; -long wsrep_cluster_size = 0; -long wsrep_local_index = -1; -long long wsrep_local_bf_aborts = 0; -const char* wsrep_provider_name = provider_name; -const char* wsrep_provider_version = provider_version; -const char* wsrep_provider_vendor = provider_vendor; +my_bool wsrep_connected = FALSE; +my_bool wsrep_ready = FALSE; +const char* wsrep_cluster_state_uuid= cluster_uuid_str; +long long wsrep_cluster_conf_id = WSREP_SEQNO_UNDEFINED; +const char* wsrep_cluster_status = "Disconnected"; +long wsrep_cluster_size = 0; +long wsrep_local_index = -1; +long long wsrep_local_bf_aborts = 0; +const char* wsrep_provider_name = provider_name; +const char* wsrep_provider_version = provider_version; +const char* wsrep_provider_vendor = provider_vendor; +char* wsrep_provider_capabilities = NULL; +char* wsrep_cluster_capabilities = NULL; /* End wsrep status variables */ -wsrep_uuid_t local_uuid = WSREP_UUID_UNDEFINED; -wsrep_seqno_t local_seqno = WSREP_SEQNO_UNDEFINED; -long wsrep_protocol_version = 3; - wsp::Config_state *wsrep_config_state; -// Boolean denoting if server is in initial startup phase. This is needed -// to make sure that main thread waiting in wsrep_sst_wait() is signaled -// if there was no state gap on receiving first view event. -static my_bool wsrep_startup = TRUE; +wsrep_uuid_t local_uuid = WSREP_UUID_UNDEFINED; +wsrep_seqno_t local_seqno = WSREP_SEQNO_UNDEFINED; +wsp::node_status local_status; -static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) { - switch (level) { - case WSREP_LOG_INFO: - sql_print_information("WSREP: %s", msg); - break; - case WSREP_LOG_WARN: - sql_print_warning("WSREP: %s", msg); - break; - case WSREP_LOG_ERROR: - case WSREP_LOG_FATAL: +/* + */ +Wsrep_schema *wsrep_schema= 0; + +static void wsrep_log_cb(wsrep::log::level level, const char *msg) +{ + /* + Silence all wsrep related logging from lib and provider if + wsrep is not enabled. + */ + if (WSREP_ON) + { + switch (level) { + case wsrep::log::info: + sql_print_information("WSREP: %s", msg); + break; + case wsrep::log::warning: + sql_print_warning("WSREP: %s", msg); + break; + case wsrep::log::error: sql_print_error("WSREP: %s", msg); break; - case WSREP_LOG_DEBUG: - if (wsrep_debug) sql_print_information ("[Debug] WSREP: %s", msg); - default: - break; + case wsrep::log::debug: + if (wsrep_debug) sql_print_information ("[Debug] WSREP: %s", msg); + default: + break; + } } } -void wsrep_log(void (*fun)(const char *, ...), const char *format, ...) +void wsrep_init_sidno(const wsrep::id& uuid) { - va_list args; - char msg[1024]; - va_start(args, format); - vsnprintf(msg, sizeof(msg) - 1, format, args); - va_end(args); - (fun)("WSREP: %s", msg); -} - - -static void wsrep_log_states (wsrep_log_level_t const level, - const wsrep_uuid_t* const group_uuid, - wsrep_seqno_t const group_seqno, - const wsrep_uuid_t* const node_uuid, - wsrep_seqno_t const node_seqno) -{ - char uuid_str[37]; - char msg[256]; - - wsrep_uuid_print (group_uuid, uuid_str, sizeof(uuid_str)); - snprintf (msg, 255, "WSREP: Group state: %s:%lld", - uuid_str, (long long)group_seqno); - wsrep_log_cb (level, msg); - - wsrep_uuid_print (node_uuid, uuid_str, sizeof(uuid_str)); - snprintf (msg, 255, "WSREP: Local state: %s:%lld", - uuid_str, (long long)node_seqno); - wsrep_log_cb (level, msg); -} - -#ifdef GTID_SUPPORT -void wsrep_init_sidno(const wsrep_uuid_t& wsrep_uuid) -{ - /* generate new Sid map entry from inverted uuid */ - rpl_sid sid; - wsrep_uuid_t ltid_uuid; - - for (size_t i= 0; i < sizeof(ltid_uuid.data); ++i) + /* + Protocol versions starting from 4 use group gtid as it is. + For lesser protocol versions generate new Sid map entry from inverted + uuid. + */ + rpl_gtid sid; + if (wsrep_protocol_version >= 4) { - ltid_uuid.data[i] = ~wsrep_uuid.data[i]; + memcpy((void*)&sid, (const uchar*)uuid.data(),16); } - - sid.copy_from(ltid_uuid.data); + else + { + wsrep_uuid_t ltid_uuid; + for (size_t i= 0; i < sizeof(ltid_uuid.data); ++i) + { + ltid_uuid.data[i]= ~((const uchar*)uuid.data())[i]; + } + memcpy((void*)&sid, (const uchar*)ltid_uuid.data,16); + } +#ifdef GTID_SUPPORT global_sid_lock->wrlock(); wsrep_sidno= global_sid_map->add_sid(sid); WSREP_INFO("Initialized wsrep sidno %d", wsrep_sidno); global_sid_lock->unlock(); +#endif } -#endif /* GTID_SUPPORT */ -static wsrep_cb_status_t -wsrep_view_handler_cb (void* app_ctx, - void* recv_ctx, - const wsrep_view_info_t* view, - const char* state, - size_t state_len, - void** sst_req, - size_t* sst_req_len) +void wsrep_init_schema() { - *sst_req = NULL; - *sst_req_len = 0; - - wsrep_member_status_t memb_status= wsrep_config_state->get_status(); - - if (memcmp(&cluster_uuid, &view->state_id.uuid, sizeof(wsrep_uuid_t))) - { - memcpy(&cluster_uuid, &view->state_id.uuid, sizeof(cluster_uuid)); - - wsrep_uuid_print (&cluster_uuid, cluster_uuid_str, - sizeof(cluster_uuid_str)); - } - - wsrep_cluster_conf_id= view->view; - wsrep_cluster_status= cluster_status_str[view->status]; - wsrep_cluster_size= view->memb_num; - wsrep_local_index= view->my_idx; - - WSREP_INFO("New cluster view: global state: %s:%lld, view# %lld: %s, " - "number of nodes: %ld, my index: %ld, protocol version %d", - wsrep_cluster_state_uuid, (long long)view->state_id.seqno, - (long long)wsrep_cluster_conf_id, wsrep_cluster_status, - wsrep_cluster_size, wsrep_local_index, view->proto_ver); - - /* Proceed further only if view is PRIMARY */ - if (WSREP_VIEW_PRIMARY != view->status) - { -#ifdef HAVE_QUERY_CACHE - // query cache must be initialised by now - query_cache.flush(); -#endif /* HAVE_QUERY_CACHE */ - - wsrep_ready_set(FALSE); - memb_status= WSREP_MEMBER_UNDEFINED; - /* Always record local_uuid and local_seqno in non-prim since this - * may lead to re-initializing provider and start position is - * determined according to these variables */ - // WRONG! local_uuid should be the last primary configuration uuid we were - // a member of. local_seqno should be updated in commit calls. - // local_uuid= cluster_uuid; - // local_seqno= view->first - 1; - goto out; - } + DBUG_ASSERT(!wsrep_schema); - switch (view->proto_ver) + WSREP_INFO("wsrep_init_schema_and_SR %p", wsrep_schema); + if (!wsrep_schema) { - case 0: - case 1: - case 2: - case 3: - // version change - if (view->proto_ver != wsrep_protocol_version) - { - my_bool wsrep_ready_saved= wsrep_ready_get(); - wsrep_ready_set(FALSE); - WSREP_INFO("closing client connections for " - "protocol change %ld -> %d", - wsrep_protocol_version, view->proto_ver); - wsrep_close_client_connections(TRUE); - wsrep_protocol_version= view->proto_ver; - wsrep_ready_set(wsrep_ready_saved); - } - break; - default: - WSREP_ERROR("Unsupported application protocol version: %d", - view->proto_ver); - unireg_abort(1); - } - - if (view->state_gap) - { - WSREP_WARN("Gap in state sequence. Need state transfer."); - - /* After that wsrep will call wsrep_sst_prepare. */ - /* keep ready flag 0 until we receive the snapshot */ - wsrep_ready_set(FALSE); - - /* Close client connections to ensure that they don't interfere - * with SST. Necessary only if storage engines are initialized - * before SST. - * TODO: Just killing all ongoing transactions should be enough - * since wsrep_ready is OFF and no new transactions can start. - */ - if (!wsrep_before_SE()) + wsrep_schema= new Wsrep_schema(); + if (wsrep_schema->init()) { - WSREP_DEBUG("[debug]: closing client connections for PRIM"); - wsrep_close_client_connections(FALSE); + WSREP_ERROR("Failed to init wsrep schema"); + unireg_abort(1); } + } +} - ssize_t const req_len= wsrep_sst_prepare (sst_req); +void wsrep_deinit_schema() +{ + delete wsrep_schema; + wsrep_schema= 0; +} - if (req_len < 0) +void wsrep_recover_sr_from_storage(THD *orig_thd) +{ + switch (wsrep_SR_store_type) + { + case WSREP_SR_STORE_TABLE: + if (!wsrep_schema) { - WSREP_ERROR("SST preparation failed: %zd (%s)", -req_len, - strerror(-req_len)); - memb_status= WSREP_MEMBER_UNDEFINED; + WSREP_ERROR("Wsrep schema not initialized when trying to recover " + "streaming transactions"); + unireg_abort(1); } - else + if (wsrep_schema->recover_sr_transactions(orig_thd)) { - assert(sst_req != NULL); - *sst_req_len= req_len; - memb_status= WSREP_MEMBER_JOINER; + WSREP_ERROR("Failed to recover SR transactions from schema"); + unireg_abort(1); } + break; + default: + /* */ + WSREP_ERROR("Unsupported wsrep SR store type: %lu", wsrep_SR_store_type); + unireg_abort(1); + break; } - else - { - /* - * NOTE: Initialize wsrep_group_uuid here only if it wasn't initialized - * before - OR - it was reinitilized on startup (lp:992840) - */ - if (wsrep_startup) +} + +/** Export the WSREP provider's capabilities as a human readable string. + * The result is saved in a dynamically allocated string of the form: + * :cap1:cap2:cap3: + */ +static void wsrep_capabilities_export(wsrep_cap_t const cap, char** str) +{ + static const char* names[] = + { + /* Keep in sync with wsrep/wsrep_api.h WSREP_CAP_* macros. */ + "MULTI_MASTER", + "CERTIFICATION", + "PARALLEL_APPLYING", + "TRX_REPLAY", + "ISOLATION", + "PAUSE", + "CAUSAL_READS", + "CAUSAL_TRX", + "INCREMENTAL_WRITESET", + "SESSION_LOCKS", + "DISTRIBUTED_LOCKS", + "CONSISTENCY_CHECK", + "UNORDERED", + "ANNOTATION", + "PREORDERED", + "STREAMING", + "SNAPSHOT", + "NBO", + }; + + std::string s; + for (size_t i= 0; i < sizeof(names) / sizeof(names[0]); ++i) + { + if (cap & (1ULL << i)) { - if (wsrep_before_SE()) + if (s.empty()) { - wsrep_SE_init_grab(); - // Signal mysqld init thread to continue - wsrep_sst_complete (&cluster_uuid, view->state_id.seqno, false); - // and wait for SE initialization - wsrep_SE_init_wait(); + s= ":"; } - else - { - local_uuid= cluster_uuid; - local_seqno= view->state_id.seqno; - } - /* Init storage engine XIDs from first view */ - wsrep_set_SE_checkpoint(local_uuid, local_seqno); -#ifdef GTID_SUPPORT - wsrep_init_sidno(local_uuid); -#endif /* GTID_SUPPORT */ - memb_status= WSREP_MEMBER_JOINED; - } - - // just some sanity check - if (memcmp (&local_uuid, &cluster_uuid, sizeof (wsrep_uuid_t))) - { - WSREP_ERROR("Undetected state gap. Can't continue."); - wsrep_log_states(WSREP_LOG_FATAL, &cluster_uuid, view->state_id.seqno, - &local_uuid, -1); - unireg_abort(1); + s += names[i]; + s += ":"; } } - if (wsrep_auto_increment_control) - { - global_system_variables.auto_increment_offset= view->my_idx + 1; - global_system_variables.auto_increment_increment= view->memb_num; - } + /* A read from the string pointed to by *str may be started at any time, + * so it must never point to free(3)d memory or non '\0' terminated string. */ - { /* capabilities may be updated on new configuration */ - uint64_t const caps(wsrep->capabilities (wsrep)); - - my_bool const idc((caps & WSREP_CAP_INCREMENTAL_WRITESET) != 0); - if (TRUE == wsrep_incremental_data_collection && FALSE == idc) - { - WSREP_WARN("Unsupported protocol downgrade: " - "incremental data collection disabled. Expect abort."); - } - wsrep_incremental_data_collection = idc; - } + char* const previous= *str; -out: - if (view->status == WSREP_VIEW_PRIMARY) wsrep_startup= FALSE; - wsrep_config_state->set(memb_status, view); + *str= strdup(s.c_str()); - return WSREP_CB_SUCCESS; + if (previous != NULL) + { + free(previous); + } } -my_bool wsrep_ready_set (my_bool x) +/* Verifies that SE position is consistent with the group position + * and initializes other variables */ +void wsrep_verify_SE_checkpoint(const wsrep_uuid_t& uuid, + wsrep_seqno_t const seqno) { - WSREP_DEBUG("Setting wsrep_ready to %d", x); - if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort(); - my_bool ret= (wsrep_ready != x); - if (ret) - { - wsrep_ready= x; - mysql_cond_signal (&COND_wsrep_ready); - } - mysql_mutex_unlock (&LOCK_wsrep_ready); - return ret; } +/* + Wsrep is considered ready if + 1) Provider is not loaded (native mode) + 2) Server has reached synced state + 3) Server is in joiner mode and mysqldump SST method has been + specified + See Wsrep_server_service::log_state_change() for further details. + */ my_bool wsrep_ready_get (void) { if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort(); @@ -499,178 +430,67 @@ int wsrep_show_ready(THD *thd, SHOW_VAR *var, char *buff) return 0; } -// Wait until wsrep has reached ready state -void wsrep_ready_wait () +void wsrep_update_cluster_state_uuid(const char* uuid) { - if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort(); - while (!wsrep_ready) - { - WSREP_INFO("Waiting to reach ready state"); - mysql_cond_wait (&COND_wsrep_ready, &LOCK_wsrep_ready); - } - WSREP_INFO("ready state reached"); - mysql_mutex_unlock (&LOCK_wsrep_ready); + strncpy(cluster_uuid_str, uuid, sizeof(cluster_uuid_str) - 1); } -static void wsrep_synced_cb(void* app_ctx) +static void wsrep_init_position() { - WSREP_INFO("Synchronized with group, ready for connections"); - my_bool signal_main= wsrep_ready_set(TRUE); - wsrep_config_state->set(WSREP_MEMBER_SYNCED); - - if (signal_main) - { - wsrep_SE_init_grab(); - // Signal mysqld init thread to continue - wsrep_sst_complete (&local_uuid, local_seqno, false); - // and wait for SE initialization - wsrep_SE_init_wait(); - } - if (wsrep_restart_slave_activated) - { - int rcode; - WSREP_INFO("MariaDB slave restart"); - wsrep_restart_slave_activated= FALSE; - - mysql_mutex_lock(&LOCK_active_mi); - if ((rcode = start_slave_threads(0, - 1 /* need mutex */, - 0 /* no wait for start*/, - active_mi, - master_info_file, - relay_log_info_file, - SLAVE_SQL))) - { - WSREP_WARN("Failed to create slave threads: %d", rcode); - } - mysql_mutex_unlock(&LOCK_active_mi); - - } } -static void wsrep_init_position() +/**************************************************************************** + Helpers for wsrep_init() + ****************************************************************************/ +static std::string wsrep_server_name() { - /* read XIDs from storage engines */ - wsrep_uuid_t uuid; - wsrep_seqno_t seqno; - wsrep_get_SE_checkpoint(uuid, seqno); - - if (!memcmp(&uuid, &WSREP_UUID_UNDEFINED, sizeof(wsrep_uuid_t))) - { - WSREP_INFO("Read nil XID from storage engines, skipping position init"); - return; - } - - char uuid_str[40] = {0, }; - wsrep_uuid_print(&uuid, uuid_str, sizeof(uuid_str)); - WSREP_INFO("Initial position: %s:%lld", uuid_str, (long long)seqno); - - if (!memcmp(&local_uuid, &WSREP_UUID_UNDEFINED, sizeof(local_uuid)) && - local_seqno == WSREP_SEQNO_UNDEFINED) - { - // Initial state - local_uuid= uuid; - local_seqno= seqno; - } - else if (memcmp(&local_uuid, &uuid, sizeof(local_uuid)) || - local_seqno != seqno) - { - WSREP_WARN("Initial position was provided by configuration or SST, " - "avoiding override"); - } + std::string ret(wsrep_node_name ? wsrep_node_name : ""); + return ret; } -extern char* my_bind_addr_str; - -int wsrep_init() +static std::string wsrep_server_id() { - int rcode= -1; - DBUG_ASSERT(wsrep_inited == 0); - - if (strcmp(wsrep_start_position, WSREP_START_POSITION_ZERO) && - wsrep_start_position_init(wsrep_start_position)) - { - return 1; - } - - wsrep_sst_auth_init(); - - wsrep_ready_set(FALSE); - assert(wsrep_provider); - - wsrep_init_position(); - - if ((rcode= wsrep_load(wsrep_provider, &wsrep, wsrep_log_cb)) != WSREP_OK) - { - if (strcasecmp(wsrep_provider, WSREP_NONE)) - { - WSREP_ERROR("wsrep_load(%s) failed: %s (%d). Reverting to no provider.", - wsrep_provider, strerror(rcode), rcode); - strcpy((char*)wsrep_provider, WSREP_NONE); // damn it's a dirty hack - return wsrep_init(); - } - else /* this is for recursive call above */ - { - WSREP_ERROR("Could not revert to no provider: %s (%d). Need to abort.", - strerror(rcode), rcode); - unireg_abort(1); - } - } + /* using empty server_id, which enables view change handler to + set final server_id later on + */ + std::string ret(""); + return ret; +} - if (!WSREP_PROVIDER_EXISTS) - { - // enable normal operation in case no provider is specified - wsrep_ready_set(TRUE); - wsrep_inited= 1; - global_system_variables.wsrep_on = 0; - wsrep_init_args args; - args.logger_cb = wsrep_log_cb; - args.options = (wsrep_provider_options) ? - wsrep_provider_options : ""; - rcode = wsrep->init(wsrep, &args); - if (rcode) - { - DBUG_PRINT("wsrep",("wsrep::init() failed: %d", rcode)); - WSREP_ERROR("wsrep::init() failed: %d, must shutdown", rcode); - wsrep->free(wsrep); - free(wsrep); - wsrep = NULL; - } - return rcode; - } - else - { - global_system_variables.wsrep_on = 1; - strncpy(provider_name, - wsrep->provider_name, sizeof(provider_name) - 1); - strncpy(provider_version, - wsrep->provider_version, sizeof(provider_version) - 1); - strncpy(provider_vendor, - wsrep->provider_vendor, sizeof(provider_vendor) - 1); - } +static std::string wsrep_server_node_address() +{ + std::string ret; if (!wsrep_data_home_dir || strlen(wsrep_data_home_dir) == 0) - wsrep_data_home_dir = mysql_real_data_home; + wsrep_data_home_dir= mysql_real_data_home; /* Initialize node address */ - char node_addr[512]= { 0, }; - size_t const node_addr_max= sizeof(node_addr) - 1; if (!wsrep_node_address || !strcmp(wsrep_node_address, "")) { - size_t const ret= wsrep_guess_ip(node_addr, node_addr_max); - if (!(ret > 0 && ret < node_addr_max)) + char node_addr[512]= {0, }; + const size_t node_addr_max= sizeof(node_addr) - 1; + size_t guess_ip_ret= wsrep_guess_ip(node_addr, node_addr_max); + if (!(guess_ip_ret > 0 && guess_ip_ret < node_addr_max)) { WSREP_WARN("Failed to guess base node address. Set it explicitly via " "wsrep_node_address."); - node_addr[0]= '\0'; + } + else + { + ret= node_addr; } } else { - strncpy(node_addr, wsrep_node_address, node_addr_max); + ret= wsrep_node_address; } + return ret; +} - /* Initialize node's incoming address */ +static std::string wsrep_server_incoming_address() +{ + std::string ret; + const std::string node_addr(wsrep_server_node_address()); char inc_addr[512]= { 0, }; size_t const inc_addr_max= sizeof (inc_addr); @@ -685,7 +505,8 @@ int wsrep_init() bool is_ipv6= false; unsigned int my_bind_ip= INADDR_ANY; // default if not set - if (my_bind_addr_str && strlen(my_bind_addr_str)) + if (my_bind_addr_str && strlen(my_bind_addr_str) && + strcmp(my_bind_addr_str, "*") != 0) { my_bind_ip= wsrep_check_ip(my_bind_addr_str, &is_ipv6); } @@ -704,22 +525,28 @@ int wsrep_init() } else /* mysqld binds to 0.0.0.0, try taking IP from wsrep_node_address. */ { - size_t const node_addr_len= strlen(node_addr); - if (node_addr_len > 0) + if (node_addr.size()) { - wsp::Address addr(node_addr); - - if (!addr.is_valid()) + size_t const ip_len= wsrep_host_len(node_addr.c_str(), node_addr.size()); + if (ip_len + 7 /* :55555\0 */ < inc_addr_max) { - WSREP_DEBUG("Could not parse node address : %s", node_addr); - WSREP_WARN("Guessing address for incoming client connections failed. " - "Try setting wsrep_node_incoming_address explicitly."); - goto done; + memcpy (inc_addr, node_addr.c_str(), ip_len); + snprintf(inc_addr + ip_len, inc_addr_max - ip_len, ":%u", + (int)mysqld_port); } + else + { + WSREP_WARN("Guessing address for incoming client connections: " + "address too long."); + inc_addr[0]= '\0'; + } + } - const char *fmt= (addr.is_ipv6()) ? "[%s]:%u" : "%s:%u"; - snprintf(inc_addr, inc_addr_max, fmt, addr.get_address(), - (int) mysqld_port); + if (!strlen(inc_addr)) + { + WSREP_WARN("Guessing address for incoming client connections failed. " + "Try setting wsrep_node_incoming_address explicitly."); + WSREP_INFO("Node addr: %s", node_addr.c_str()); } } } @@ -743,52 +570,178 @@ int wsrep_init() snprintf(inc_addr, inc_addr_max, fmt, addr.get_address(), port); } + + done: + ret= wsrep_node_incoming_address; + return ret; +} -done: - struct wsrep_init_args wsrep_args; +static std::string wsrep_server_working_dir() +{ + std::string ret; + if (!wsrep_data_home_dir || strlen(wsrep_data_home_dir) == 0) + { + ret= mysql_real_data_home; + } + else + { + ret= wsrep_data_home_dir; + } + return ret; +} - struct wsrep_gtid const state_id = { local_uuid, local_seqno }; +static wsrep::gtid wsrep_server_initial_position() +{ + wsrep::gtid ret; + WSREP_INFO("Server initial position: %s", wsrep_start_position); + std::istringstream is(wsrep_start_position); + is >> ret; + return ret; +} - wsrep_args.data_dir = wsrep_data_home_dir; - wsrep_args.node_name = (wsrep_node_name) ? wsrep_node_name : ""; - wsrep_args.node_address = node_addr; - wsrep_args.node_incoming = inc_addr; - wsrep_args.options = (wsrep_provider_options) ? - wsrep_provider_options : ""; - wsrep_args.proto_ver = wsrep_max_protocol_version; +/* + Intitialize provider specific status variables + */ +static void wsrep_init_provider_status_variables() +{ + const wsrep::provider& provider= + Wsrep_server_state::instance().provider(); + strncpy(provider_name, + provider.name().c_str(), sizeof(provider_name) - 1); + strncpy(provider_version, + provider.version().c_str(), sizeof(provider_version) - 1); + strncpy(provider_vendor, + provider.vendor().c_str(), sizeof(provider_vendor) - 1); +} + +int wsrep_init_server() +{ + wsrep::log::logger_fn(wsrep_log_cb); + try + { + std::string server_name; + std::string server_id; + std::string node_address; + std::string incoming_address; + std::string working_dir; + wsrep::gtid initial_position; + + server_name= wsrep_server_name(); + server_id= wsrep_server_id(); + node_address= wsrep_server_node_address(); + incoming_address= wsrep_server_incoming_address(); + working_dir= wsrep_server_working_dir(); + initial_position= wsrep_server_initial_position(); + + Wsrep_server_state::init_once(server_name, + incoming_address, + node_address, + working_dir, + initial_position, + wsrep_max_protocol_version); + } + catch (const wsrep::runtime_error& e) + { + WSREP_ERROR("Failed to init wsrep server %s", e.what()); + return 1; + } + catch (const std::exception& e) + { + WSREP_ERROR("Failed to init wsrep server %s", e.what()); + } + return 0; +} - wsrep_args.state_id = &state_id; +void wsrep_init_globals() +{ + wsrep_init_sidno(Wsrep_server_state::instance().connected_gtid().id()); + wsrep_init_schema(); + if (WSREP_ON) + { + Wsrep_server_state::instance().initialized(); + } +} - wsrep_args.logger_cb = wsrep_log_cb; - wsrep_args.view_handler_cb = wsrep_view_handler_cb; - wsrep_args.apply_cb = wsrep_apply_cb; - wsrep_args.commit_cb = wsrep_commit_cb; - wsrep_args.unordered_cb = wsrep_unordered_cb; - wsrep_args.sst_donate_cb = wsrep_sst_donate_cb; - wsrep_args.synced_cb = wsrep_synced_cb; +void wsrep_deinit_server() +{ + wsrep_deinit_schema(); + Wsrep_server_state::destroy(); +} - rcode = wsrep->init(wsrep, &wsrep_args); +int wsrep_init() +{ + assert(wsrep_provider); - if (rcode) + wsrep_init_position(); + wsrep_sst_auth_init(); + + if (strlen(wsrep_provider)== 0 || + !strcmp(wsrep_provider, WSREP_NONE)) { - DBUG_PRINT("wsrep",("wsrep::init() failed: %d", rcode)); - WSREP_ERROR("wsrep::init() failed: %d, must shutdown", rcode); - wsrep->free(wsrep); - free(wsrep); - wsrep = NULL; - } else { - wsrep_inited= 1; + // enable normal operation in case no provider is specified + global_system_variables.wsrep_on= 0; + int err= Wsrep_server_state::instance().load_provider(wsrep_provider, wsrep_provider_options ? wsrep_provider_options : ""); + if (err) + { + DBUG_PRINT("wsrep",("wsrep::init() failed: %d", err)); + WSREP_ERROR("wsrep::init() failed: %d, must shutdown", err); + } + else + { + wsrep_init_provider_status_variables(); + } + return err; } - return rcode; -} + global_system_variables.wsrep_on= 1; + + if (wsrep_gtid_mode && opt_bin_log && !opt_log_slave_updates) + { + WSREP_ERROR("Option --log-slave-updates is required if " + "binlog is enabled, GTID mode is on and wsrep provider " + "is specified"); + return 1; + } + + if (!wsrep_data_home_dir || strlen(wsrep_data_home_dir) == 0) + wsrep_data_home_dir= mysql_real_data_home; + + if (Wsrep_server_state::instance().load_provider(wsrep_provider, + wsrep_provider_options)) + { + WSREP_ERROR("Failed to load provider"); + return 1; + } + + if (!wsrep_provider_is_SR_capable() && + global_system_variables.wsrep_trx_fragment_size > 0) + { + WSREP_ERROR("The WSREP provider (%s) does not support streaming " + "replication but wsrep_trx_fragment_size is set to a " + "value other than 0 (%llu). Cannot continue. Either set " + "wsrep_trx_fragment_size to 0 or use wsrep_provider that " + "supports streaming replication.", + wsrep_provider, global_system_variables.wsrep_trx_fragment_size); + Wsrep_server_state::instance().unload_provider(); + return 1; + } + wsrep_inited= 1; + + wsrep_init_provider_status_variables(); + wsrep_capabilities_export(Wsrep_server_state::instance().provider().capabilities(), + &wsrep_provider_capabilities); + + WSREP_DEBUG("SR storage init for: %s", + (wsrep_SR_store_type == WSREP_SR_STORE_TABLE) ? "table" : "void"); + return 0; +} /* Initialize wsrep thread LOCKs and CONDs */ void wsrep_thr_init() { DBUG_ENTER("wsrep_thr_init"); - wsrep_config_state = new wsp::Config_state; + wsrep_config_state= new wsp::Config_state; #ifdef HAVE_PSI_INTERFACE mysql_mutex_register("sql", wsrep_mutexes, array_elements(wsrep_mutexes)); mysql_cond_register("sql", wsrep_conds, array_elements(wsrep_conds)); @@ -801,25 +754,24 @@ void wsrep_thr_init() mysql_cond_init(key_COND_wsrep_sst, &COND_wsrep_sst, NULL); mysql_mutex_init(key_LOCK_wsrep_sst_init, &LOCK_wsrep_sst_init, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wsrep_sst_init, &COND_wsrep_sst_init, NULL); - mysql_mutex_init(key_LOCK_wsrep_rollback, &LOCK_wsrep_rollback, MY_MUTEX_INIT_FAST); - mysql_cond_init(key_COND_wsrep_rollback, &COND_wsrep_rollback, NULL); mysql_mutex_init(key_LOCK_wsrep_replaying, &LOCK_wsrep_replaying, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wsrep_replaying, &COND_wsrep_replaying, NULL); mysql_mutex_init(key_LOCK_wsrep_slave_threads, &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_wsrep_desync, &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_wsrep_config_state, &LOCK_wsrep_config_state, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_wsrep_SR_pool, + &LOCK_wsrep_SR_pool, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_wsrep_SR_store, + &LOCK_wsrep_SR_store, MY_MUTEX_INIT_FAST); DBUG_VOID_RETURN; } -void wsrep_init_startup (bool first) +void wsrep_init_startup (bool sst_first) { if (wsrep_init()) unireg_abort(1); - wsrep_thr_lock_init( - (wsrep_thd_is_brute_force_fun)wsrep_thd_is_BF, - (wsrep_abort_thd_fun)wsrep_abort_thd, - wsrep_debug, wsrep_convert_LOCK_to_trx, - (wsrep_on_fun)wsrep_on); + wsrep_thr_lock_init(wsrep_thd_is_BF, wsrep_thd_bf_abort, + wsrep_debug, wsrep_convert_LOCK_to_trx, wsrep_on); /* Pre-initialize global_system_variables.table_plugin with a dummy engine @@ -838,28 +790,54 @@ void wsrep_init_startup (bool first) /* Skip replication start if no cluster address */ if (!wsrep_cluster_address || wsrep_cluster_address[0] == 0) return; - if (first) wsrep_sst_grab(); // do it so we can wait for SST below - + /* + Read value of wsrep_new_cluster before wsrep_start_replication(), + the value is reset to FALSE inside wsrep_start_replication. + */ if (!wsrep_start_replication()) unireg_abort(1); wsrep_create_rollbacker(); wsrep_create_appliers(1); - if (first && !wsrep_sst_wait()) unireg_abort(1);// wait until SST is completed + Wsrep_server_state& server_state= Wsrep_server_state::instance(); + /* + If the SST happens before server initialization, wait until the server + state reaches initializing. This indicates that + either SST was not necessary or SST has been delivered. + + With mysqldump SST (!sst_first) wait until the server reaches + joiner state and procedd to accepting connections. + */ + if (sst_first) + { + server_state.wait_until_state(Wsrep_server_state::s_initializing); + } + else + { + server_state.wait_until_state(Wsrep_server_state::s_joiner); + } } void wsrep_deinit(bool free_options) { DBUG_ASSERT(wsrep_inited == 1); - wsrep_unload(wsrep); - wsrep= 0; + WSREP_DEBUG("wsrep_deinit"); + + Wsrep_server_state::instance().unload_provider(); provider_name[0]= '\0'; provider_version[0]= '\0'; provider_vendor[0]= '\0'; wsrep_inited= 0; + if (wsrep_provider_capabilities != NULL) + { + char* p= wsrep_provider_capabilities; + wsrep_provider_capabilities= NULL; + free(p); + } + if (free_options) { wsrep_sst_auth_free(); @@ -871,28 +849,37 @@ void wsrep_thr_deinit() { if (!wsrep_config_state) return; // Never initialized + WSREP_DEBUG("wsrep_thr_deinit"); mysql_mutex_destroy(&LOCK_wsrep_ready); mysql_cond_destroy(&COND_wsrep_ready); mysql_mutex_destroy(&LOCK_wsrep_sst); mysql_cond_destroy(&COND_wsrep_sst); mysql_mutex_destroy(&LOCK_wsrep_sst_init); mysql_cond_destroy(&COND_wsrep_sst_init); - mysql_mutex_destroy(&LOCK_wsrep_rollback); - mysql_cond_destroy(&COND_wsrep_rollback); mysql_mutex_destroy(&LOCK_wsrep_replaying); mysql_cond_destroy(&COND_wsrep_replaying); mysql_mutex_destroy(&LOCK_wsrep_slave_threads); mysql_mutex_destroy(&LOCK_wsrep_desync); mysql_mutex_destroy(&LOCK_wsrep_config_state); + mysql_mutex_destroy(&LOCK_wsrep_SR_pool); + mysql_mutex_destroy(&LOCK_wsrep_SR_store); + delete wsrep_config_state; wsrep_config_state= 0; // Safety + + if (wsrep_cluster_capabilities != NULL) + { + char* p= wsrep_cluster_capabilities; + wsrep_cluster_capabilities= NULL; + free(p); + } } void wsrep_recover() { char uuid_str[40]; - if (!memcmp(&local_uuid, &WSREP_UUID_UNDEFINED, sizeof(wsrep_uuid_t)) && + if (wsrep_uuid_compare(&local_uuid, &WSREP_UUID_UNDEFINED) == 0 && local_seqno == -2) { wsrep_uuid_print(&local_uuid, uuid_str, sizeof(uuid_str)); @@ -900,43 +887,60 @@ void wsrep_recover() uuid_str, (long long)local_seqno); return; } - wsrep_uuid_t uuid; - wsrep_seqno_t seqno; - wsrep_get_SE_checkpoint(uuid, seqno); - wsrep_uuid_print(&uuid, uuid_str, sizeof(uuid_str)); - WSREP_INFO("Recovered position: %s:%lld", uuid_str, (long long)seqno); + wsrep::gtid gtid= wsrep_get_SE_checkpoint(); + std::ostringstream oss; + oss << gtid; + WSREP_INFO("Recovered position: %s", oss.str().c_str()); } void wsrep_stop_replication(THD *thd) { WSREP_INFO("Stop replication"); - if (!wsrep) + if (Wsrep_server_state::instance().state() != + Wsrep_server_state::s_disconnected) { - WSREP_INFO("Provider was not loaded, in stop replication"); - return; + WSREP_DEBUG("Disconnect provider"); + Wsrep_server_state::instance().disconnect(); + Wsrep_server_state::instance().wait_until_state(Wsrep_server_state::s_disconnected); } - /* disconnect from group first to get wsrep_ready == FALSE */ - WSREP_DEBUG("Provider disconnect"); - wsrep->disconnect(wsrep); + /* my connection, should not terminate with wsrep_close_client_connection(), + make transaction to rollback + */ + if (thd && !thd->wsrep_applier) trans_rollback(thd); + wsrep_close_client_connections(TRUE, thd); + + /* wait until appliers have stopped */ + wsrep_wait_appliers_close(thd); + + node_uuid= WSREP_UUID_UNDEFINED; +} - wsrep_connected= FALSE; +void wsrep_shutdown_replication() +{ + WSREP_INFO("Shutdown replication"); + if (Wsrep_server_state::instance().state() != wsrep::server_state::s_disconnected) + { + WSREP_DEBUG("Disconnect provider"); + Wsrep_server_state::instance().disconnect(); + Wsrep_server_state::instance().wait_until_state(Wsrep_server_state::s_disconnected); + } wsrep_close_client_connections(TRUE); /* wait until appliers have stopped */ - wsrep_wait_appliers_close(thd); + wsrep_wait_appliers_close(NULL); + node_uuid= WSREP_UUID_UNDEFINED; - return; + /* Undocking the thread specific data. */ + my_pthread_setspecific_ptr(THR_THD, NULL); } bool wsrep_start_replication() { - wsrep_status_t rcode; - - /* wsrep provider must be loaded. */ - DBUG_ASSERT(wsrep); + int rcode; + WSREP_DEBUG("wsrep_start_replication"); /* if provider is trivial, don't even try to connect, @@ -945,34 +949,27 @@ bool wsrep_start_replication() if (!WSREP_PROVIDER_EXISTS) { // enable normal operation in case no provider is specified - wsrep_ready_set(TRUE); return true; } if (!wsrep_cluster_address || wsrep_cluster_address[0]== 0) { // if provider is non-trivial, but no address is specified, wait for address - wsrep_ready_set(FALSE); return true; } - bool const bootstrap= wsrep_new_cluster; + bool const bootstrap(TRUE == wsrep_new_cluster); + wsrep_new_cluster= FALSE; WSREP_INFO("Start replication"); - if (wsrep_new_cluster) + if ((rcode= Wsrep_server_state::instance().connect( + wsrep_cluster_name, + wsrep_cluster_address, + wsrep_sst_donor, + bootstrap))) { - WSREP_INFO("'wsrep-new-cluster' option used, bootstrapping the cluster"); - wsrep_new_cluster= false; - } - - if ((rcode = wsrep->connect(wsrep, - wsrep_cluster_name, - wsrep_cluster_address, - wsrep_sst_donor, - bootstrap))) - { - DBUG_PRINT("wsrep",("wsrep->connect(%s) failed: %d", + DBUG_PRINT("wsrep",("wsrep_ptr->connect(%s) failed: %d", wsrep_cluster_address, rcode)); WSREP_ERROR("wsrep::connect(%s) failed: %d", wsrep_cluster_address, rcode); @@ -980,15 +977,12 @@ bool wsrep_start_replication() } else { - wsrep_connected= TRUE; - - char* opts= wsrep->options_get(wsrep); - if (opts) + try { - wsrep_provider_options_init(opts); - free(opts); + std::string opts= Wsrep_server_state::instance().provider().options(); + wsrep_provider_options_init(opts.c_str()); } - else + catch (const wsrep::runtime_error&) { WSREP_WARN("Failed to get wsrep options"); } @@ -999,40 +993,50 @@ bool wsrep_start_replication() bool wsrep_must_sync_wait (THD* thd, uint mask) { - return (thd->variables.wsrep_sync_wait & mask) && + bool ret; + mysql_mutex_lock(&thd->LOCK_thd_data); + ret= (thd->variables.wsrep_sync_wait & mask) && + thd->wsrep_client_thread && thd->variables.wsrep_on && !(thd->variables.wsrep_dirty_reads && !is_update_query(thd->lex->sql_command)) && !thd->in_active_multi_stmt_transaction() && - thd->wsrep_conflict_state != REPLAYING && - thd->wsrep_sync_wait_gtid.seqno == WSREP_SEQNO_UNDEFINED; + thd->wsrep_trx().state() != + wsrep::transaction::s_replaying && + thd->wsrep_cs().sync_wait_gtid().is_undefined(); + mysql_mutex_unlock(&thd->LOCK_thd_data); + return ret; } bool wsrep_sync_wait (THD* thd, uint mask) { if (wsrep_must_sync_wait(thd, mask)) { - WSREP_DEBUG("wsrep_sync_wait: thd->variables.wsrep_sync_wait = %u, mask = %u", - thd->variables.wsrep_sync_wait, mask); - // This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0 - // TODO: modify to check if thd has locked any rows. - wsrep_status_t ret= wsrep->causal_read (wsrep, &thd->wsrep_sync_wait_gtid); - - if (unlikely(WSREP_OK != ret)) + WSREP_DEBUG("wsrep_sync_wait: thd->variables.wsrep_sync_wait= %u, " + "mask= %u, thd->variables.wsrep_on= %d", + thd->variables.wsrep_sync_wait, mask, + thd->variables.wsrep_on); + /* + This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0 + TODO: modify to check if thd has locked any rows. + */ + if (thd->wsrep_cs().sync_wait(-1)) { const char* msg; int err; - // Possibly relevant error codes: - // ER_CHECKREAD, ER_ERROR_ON_READ, ER_INVALID_DEFAULT, ER_EMPTY_QUERY, - // ER_FUNCTION_NOT_DEFINED, ER_NOT_ALLOWED_COMMAND, ER_NOT_SUPPORTED_YET, - // ER_FEATURE_DISABLED, ER_QUERY_INTERRUPTED + /* + Possibly relevant error codes: + ER_CHECKREAD, ER_ERROR_ON_READ, ER_INVALID_DEFAULT, ER_EMPTY_QUERY, + ER_FUNCTION_NOT_DEFINED, ER_NOT_ALLOWED_COMMAND, ER_NOT_SUPPORTED_YET, + ER_FEATURE_DISABLED, ER_QUERY_INTERRUPTED + */ - switch (ret) + switch (thd->wsrep_cs().current_error()) { - case WSREP_NOT_IMPLEMENTED: + case wsrep::e_not_supported_error: msg= "synchronous reads by wsrep backend. " - "Please unset wsrep_causal_reads variable."; + "Please unset wsrep_causal_reads variable."; err= ER_NOT_SUPPORTED_YET; break; default: @@ -1050,6 +1054,27 @@ bool wsrep_sync_wait (THD* thd, uint mask) return false; } +enum wsrep::provider::status +wsrep_sync_wait_upto (THD* thd, + wsrep_gtid_t* upto, + int timeout) +{ + DBUG_ASSERT(upto); + enum wsrep::provider::status ret; + if (upto) + { + wsrep::gtid upto_gtid(wsrep::id(upto->uuid.data, sizeof(upto->uuid.data)), + wsrep::seqno(upto->seqno)); + ret= Wsrep_server_state::instance().wait_for_gtid(upto_gtid, timeout); + } + else + { + ret= Wsrep_server_state::instance().causal_read(timeout).second; + } + WSREP_DEBUG("wsrep_sync_wait_upto: %d", ret); + return ret; +} + void wsrep_keys_free(wsrep_key_arr_t* key_arr) { for (size_t i= 0; i < key_arr->keys_len; ++i) @@ -1061,7 +1086,6 @@ void wsrep_keys_free(wsrep_key_arr_t* key_arr) key_arr->keys_len= 0; } - /*! * @param db Database string * @param table Table string @@ -1073,9 +1097,9 @@ void wsrep_keys_free(wsrep_key_arr_t* key_arr) */ static bool wsrep_prepare_key_for_isolation(const char* db, - const char* table, - wsrep_buf_t* key, - size_t* key_len) + const char* table, + wsrep_buf_t* key, + size_t* key_len) { if (*key_len < 2) return false; @@ -1087,11 +1111,11 @@ static bool wsrep_prepare_key_for_isolation(const char* db, case 1: case 2: case 3: + case 4: { *key_len= 0; if (db) { - // sql_print_information("%s.%s", db, table); key[*key_len].ptr= db; key[*key_len].len= strlen(db); ++(*key_len); @@ -1105,26 +1129,23 @@ static bool wsrep_prepare_key_for_isolation(const char* db, break; } default: + assert(0); + WSREP_ERROR("Unsupported protocol version: %ld", wsrep_protocol_version); + unireg_abort(1); return false; } - return true; -} + return true; +} static bool wsrep_prepare_key_for_isolation(const char* db, const char* table, wsrep_key_arr_t* ka) { wsrep_key_t* tmp; - - if (!ka->keys) - tmp= (wsrep_key_t*)my_malloc((ka->keys_len + 1) * sizeof(wsrep_key_t), - MYF(0)); - else - tmp= (wsrep_key_t*)my_realloc(ka->keys, - (ka->keys_len + 1) * sizeof(wsrep_key_t), - MYF(0)); - + tmp= (wsrep_key_t*)my_realloc(ka->keys, + (ka->keys_len + 1) * sizeof(wsrep_key_t), + MYF(MY_ALLOW_ZERO_PTR)); if (!tmp) { WSREP_ERROR("Can't allocate memory for key_array"); @@ -1150,7 +1171,6 @@ static bool wsrep_prepare_key_for_isolation(const char* db, return true; } - static bool wsrep_prepare_keys_for_alter_add_fk(const char* child_table_db, Alter_info* alter_info, wsrep_key_arr_t* ka) @@ -1177,7 +1197,6 @@ static bool wsrep_prepare_keys_for_alter_add_fk(const char* child_table_db, return true; } - static bool wsrep_prepare_keys_for_isolation(THD* thd, const char* db, const char* table, @@ -1205,16 +1224,19 @@ static bool wsrep_prepare_keys_for_isolation(THD* thd, if (!wsrep_prepare_keys_for_alter_add_fk(table_list->db.str, alter_info, ka)) goto err; } - return false; err: - wsrep_keys_free(ka); - return true; + wsrep_keys_free(ka); + return true; } +/* + * Prepare key list from db/table and table_list + * + * Return zero in case of success, 1 in case of failure. + */ -/* Prepare key list from db/table and table_list */ bool wsrep_prepare_keys_for_isolation(THD* thd, const char* db, const char* table, @@ -1224,7 +1246,6 @@ bool wsrep_prepare_keys_for_isolation(THD* thd, return wsrep_prepare_keys_for_isolation(thd, db, table, table_list, NULL, ka); } - bool wsrep_prepare_key(const uchar* cache_key, size_t cache_key_len, const uchar* row_id, size_t row_id_len, wsrep_buf_t* key, size_t* key_len) @@ -1236,37 +1257,110 @@ bool wsrep_prepare_key(const uchar* cache_key, size_t cache_key_len, { case 0: { - key[0].ptr = cache_key; - key[0].len = cache_key_len; + key[0].ptr= cache_key; + key[0].len= cache_key_len; - *key_len = 1; + *key_len= 1; break; } case 1: case 2: case 3: + case 4: { - key[0].ptr = cache_key; - key[0].len = strlen( (char*)cache_key ); + key[0].ptr= cache_key; + key[0].len= strlen( (char*)cache_key ); - key[1].ptr = cache_key + strlen( (char*)cache_key ) + 1; - key[1].len = strlen( (char*)(key[1].ptr) ); + key[1].ptr= cache_key + strlen( (char*)cache_key ) + 1; + key[1].len= strlen( (char*)(key[1].ptr) ); - *key_len = 2; + *key_len= 2; break; } default: return false; } - key[*key_len].ptr = row_id; - key[*key_len].len = row_id_len; + key[*key_len].ptr= row_id; + key[*key_len].len= row_id_len; ++(*key_len); return true; } +bool wsrep_prepare_key_for_innodb(THD* thd, + const uchar* cache_key, + size_t cache_key_len, + const uchar* row_id, + size_t row_id_len, + wsrep_buf_t* key, + size_t* key_len) +{ + + return wsrep_prepare_key(cache_key, cache_key_len, row_id, row_id_len, key, key_len); +} + +wsrep::key wsrep_prepare_key_for_toi(const char* db, const char* table, + enum wsrep::key::type type) +{ + wsrep::key ret(type); + DBUG_ASSERT(db); + ret.append_key_part(db, strlen(db)); + if (table) ret.append_key_part(table, strlen(table)); + return ret; +} +wsrep::key_array +wsrep_prepare_keys_for_alter_add_fk(const char* child_table_db, + Alter_info* alter_info) + +{ + wsrep::key_array ret; + Key *key; + List_iterator<Key> key_iterator(alter_info->key_list); + while ((key= key_iterator++)) + { + if (key->type == Key::FOREIGN_KEY) + { + Foreign_key *fk_key= (Foreign_key *)key; + const char *db_name= fk_key->ref_db.str; + const char *table_name= fk_key->ref_table.str; + if (!db_name) + { + db_name= child_table_db; + } + ret.push_back(wsrep_prepare_key_for_toi(db_name, table_name, + wsrep::key::exclusive)); + } + } + return ret; +} + +wsrep::key_array wsrep_prepare_keys_for_toi(const char* db, + const char* table, + const TABLE_LIST* table_list, + Alter_info* alter_info) +{ + wsrep::key_array ret; + if (db || table) + { + ret.push_back(wsrep_prepare_key_for_toi(db, table, wsrep::key::exclusive)); + } + for (const TABLE_LIST* table= table_list; table; table= table->next_global) + { + ret.push_back(wsrep_prepare_key_for_toi(table->db.str, table->table_name.str, + wsrep::key::exclusive)); + } + if (alter_info && (alter_info->flags & ALTER_ADD_FOREIGN_KEY)) + { + wsrep::key_array fk(wsrep_prepare_keys_for_alter_add_fk(table_list->db.str, alter_info)); + if (!fk.empty()) + { + ret.insert(ret.end(), fk.begin(), fk.end()); + } + } + return ret; +} /* * Construct Query_log_Event from thd query and serialize it * into buffer. @@ -1277,7 +1371,7 @@ int wsrep_to_buf_helper( THD* thd, const char *query, uint query_len, uchar** buf, size_t* buf_len) { IO_CACHE tmp_io_cache; - Log_event_writer writer(&tmp_io_cache,0); + Log_event_writer writer(&tmp_io_cache, 0); if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX, 65536, MYF(MY_WME))) return 1; @@ -1365,7 +1459,7 @@ create_view_query(THD *thd, uchar** buf, size_t* buf_len) LEX *lex= thd->lex; SELECT_LEX *select_lex= lex->first_select_lex(); TABLE_LIST *first_table= select_lex->table_list.first; - TABLE_LIST *views = first_table; + TABLE_LIST *views= first_table; LEX_USER *definer; String buff; const LEX_CSTRING command[3]= @@ -1390,16 +1484,16 @@ create_view_query(THD *thd, uchar** buf, size_t* buf_len) if (definer) { - views->definer.user = definer->user; - views->definer.host = definer->host; + views->definer.user= definer->user; + views->definer.host= definer->host; } else { WSREP_ERROR("Failed to get DEFINER for VIEW."); return 1; } - views->algorithm = lex->create_view->algorithm; - views->view_suid = lex->create_view->suid; - views->with_check = lex->create_view->check; + views->algorithm = lex->create_view->algorithm; + views->view_suid = lex->create_view->suid; + views->with_check = lex->create_view->check; view_store_options(thd, views, &buff); buff.append(STRING_WITH_LEN("VIEW ")); @@ -1425,12 +1519,8 @@ create_view_query(THD *thd, uchar** buf, size_t* buf_len) buff.append(')'); } buff.append(STRING_WITH_LEN(" AS ")); - //buff.append(views->source.str, views->source.length); buff.append(thd->lex->create_view->select.str, thd->lex->create_view->select.length); - //int errcode= query_error_code(thd, TRUE); - //if (thd->binlog_query(THD::STMT_QUERY_TYPE, - // buff.ptr(), buff.length(), FALSE, FALSE, FALSE, errcod return wsrep_to_buf_helper(thd, buff.ptr(), buff.length(), buf, buf_len); } @@ -1496,8 +1586,7 @@ static int wsrep_drop_table_query(THD* thd, uchar** buf, size_t* buf_len) /* Forward declarations. */ -static int wsrep_create_sp(THD *thd, uchar** buf, size_t* buf_len); -static int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len); +int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len); /* Decide if statement should run in TOI. @@ -1577,6 +1666,7 @@ static bool wsrep_can_run_in_toi(THD *thd, const char *db, const char *table, } } +#if UNUSED /* 323f269d4099 (Jan Lindström 2018-07-19) */ static const char* wsrep_get_query_or_msg(const THD* thd) { switch(thd->lex->sql_command) @@ -1589,58 +1679,70 @@ static const char* wsrep_get_query_or_msg(const THD* thd) return "REVOKE"; case SQLCOM_SET_OPTION: if (thd->lex->definer) - return "SET PASSWORD"; + return "SET PASSWORD"; /* fallthrough */ default: return thd->query(); } } +#endif //UNUSED -/* - returns: - 0: statement was replicated as TOI - 1: TOI replication was skipped - -1: TOI replication failed - */ -static int wsrep_TOI_begin(THD *thd, const char *db_, const char *table_, - const TABLE_LIST* table_list, - Alter_info* alter_info) +static int wsrep_create_sp(THD *thd, uchar** buf, size_t* buf_len) { - wsrep_status_t ret(WSREP_WARNING); - uchar* buf(0); - size_t buf_len(0); - int buf_err; - int rc= 0; + String log_query; + sp_head *sp= thd->lex->sphead; + sql_mode_t saved_mode= thd->variables.sql_mode; + String retstr(64); + LEX_CSTRING returns= empty_clex_str; + retstr.set_charset(system_charset_info); - if (wsrep_can_run_in_toi(thd, db_, table_, table_list) == false) + log_query.set_charset(system_charset_info); + + if (sp->m_handler->type() == TYPE_ENUM_FUNCTION) { - WSREP_DEBUG("No TOI for %s", WSREP_QUERY(thd)); + sp_returns_type(thd, retstr, sp); + returns= retstr.lex_cstring(); + } + if (sp->m_handler-> + show_create_sp(thd, &log_query, + sp->m_explicit_name ? sp->m_db : null_clex_str, + sp->m_name, sp->m_params, returns, + sp->m_body, sp->chistics(), + thd->lex->definer[0], + thd->lex->create_info, + saved_mode)) + { + WSREP_WARN("SP create string failed: schema: %s, query: %s", + thd->get_db(), thd->query()); return 1; } - WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd), - thd->wsrep_exec_mode, wsrep_get_query_or_msg(thd)); + return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len); +} +static int wsrep_TOI_event_buf(THD* thd, uchar** buf, size_t* buf_len) +{ + int err; switch (thd->lex->sql_command) { case SQLCOM_CREATE_VIEW: - buf_err= create_view_query(thd, &buf, &buf_len); + err= create_view_query(thd, buf, buf_len); break; case SQLCOM_CREATE_PROCEDURE: case SQLCOM_CREATE_SPFUNCTION: - buf_err= wsrep_create_sp(thd, &buf, &buf_len); + err= wsrep_create_sp(thd, buf, buf_len); break; case SQLCOM_CREATE_TRIGGER: - buf_err= wsrep_create_trigger_query(thd, &buf, &buf_len); + err= wsrep_create_trigger_query(thd, buf, buf_len); break; case SQLCOM_CREATE_EVENT: - buf_err= wsrep_create_event_query(thd, &buf, &buf_len); + err= wsrep_create_event_query(thd, buf, buf_len); break; case SQLCOM_ALTER_EVENT: - buf_err= wsrep_alter_event_query(thd, &buf, &buf_len); + err= wsrep_alter_event_query(thd, buf, buf_len); break; case SQLCOM_DROP_TABLE: - buf_err= wsrep_drop_table_query(thd, &buf, &buf_len); + err= wsrep_drop_table_query(thd, buf, buf_len); break; case SQLCOM_CREATE_ROLE: if (sp_process_definer(thd)) @@ -1649,169 +1751,212 @@ static int wsrep_TOI_begin(THD *thd, const char *db_, const char *table_, } /* fallthrough */ default: - buf_err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(), - &buf, &buf_len); + err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(), buf, + buf_len); break; } - wsrep_key_arr_t key_arr= {0, 0}; - struct wsrep_buf buff = { buf, buf_len }; - if (!buf_err && - !wsrep_prepare_keys_for_isolation(thd, db_, table_, - table_list, alter_info, &key_arr) && - key_arr.keys_len > 0 && - WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id, - key_arr.keys, key_arr.keys_len, - &buff, 1, - &thd->wsrep_trx_meta))) - { - thd->wsrep_exec_mode= TOTAL_ORDER; - wsrep_to_isolation++; - wsrep_keys_free(&key_arr); - WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)wsrep_thd_trx_seqno(thd), - thd->wsrep_exec_mode); - } - else if (key_arr.keys_len > 0) { - /* jump to error handler in mysql_execute_command() */ - WSREP_WARN("TO isolation failed for: %d, schema: %s, sql: %s. Check wsrep " - "connection state and retry the query.", - ret, - thd->get_db(), - (thd->query()) ? thd->query() : "void"); - my_message(ER_LOCK_DEADLOCK, "WSREP replication failed. Check " - "your wsrep connection state and retry the query.", MYF(0)); - wsrep_keys_free(&key_arr); - rc= -1; - } - else { - /* non replicated DDL, affecting temporary tables only */ - WSREP_DEBUG("TO isolation skipped for: %d, sql: %s." - "Only temporary tables affected.", - ret, (thd->query()) ? thd->query() : "void"); - rc= 1; + return err; +} + +static void wsrep_TOI_begin_failed(THD* thd, const wsrep_buf_t* /* const err */) +{ + if (wsrep_thd_trx_seqno(thd) > 0) + { + /* GTID was granted and TO acquired - need to log event and release TO */ + if (wsrep_emulate_bin_log) wsrep_thd_binlog_trx_reset(thd); + if (wsrep_write_dummy_event(thd, "TOI begin failed")) { goto fail; } + wsrep::client_state& cs(thd->wsrep_cs()); + int const ret= cs.leave_toi(); + if (ret) + { + WSREP_ERROR("Leaving critical section for failed TOI failed: thd: %lld, " + "schema: %s, SQL: %s, rcode: %d wsrep_error: %s", + (long long)thd->real_id, thd->db.str, + thd->query(), ret, wsrep::to_c_string(cs.current_error())); + goto fail; + } } - if (buf) my_free(buf); - return rc; + return; +fail: + WSREP_ERROR("Failed to release TOI resources. Need to abort."); + unireg_abort(1); } -static void wsrep_TOI_end(THD *thd) { - wsrep_status_t ret; - wsrep_to_isolation--; - WSREP_DEBUG("TO END: %lld, %d: %s", (long long)wsrep_thd_trx_seqno(thd), - thd->wsrep_exec_mode, wsrep_get_query_or_msg(thd)); +/* + returns: + 0: statement was replicated as TOI + 1: TOI replication was skipped + -1: TOI replication failed + */ +static int wsrep_TOI_begin(THD *thd, const char *db, const char *table, + const TABLE_LIST* table_list, + Alter_info* alter_info) +{ + DBUG_ASSERT(thd->variables.wsrep_OSU_method == WSREP_OSU_TOI); - wsrep_set_SE_checkpoint(thd->wsrep_trx_meta.gtid.uuid, - thd->wsrep_trx_meta.gtid.seqno); - WSREP_DEBUG("TO END: %lld, update seqno", - (long long)wsrep_thd_trx_seqno(thd)); - - if (WSREP_OK == (ret = wsrep->to_execute_end(wsrep, thd->thread_id))) { - WSREP_DEBUG("TO END: %lld", (long long)wsrep_thd_trx_seqno(thd)); + WSREP_DEBUG("TOI Begin"); + if (wsrep_can_run_in_toi(thd, db, table, table_list) == false) + { + WSREP_DEBUG("No TOI for %s", WSREP_QUERY(thd)); + return 1; } - else { - WSREP_WARN("TO isolation end failed for: %d, schema: %s, sql: %s", - ret, - thd->get_db(), - (thd->query()) ? thd->query() : "void"); + + uchar* buf= 0; + size_t buf_len(0); + int buf_err; + int rc; + + buf_err= wsrep_TOI_event_buf(thd, &buf, &buf_len); + if (buf_err) { + WSREP_ERROR("Failed to create TOI event buf: %d", buf_err); + my_message(ER_UNKNOWN_ERROR, + "WSREP replication failed to prepare TOI event buffer. " + "Check your query.", + MYF(0)); + return -1; } -} + struct wsrep_buf buff= { buf, buf_len }; -static int wsrep_RSU_begin(THD *thd, const char *db_, const char *table_) -{ - wsrep_status_t ret(WSREP_WARNING); - WSREP_DEBUG("RSU BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd), - thd->wsrep_exec_mode, thd->query() ); + wsrep::key_array key_array= + wsrep_prepare_keys_for_toi(db, table, table_list, alter_info); - ret = wsrep->desync(wsrep); - if (ret != WSREP_OK) + if (thd->has_read_only_protection()) { - WSREP_WARN("RSU desync failed %d for schema: %s, query: %s", - ret, thd->get_db(), thd->query()); - my_error(ER_LOCK_DEADLOCK, MYF(0)); - return(ret); + /* non replicated DDL, affecting temporary tables only */ + WSREP_DEBUG("TO isolation skipped, sql: %s." + "Only temporary tables affected.", + WSREP_QUERY(thd)); + if (buf) my_free(buf); + return -1; } - mysql_mutex_lock(&LOCK_wsrep_replaying); - wsrep_replaying++; - mysql_mutex_unlock(&LOCK_wsrep_replaying); + thd_proc_info(thd, "acquiring total order isolation"); - if (wsrep_wait_committing_connections_close(5000)) + wsrep::client_state& cs(thd->wsrep_cs()); + int ret= cs.enter_toi(key_array, + wsrep::const_buffer(buff.ptr, buff.len), + wsrep::provider::flag::start_transaction | + wsrep::provider::flag::commit); + + if (ret) { - /* no can do, bail out from DDL */ - WSREP_WARN("RSU failed due to pending transactions, schema: %s, query %s", - thd->get_db(), thd->query()); - mysql_mutex_lock(&LOCK_wsrep_replaying); - wsrep_replaying--; - mysql_mutex_unlock(&LOCK_wsrep_replaying); + DBUG_ASSERT(cs.current_error()); + WSREP_DEBUG("to_execute_start() failed for %llu: %s, seqno: %lld", + thd->thread_id, WSREP_QUERY(thd), + (long long)wsrep_thd_trx_seqno(thd)); - ret = wsrep->resync(wsrep); - if (ret != WSREP_OK) + /* jump to error handler in mysql_execute_command() */ + switch (cs.current_error()) { - WSREP_WARN("resync failed %d for schema: %s, query: %s", - ret, thd->get_db(), thd->query()); + case wsrep::e_size_exceeded_error: + WSREP_WARN("TO isolation failed for: %d, schema: %s, sql: %s. " + "Maximum size exceeded.", + ret, + (thd->db.str ? thd->db.str : "(null)"), + WSREP_QUERY(thd)); + my_error(ER_ERROR_DURING_COMMIT, MYF(0), WSREP_SIZE_EXCEEDED); + break; + default: + WSREP_WARN("TO isolation failed for: %d, schema: %s, sql: %s. " + "Check wsrep connection state and retry the query.", + ret, + (thd->db.str ? thd->db.str : "(null)"), + WSREP_QUERY(thd)); + if (!thd->is_error()) + { + my_error(ER_LOCK_DEADLOCK, MYF(0), "WSREP replication failed. Check " + "your wsrep connection state and retry the query."); + } } - - my_error(ER_LOCK_DEADLOCK, MYF(0)); - return(1); + rc= -1; } - - wsrep_seqno_t seqno = wsrep->pause(wsrep); - if (seqno == WSREP_SEQNO_UNDEFINED) - { - WSREP_WARN("pause failed %lld for schema: %s, query: %s", (long long)seqno, - thd->get_db(), thd->query()); - return(1); + else { + ++wsrep_to_isolation; + rc= 0; } - WSREP_DEBUG("paused at %lld", (long long)seqno); - thd->variables.wsrep_on = 0; - return 0; -} -static void wsrep_RSU_end(THD *thd) -{ - wsrep_status_t ret(WSREP_WARNING); - WSREP_DEBUG("RSU END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd), - thd->wsrep_exec_mode, thd->query() ); + if (buf) my_free(buf); + if (rc) wsrep_TOI_begin_failed(thd, NULL); - mysql_mutex_lock(&LOCK_wsrep_replaying); - wsrep_replaying--; - mysql_mutex_unlock(&LOCK_wsrep_replaying); + return rc; +} - ret = wsrep->resume(wsrep); - if (ret != WSREP_OK) +static void wsrep_TOI_end(THD *thd) { + int ret; + wsrep_to_isolation--; + wsrep::client_state& client_state(thd->wsrep_cs()); + DBUG_ASSERT(wsrep_thd_is_local_toi(thd)); + WSREP_DEBUG("TO END: %lld: %s", client_state.toi_meta().seqno().get(), + WSREP_QUERY(thd)); + + if (wsrep_thd_is_local_toi(thd)) { - WSREP_WARN("resume failed %d for schema: %s, query: %s", ret, - thd->get_db(), thd->query()); + wsrep_set_SE_checkpoint(client_state.toi_meta().gtid()); + if (thd->is_error() && !wsrep_must_ignore_error(thd)) + { + wsrep_apply_error err; + err.store(thd); + client_state.leave_toi(); + } + else + { + ret= client_state.leave_toi(); + } + + if (ret == 0) + { + WSREP_DEBUG("TO END: %lld", client_state.toi_meta().seqno().get()); + } + else + { + WSREP_WARN("TO isolation end failed for: %d, schema: %s, sql: %s", + ret, (thd->db.str ? thd->db.str : "(null)"), WSREP_QUERY(thd)); + } } +} - ret = wsrep->resync(wsrep); - if (ret != WSREP_OK) +static int wsrep_RSU_begin(THD *thd, const char *db_, const char *table_) +{ + WSREP_DEBUG("RSU BEGIN: %lld, : %s", wsrep_thd_trx_seqno(thd), + WSREP_QUERY(thd)); + if (thd->wsrep_cs().begin_rsu(5000)) { - WSREP_WARN("resync failed %d for schema: %s, query: %s", ret, - thd->get_db(), thd->query()); - return; + WSREP_WARN("RSU begin failed"); + } + else + { + thd->variables.wsrep_on= 0; } + return 0; +} - thd->variables.wsrep_on = 1; +static void wsrep_RSU_end(THD *thd) +{ + WSREP_DEBUG("RSU END: %lld : %s", wsrep_thd_trx_seqno(thd), + WSREP_QUERY(thd)); + if (thd->wsrep_cs().end_rsu()) + { + WSREP_WARN("Failed to end RSU, server may need to be restarted"); + } + thd->variables.wsrep_on= 1; } int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_, const TABLE_LIST* table_list, Alter_info* alter_info) { - int ret= 0; - /* No isolation for applier or replaying threads. */ - if (thd->wsrep_exec_mode == REPL_RECV) - return 0; + if (!wsrep_thd_is_local(thd)) return 0; + int ret= 0; mysql_mutex_lock(&thd->LOCK_thd_data); - if (thd->wsrep_conflict_state == MUST_ABORT) + if (thd->wsrep_trx().state() == wsrep::transaction::s_must_abort) { WSREP_INFO("thread: %lld schema: %s query: %s has been aborted due to multi-master conflict", (longlong) thd->thread_id, thd->get_db(), thd->query()); @@ -1820,20 +1965,20 @@ int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_, } mysql_mutex_unlock(&thd->LOCK_thd_data); - DBUG_ASSERT(thd->wsrep_exec_mode == LOCAL_STATE); - DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno == WSREP_SEQNO_UNDEFINED); + DBUG_ASSERT(wsrep_thd_is_local(thd)); + DBUG_ASSERT(thd->wsrep_trx().ws_meta().seqno().is_undefined()); - if (thd->has_read_only_protection()) + if (thd->global_read_lock.is_acquired()) { - WSREP_DEBUG("Aborting TOI: Global Read-Lock (FTWRL) in place: %s %lld", - thd->query(), (longlong) thd->thread_id); + WSREP_DEBUG("Aborting TOI: Global Read-Lock (FTWRL) in place: %s %llu", + WSREP_QUERY(thd), thd->thread_id); return -1; } if (wsrep_debug && thd->mdl_context.has_locks()) { - WSREP_DEBUG("thread holds MDL locks at TI begin: %s %lld", - thd->query(), (longlong) thd->thread_id); + WSREP_DEBUG("thread holds MDL locks at TI begin: %s %llu", + WSREP_QUERY(thd), thd->thread_id); } /* @@ -1845,11 +1990,11 @@ int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_, */ if (wsrep_auto_increment_control) { - thd->variables.auto_increment_offset = 1; - thd->variables.auto_increment_increment = 1; + thd->variables.auto_increment_offset= 1; + thd->variables.auto_increment_increment= 1; } - if (thd->variables.wsrep_on && thd->wsrep_exec_mode==LOCAL_STATE) + if (thd->variables.wsrep_on && wsrep_thd_is_local(thd)) { switch (thd->variables.wsrep_OSU_method) { case WSREP_OSU_TOI: @@ -1865,48 +2010,53 @@ int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_, break; } switch (ret) { - case 0: thd->wsrep_exec_mode= TOTAL_ORDER; break; + case 0: /* wsrep_TOI_begin sould set toi mode */ break; case 1: /* TOI replication skipped, treat as success */ - ret = 0; + ret= 0; break; case -1: /* TOI replication failed, treat as error */ break; } } + return ret; } void wsrep_to_isolation_end(THD *thd) { - if (thd->wsrep_exec_mode == TOTAL_ORDER) + DBUG_ASSERT(wsrep_thd_is_local_toi(thd) || + wsrep_thd_is_in_rsu(thd)); + if (wsrep_thd_is_local_toi(thd)) { - switch(thd->variables.wsrep_OSU_method) - { - case WSREP_OSU_TOI: wsrep_TOI_end(thd); break; - case WSREP_OSU_RSU: wsrep_RSU_end(thd); break; - default: - WSREP_WARN("Unsupported wsrep OSU method at isolation end: %lu", - thd->variables.wsrep_OSU_method); - break; - } - wsrep_cleanup_transaction(thd); + DBUG_ASSERT(thd->variables.wsrep_OSU_method == WSREP_OSU_TOI); + wsrep_TOI_end(thd); } + else if (wsrep_thd_is_in_rsu(thd)) + { + DBUG_ASSERT(thd->variables.wsrep_OSU_method == WSREP_OSU_RSU); + wsrep_RSU_end(thd); + } + else + { + DBUG_ASSERT(0); + } + if (wsrep_emulate_bin_log) wsrep_thd_binlog_trx_reset(thd); } #define WSREP_MDL_LOG(severity, msg, schema, schema_len, req, gra) \ WSREP_##severity( \ "%s\n" \ "schema: %.*s\n" \ - "request: (%lld \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)\n" \ - "granted: (%lld \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)", \ + "request: (%llu \tseqno %lld \twsrep (%s, %s, %s) cmd %d %d \t%s)\n" \ + "granted: (%llu \tseqno %lld \twsrep (%s, %s, %s) cmd %d %d \t%s)", \ msg, schema_len, schema, \ - (longlong) req->thread_id, (long long)wsrep_thd_trx_seqno(req), \ - req->wsrep_exec_mode, req->wsrep_query_state, req->wsrep_conflict_state, \ + req->thread_id, (long long)wsrep_thd_trx_seqno(req), \ + wsrep_thd_client_mode_str(req), wsrep_thd_client_state_str(req), wsrep_thd_transaction_state_str(req), \ req->get_command(), req->lex->sql_command, req->query(), \ - (longlong) gra->thread_id, (long long)wsrep_thd_trx_seqno(gra), \ - gra->wsrep_exec_mode, gra->wsrep_query_state, gra->wsrep_conflict_state, \ + gra->thread_id, (long long)wsrep_thd_trx_seqno(gra), \ + wsrep_thd_client_mode_str(gra), wsrep_thd_client_state_str(gra), wsrep_thd_transaction_state_str(gra), \ gra->get_command(), gra->lex->sql_command, gra->query()); /** @@ -1919,58 +2069,47 @@ void wsrep_to_isolation_end(THD *thd) @retval FALSE Lock request cannot be granted */ -bool wsrep_grant_mdl_exception(MDL_context *requestor_ctx, +void wsrep_handle_mdl_conflict(MDL_context *requestor_ctx, MDL_ticket *ticket, const MDL_key *key) { /* Fallback to the non-wsrep behaviour */ - if (!WSREP_ON) return FALSE; + if (!WSREP_ON) return; THD *request_thd= requestor_ctx->get_thd(); THD *granted_thd= ticket->get_ctx()->get_thd(); - bool ret= false; const char* schema= key->db_name(); int schema_len= key->db_name_length(); mysql_mutex_lock(&request_thd->LOCK_thd_data); + if (wsrep_thd_is_toi(request_thd) || + wsrep_thd_is_applying(request_thd)) { - /* - We consider granting MDL exceptions only for appliers (BF THD) and ones - executing under TOI mode. - - Rules: - 1. If granted/owner THD is also an applier (BF THD) or one executing - under TOI mode, then we grant the requested lock to the requester - THD. - @return true - - 2. If granted/owner THD is executing a FLUSH command or already has an - explicit lock, then do not grant the requested lock to the requester - THD and it has to wait. - @return false - - 3. In all other cases the granted/owner THD is aborted and the requested - lock is not granted to the requester THD, thus it has to wait. - @return false - */ - if (request_thd->wsrep_exec_mode == TOTAL_ORDER || - request_thd->wsrep_exec_mode == REPL_RECV) - { mysql_mutex_unlock(&request_thd->LOCK_thd_data); WSREP_MDL_LOG(DEBUG, "MDL conflict ", schema, schema_len, request_thd, granted_thd); ticket->wsrep_report(wsrep_debug); mysql_mutex_lock(&granted_thd->LOCK_thd_data); - if (granted_thd->wsrep_exec_mode == TOTAL_ORDER || - granted_thd->wsrep_exec_mode == REPL_RECV) + if (wsrep_thd_is_toi(granted_thd) || + wsrep_thd_is_applying(granted_thd)) { - WSREP_MDL_LOG(INFO, "MDL BF-BF conflict", schema, schema_len, - request_thd, granted_thd); - ticket->wsrep_report(true); - mysql_mutex_unlock(&granted_thd->LOCK_thd_data); - ret= true; + if (wsrep_thd_is_SR(granted_thd) && !wsrep_thd_is_SR(request_thd)) + { + WSREP_MDL_LOG(INFO, "MDL conflict, DDL vs SR", + schema, schema_len, request_thd, granted_thd); + mysql_mutex_unlock(&granted_thd->LOCK_thd_data); + wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1); + } + else + { + WSREP_MDL_LOG(INFO, "MDL BF-BF conflict", schema, schema_len, + request_thd, granted_thd); + ticket->wsrep_report(true); + mysql_mutex_unlock(&granted_thd->LOCK_thd_data); + unireg_abort(1); + } } else if (granted_thd->lex->sql_command == SQLCOM_FLUSH || granted_thd->mdl_context.has_explicit_locks()) @@ -1978,173 +2117,57 @@ bool wsrep_grant_mdl_exception(MDL_context *requestor_ctx, WSREP_DEBUG("BF thread waiting for FLUSH"); ticket->wsrep_report(wsrep_debug); mysql_mutex_unlock(&granted_thd->LOCK_thd_data); - ret= false; + } + else if (request_thd->lex->sql_command == SQLCOM_DROP_TABLE) + { + WSREP_DEBUG("DROP caused BF abort, conf %s", + wsrep_thd_transaction_state_str(granted_thd)); + ticket->wsrep_report(wsrep_debug); + mysql_mutex_unlock(&granted_thd->LOCK_thd_data); + wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1); } else { - /* Print some debug information. */ - if (wsrep_debug) + WSREP_MDL_LOG(DEBUG, "MDL conflict-> BF abort", schema, schema_len, + request_thd, granted_thd); + ticket->wsrep_report(wsrep_debug); + if (granted_thd->wsrep_trx().active()) { - if (request_thd->lex->sql_command == SQLCOM_DROP_TABLE || - request_thd->lex->sql_command == SQLCOM_DROP_SEQUENCE) - { - WSREP_DEBUG("DROP caused BF abort, conf %d", granted_thd->wsrep_conflict_state); - } - else if (granted_thd->wsrep_query_state == QUERY_COMMITTING) + mysql_mutex_unlock(&granted_thd->LOCK_thd_data); + wsrep_abort_thd(request_thd, granted_thd, 1); + } + else + { + /* + Granted_thd is likely executing with wsrep_on=0. If the requesting + thd is BF, BF abort and wait. + */ + mysql_mutex_unlock(&granted_thd->LOCK_thd_data); + if (wsrep_thd_is_BF(request_thd, FALSE)) { - WSREP_DEBUG("MDL granted, but committing thd abort scheduled"); + ha_abort_transaction(request_thd, granted_thd, TRUE); } else { - WSREP_MDL_LOG(DEBUG, "MDL conflict-> BF abort", schema, schema_len, - request_thd, granted_thd); + WSREP_MDL_LOG(INFO, "MDL unknown BF-BF conflict", schema, schema_len, + request_thd, granted_thd); + ticket->wsrep_report(true); + unireg_abort(1); } - ticket->wsrep_report(true); } - - mysql_mutex_unlock(&granted_thd->LOCK_thd_data); - wsrep_abort_thd((void *) request_thd, (void *) granted_thd, 1); - ret= false; } } else { mysql_mutex_unlock(&request_thd->LOCK_thd_data); } - - return ret; -} - - -pthread_handler_t start_wsrep_THD(void *arg) -{ - THD *thd; - wsrep_thd_processor_fun processor= (wsrep_thd_processor_fun)arg; - - if (my_thread_init() || (!(thd= new THD(next_thread_id(), true)))) - { - goto error; - } - - mysql_mutex_lock(&LOCK_thread_count); - - if (wsrep_gtid_mode) - { - /* Adjust domain_id. */ - thd->variables.gtid_domain_id= wsrep_gtid_domain_id; - } - - thd->real_id=pthread_self(); // Keep purify happy - thread_created++; - threads.append(thd); - - my_net_init(&thd->net,(st_vio*) 0, thd, MYF(0)); - - DBUG_PRINT("wsrep",(("creating thread %lld"), (long long)thd->thread_id)); - thd->prior_thr_create_utime= thd->start_utime= microsecond_interval_timer(); - (void) mysql_mutex_unlock(&LOCK_thread_count); - - /* from bootstrap()... */ - thd->bootstrap=1; - thd->max_client_packet_length= thd->net.max_packet; - thd->security_ctx->master_access= ~(ulong)0; - - /* from handle_one_connection... */ - pthread_detach_this_thread(); - - mysql_thread_set_psi_id(thd->thread_id); - thd->thr_create_utime= microsecond_interval_timer(); - if (MYSQL_CALLBACK_ELSE(thread_scheduler, init_new_connection_thread, (), 0)) - { - close_connection(thd, ER_OUT_OF_RESOURCES); - statistic_increment(aborted_connects,&LOCK_status); - MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0)); - goto error; - } - -// </5.1.17> - /* - handle_one_connection() is normally the only way a thread would - start and would always be on the very high end of the stack , - therefore, the thread stack always starts at the address of the - first local variable of handle_one_connection, which is thd. We - need to know the start of the stack so that we could check for - stack overruns. - */ - DBUG_PRINT("wsrep", ("handle_one_connection called by thread %lld\n", - (long long)thd->thread_id)); - /* now that we've called my_thread_init(), it is safe to call DBUG_* */ - - thd->thread_stack= (char*) &thd; - if (thd->store_globals()) - { - close_connection(thd, ER_OUT_OF_RESOURCES); - statistic_increment(aborted_connects,&LOCK_status); - MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0)); - goto error; - } - - thd->system_thread= SYSTEM_THREAD_SLAVE_SQL; - thd->security_ctx->skip_grants(); - - /* handle_one_connection() again... */ - //thd->version= refresh_version; - thd->proc_info= 0; - thd->set_command(COM_SLEEP); - thd->init_for_queries(); - - mysql_mutex_lock(&LOCK_thread_count); - wsrep_running_threads++; - mysql_cond_broadcast(&COND_thread_count); - mysql_mutex_unlock(&LOCK_thread_count); - - processor(thd); - - close_connection(thd, 0); - - mysql_mutex_lock(&LOCK_thread_count); - wsrep_running_threads--; - WSREP_DEBUG("wsrep running threads now: %lu", wsrep_running_threads); - mysql_cond_broadcast(&COND_thread_count); - mysql_mutex_unlock(&LOCK_thread_count); - - // Note: We can't call THD destructor without crashing - // if plugins have not been initialized. However, in most of the - // cases this means that pre SE initialization SST failed and - // we are going to exit anyway. - if (plugins_are_initialized) - { - net_end(&thd->net); - MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 1)); - } - else - { - // TODO: lightweight cleanup to get rid of: - // 'Error in my_thread_global_end(): 2 threads didn't exit' - // at server shutdown - } - - unlink_not_visible_thd(thd); - delete thd; - my_thread_end(); - return(NULL); - -error: - WSREP_ERROR("Failed to create/initialize system thread"); - - /* Abort if its the first applier/rollbacker thread. */ - if (!mysqld_server_initialized) - unireg_abort(1); - else - return NULL; } - /**/ static bool abort_replicated(THD *thd) { bool ret_code= false; - if (thd->wsrep_query_state== QUERY_COMMITTING) + if (thd->wsrep_trx().state() == wsrep::transaction::s_committing) { WSREP_DEBUG("aborting replicated trx: %llu", (ulonglong)(thd->real_id)); @@ -2154,38 +2177,34 @@ static bool abort_replicated(THD *thd) return ret_code; } - /**/ static inline bool is_client_connection(THD *thd) { return (thd->wsrep_client_thread && thd->variables.wsrep_on); } - static inline bool is_replaying_connection(THD *thd) { bool ret; mysql_mutex_lock(&thd->LOCK_thd_data); - ret= (thd->wsrep_conflict_state == REPLAYING) ? true : false; + ret= (thd->wsrep_trx().state() == wsrep::transaction::s_replaying) ? true : false; mysql_mutex_unlock(&thd->LOCK_thd_data); return ret; } - static inline bool is_committing_connection(THD *thd) { bool ret; mysql_mutex_lock(&thd->LOCK_thd_data); - ret= (thd->wsrep_query_state == QUERY_COMMITTING) ? true : false; + ret= (thd->wsrep_trx().state() == wsrep::transaction::s_committing) ? true : false; mysql_mutex_unlock(&thd->LOCK_thd_data); return ret; } - static bool have_client_connections() { THD *tmp; @@ -2222,7 +2241,6 @@ static void wsrep_close_thread(THD *thd) } } - static my_bool have_committing_connections() { THD *tmp; @@ -2236,6 +2254,7 @@ static my_bool have_committing_connections() if (is_committing_connection(tmp)) { + mysql_mutex_unlock(&LOCK_thread_count); return TRUE; } } @@ -2243,7 +2262,6 @@ static my_bool have_committing_connections() return FALSE; } - int wsrep_wait_committing_connections_close(int wait_time) { int sleep_time= 100; @@ -2261,8 +2279,7 @@ int wsrep_wait_committing_connections_close(int wait_time) return 0; } - -void wsrep_close_client_connections(my_bool wait_to_end, THD *except_caller_thd) +void wsrep_close_client_connections(my_bool wait_to_end, THD* except_caller_thd) { /* First signal all threads that it's time to die @@ -2305,12 +2322,7 @@ void wsrep_close_client_connections(my_bool wait_to_end, THD *except_caller_thd) /* instead of wsrep_close_thread() we do now soft kill by THD::awake */ - mysql_mutex_lock(&tmp->LOCK_thd_data); - tmp->awake(KILL_CONNECTION); - - mysql_mutex_unlock(&tmp->LOCK_thd_data); - } mysql_mutex_unlock(&LOCK_thread_count); @@ -2360,7 +2372,6 @@ void wsrep_close_applier(THD *thd) wsrep_close_thread(thd); } - void wsrep_close_threads(THD *thd) { THD *tmp; @@ -2386,10 +2397,12 @@ void wsrep_wait_appliers_close(THD *thd) { /* Wait for wsrep appliers to gracefully exit */ mysql_mutex_lock(&LOCK_thread_count); - while (wsrep_running_threads > 1) - // 1 is for rollbacker thread which needs to be killed explicitly. - // This gotta be fixed in a more elegant manner if we gonna have arbitrary - // number of non-applier wsrep threads. + while (wsrep_running_threads > 2) + /* + 2 is for rollbacker thread which needs to be killed explicitly. + This gotta be fixed in a more elegant manner if we gonna have arbitrary + number of non-applier wsrep threads. + */ { if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION) { @@ -2425,7 +2438,6 @@ void wsrep_wait_appliers_close(THD *thd) */ } - void wsrep_kill_mysql(THD *thd) { if (mysqld_server_started) @@ -2442,267 +2454,167 @@ void wsrep_kill_mysql(THD *thd) } } - -static int wsrep_create_sp(THD *thd, uchar** buf, size_t* buf_len) +void +wsrep_last_committed_id(wsrep_gtid_t* gtid) { - String log_query; - sp_head *sp = thd->lex->sphead; - sql_mode_t saved_mode= thd->variables.sql_mode; - String retstr(64); - LEX_CSTRING returns= empty_clex_str; - retstr.set_charset(system_charset_info); - - log_query.set_charset(system_charset_info); - - if (sp->m_handler->type() == TYPE_ENUM_FUNCTION) - { - sp_returns_type(thd, retstr, sp); - returns= retstr.lex_cstring(); - } - if (sp->m_handler-> - show_create_sp(thd, &log_query, - sp->m_explicit_name ? sp->m_db : null_clex_str, - sp->m_name, sp->m_params, returns, - sp->m_body, sp->chistics(), - thd->lex->definer[0], - thd->lex->create_info, - saved_mode)) - { - WSREP_WARN("SP create string failed: schema: %s, query: %s", - thd->get_db(), thd->query()); - return 1; - } - - return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len); + wsrep::gtid ret= Wsrep_server_state::instance().last_committed_gtid(); + memcpy(gtid->uuid.data, ret.id().data(), sizeof(gtid->uuid.data)); + gtid->seqno= ret.seqno().get(); } - -extern int wsrep_on(THD *thd) +void +wsrep_node_uuid(wsrep_uuid_t& uuid) { - return (int)(WSREP(thd)); + uuid= node_uuid; } - -extern "C" bool wsrep_thd_is_wsrep_on(THD *thd) +int wsrep_must_ignore_error(THD* thd) { - return thd->variables.wsrep_on; -} + const int error= thd->get_stmt_da()->sql_errno(); + const uint flags= sql_command_flags[thd->lex->sql_command]; + DBUG_ASSERT(error); + DBUG_ASSERT((wsrep_thd_is_toi(thd)) || + (wsrep_thd_is_applying(thd) && thd->wsrep_apply_toi)); -bool wsrep_consistency_check(THD *thd) -{ - return thd->wsrep_consistency_check == CONSISTENCY_CHECK_RUNNING; -} - - -extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode) -{ - thd->wsrep_exec_mode= mode; -} + if ((wsrep_ignore_apply_errors & WSREP_IGNORE_ERRORS_ON_DDL)) + goto ignore_error; + if ((flags & CF_WSREP_MAY_IGNORE_ERRORS) && + (wsrep_ignore_apply_errors & WSREP_IGNORE_ERRORS_ON_RECONCILING_DDL)) + { + switch (error) + { + case ER_DB_DROP_EXISTS: + case ER_BAD_TABLE_ERROR: + case ER_CANT_DROP_FIELD_OR_KEY: + goto ignore_error; + } + } -extern "C" void wsrep_thd_set_query_state( - THD *thd, enum wsrep_query_state state) -{ - thd->wsrep_query_state= state; -} - - -void wsrep_thd_set_conflict_state(THD *thd, enum wsrep_conflict_state state) -{ - if (WSREP(thd)) thd->wsrep_conflict_state= state; -} - - -enum wsrep_exec_mode wsrep_thd_exec_mode(THD *thd) -{ - return thd->wsrep_exec_mode; -} - - -const char *wsrep_thd_exec_mode_str(THD *thd) -{ - return - (!thd) ? "void" : - (thd->wsrep_exec_mode == LOCAL_STATE) ? "local" : - (thd->wsrep_exec_mode == REPL_RECV) ? "applier" : - (thd->wsrep_exec_mode == TOTAL_ORDER) ? "total order" : - (thd->wsrep_exec_mode == LOCAL_COMMIT) ? "local commit" : "void"; -} - - -enum wsrep_query_state wsrep_thd_query_state(THD *thd) -{ - return thd->wsrep_query_state; -} - - -const char *wsrep_thd_query_state_str(THD *thd) -{ - return - (!thd) ? "void" : - (thd->wsrep_query_state == QUERY_IDLE) ? "idle" : - (thd->wsrep_query_state == QUERY_EXEC) ? "executing" : - (thd->wsrep_query_state == QUERY_COMMITTING) ? "committing" : - (thd->wsrep_query_state == QUERY_EXITING) ? "exiting" : - (thd->wsrep_query_state == QUERY_ROLLINGBACK) ? "rolling back" : "void"; -} - - -enum wsrep_conflict_state wsrep_thd_get_conflict_state(THD *thd) -{ - return thd->wsrep_conflict_state; -} - - -const char *wsrep_thd_conflict_state_str(THD *thd) -{ - return - (!thd) ? "void" : - (thd->wsrep_conflict_state == NO_CONFLICT) ? "no conflict" : - (thd->wsrep_conflict_state == MUST_ABORT) ? "must abort" : - (thd->wsrep_conflict_state == ABORTING) ? "aborting" : - (thd->wsrep_conflict_state == MUST_REPLAY) ? "must replay" : - (thd->wsrep_conflict_state == REPLAYING) ? "replaying" : - (thd->wsrep_conflict_state == RETRY_AUTOCOMMIT) ? "retrying" : - (thd->wsrep_conflict_state == CERT_FAILURE) ? "cert failure" : "void"; -} - - -wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd) -{ - return &thd->wsrep_ws_handle; -} - - -void wsrep_thd_LOCK(THD *thd) -{ - mysql_mutex_lock(&thd->LOCK_thd_data); -} - + return 0; -void wsrep_thd_UNLOCK(THD *thd) -{ - mysql_mutex_unlock(&thd->LOCK_thd_data); +ignore_error: + WSREP_WARN("Ignoring error '%s' on query. " + "Default database: '%s'. Query: '%s', Error_code: %d", + thd->get_stmt_da()->message(), + print_slave_db_safe(thd->db.str), + thd->query(), + error); + return 1; } - -extern "C" time_t wsrep_thd_query_start(THD *thd) +int wsrep_ignored_error_code(Log_event* ev, int error) { - return thd->query_start(); -} + const THD* thd= ev->thd; + DBUG_ASSERT(error); + DBUG_ASSERT(wsrep_thd_is_applying(thd) && + !wsrep_thd_is_local_toi(thd)); -extern "C" uint32 wsrep_thd_wsrep_rand(THD *thd) -{ - return thd->wsrep_rand; -} - -longlong wsrep_thd_trx_seqno(THD *thd) -{ - return (thd) ? thd->wsrep_trx_meta.gtid.seqno : WSREP_SEQNO_UNDEFINED; -} + if ((wsrep_ignore_apply_errors & WSREP_IGNORE_ERRORS_ON_RECONCILING_DML)) + { + const int ev_type= ev->get_type_code(); + if ((ev_type == DELETE_ROWS_EVENT || ev_type == DELETE_ROWS_EVENT_V1) + && error == ER_KEY_NOT_FOUND) + goto ignore_error; + } + return 0; -extern "C" query_id_t wsrep_thd_query_id(THD *thd) -{ - return thd->query_id; +ignore_error: + WSREP_WARN("Ignoring error '%s' on %s event. Error_code: %d", + thd->get_stmt_da()->message(), + ev->get_type_str(), + error); + return 1; } - -char *wsrep_thd_query(THD *thd) +bool wsrep_provider_is_SR_capable() { - return (thd) ? thd->query() : NULL; + return Wsrep_server_state::has_capability(wsrep::provider::capability::streaming); } -extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd) +int wsrep_ordered_commit_if_no_binlog(THD* thd, bool all) { - return thd->wsrep_last_query_id; + if (((wsrep_thd_is_local(thd) && + (WSREP_EMULATE_BINLOG(thd) || !thd->variables.sql_log_bin)) || + (wsrep_thd_is_applying(thd) && !opt_log_slave_updates)) + && wsrep_thd_trx_seqno(thd) > 0) + { + wsrep_apply_error unused; + return wsrep_ordered_commit(thd, all, unused); + } + return 0; } - -extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id) +wsrep_status_t wsrep_tc_log_commit(THD* thd) { - thd->wsrep_last_query_id= id; -} - + int cookie; + my_xid xid= thd->transaction.xid_state.xid.get_my_xid(); -extern "C" void wsrep_thd_awake(THD *thd, my_bool signal) -{ - if (signal) + DBUG_ASSERT(thd->lex->sql_command == SQLCOM_LOAD); + if (wsrep_before_commit(thd, true)) { - thd->awake(KILL_QUERY); + WSREP_DEBUG("wsrep_tc_log_commit: wsrep_before_commit failed %llu", + thd->thread_id); + return WSREP_TRX_FAIL; } - else + cookie= tc_log->log_and_order(thd, xid, 1, false, true); + if (wsrep_after_commit(thd, true)) { - mysql_mutex_lock(&LOCK_wsrep_replaying); - mysql_cond_broadcast(&COND_wsrep_replaying); - mysql_mutex_unlock(&LOCK_wsrep_replaying); + WSREP_DEBUG("wsrep_tc_log_commit: wsrep_after_commit failed %llu", + thd->thread_id); + return WSREP_TRX_FAIL; + } + if (!cookie) + { + WSREP_DEBUG("log_and_order has failed %llu %d", thd->thread_id, cookie); + return WSREP_TRX_FAIL; + } + if (tc_log->unlog(cookie, xid)) + { + WSREP_DEBUG("log_and_order has failed %llu %d", thd->thread_id, cookie); + return WSREP_TRX_FAIL; } -} - - -int wsrep_thd_retry_counter(THD *thd) -{ - return(thd->wsrep_retry_counter); -} - - -extern "C" bool wsrep_thd_ignore_table(THD *thd) -{ - return thd->wsrep_ignore_table; -} - -extern int -wsrep_trx_order_before(THD *thd1, THD *thd2) -{ - if (wsrep_thd_trx_seqno(thd1) < wsrep_thd_trx_seqno(thd2)) { - WSREP_DEBUG("BF conflict, order: %lld %lld\n", - (long long)wsrep_thd_trx_seqno(thd1), - (long long)wsrep_thd_trx_seqno(thd2)); - return 1; + if (wsrep_after_statement(thd)) + { + return WSREP_TRX_FAIL; + } + /* Set wsrep transaction id if not set. */ + if (thd->wsrep_trx_id() == WSREP_UNDEFINED_TRX_ID) + { + if (thd->wsrep_next_trx_id() == WSREP_UNDEFINED_TRX_ID) + { + thd->set_wsrep_next_trx_id(thd->query_id); } - WSREP_DEBUG("waiting for BF, trx order: %lld %lld\n", - (long long)wsrep_thd_trx_seqno(thd1), - (long long)wsrep_thd_trx_seqno(thd2)); - return 0; + DBUG_ASSERT(thd->wsrep_next_trx_id() != WSREP_UNDEFINED_TRX_ID); + } + if (wsrep_start_transaction(thd, thd->wsrep_next_trx_id())) + { + return WSREP_TRX_FAIL; + } + DBUG_ASSERT(thd->wsrep_trx_id() != WSREP_UNDEFINED_TRX_ID); + return WSREP_OK; } - -int wsrep_trx_is_aborting(THD *thd_ptr) +int wsrep_thd_retry_counter(const THD *thd) { - if (thd_ptr) { - if ((((THD *)thd_ptr)->wsrep_conflict_state == MUST_ABORT) || - (((THD *)thd_ptr)->wsrep_conflict_state == ABORTING)) { - return 1; - } - } - return 0; + return thd->wsrep_retry_counter; } - -void wsrep_copy_query(THD *thd) +extern bool wsrep_thd_ignore_table(THD *thd) { - thd->wsrep_retry_command = thd->get_command(); - thd->wsrep_retry_query_len = thd->query_length(); - if (thd->wsrep_retry_query) { - my_free(thd->wsrep_retry_query); - } - thd->wsrep_retry_query = (char *)my_malloc( - thd->wsrep_retry_query_len + 1, MYF(0)); - strncpy(thd->wsrep_retry_query, thd->query(), thd->wsrep_retry_query_len); - thd->wsrep_retry_query[thd->wsrep_retry_query_len] = '\0'; + return thd->wsrep_ignore_table; } - bool wsrep_is_show_query(enum enum_sql_command command) { DBUG_ASSERT(command >= 0 && command <= SQLCOM_END); return (sql_command_flags[command] & CF_STATUS_COMMAND) != 0; } - bool wsrep_create_like_table(THD* thd, TABLE_LIST* table, TABLE_LIST* src_table, HA_CREATE_INFO *create_info) @@ -2753,8 +2665,7 @@ wsrep_error_label: #endif } - -static int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len) +int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len) { LEX *lex= thd->lex; String stmt_query; @@ -2809,88 +2720,165 @@ static int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len) buf, buf_len); } -/***** callbacks for wsrep service ************/ - -my_bool get_wsrep_debug() +void* start_wsrep_THD(void *arg) { - return wsrep_debug; -} + THD *thd; -my_bool get_wsrep_load_data_splitting() -{ - return wsrep_load_data_splitting; -} + Wsrep_thd_args* thd_args= (Wsrep_thd_args*) arg; -long get_wsrep_protocol_version() -{ - return wsrep_protocol_version; -} + if (my_thread_init() || (!(thd= new THD(next_thread_id(), true)))) + { + goto error; + } -my_bool get_wsrep_drupal_282555_workaround() -{ - return wsrep_drupal_282555_workaround; -} + mysql_mutex_lock(&LOCK_thread_count); -my_bool get_wsrep_recovery() -{ - return wsrep_recovery; -} + if (wsrep_gtid_mode) + { + /* Adjust domain_id. */ + thd->variables.gtid_domain_id= wsrep_gtid_domain_id; + } -my_bool get_wsrep_log_conflicts() -{ - return wsrep_log_conflicts; -} + thd->real_id=pthread_self(); // Keep purify happy + thread_created++; + threads.append(thd); -wsrep_t *get_wsrep() -{ - return wsrep; -} + my_net_init(&thd->net,(st_vio*) 0, thd, MYF(0)); -my_bool get_wsrep_certify_nonPK() -{ - return wsrep_certify_nonPK; -} + DBUG_PRINT("wsrep",(("creating thread %lld"), (long long)thd->thread_id)); + thd->prior_thr_create_utime= thd->start_utime= microsecond_interval_timer(); + (void) mysql_mutex_unlock(&LOCK_thread_count); -void wsrep_lock_rollback() -{ - mysql_mutex_lock(&LOCK_wsrep_rollback); -} + /* from bootstrap()... */ + thd->bootstrap=1; + thd->max_client_packet_length= thd->net.max_packet; + thd->security_ctx->master_access= ~(ulong)0; -void wsrep_unlock_rollback() -{ - mysql_cond_signal(&COND_wsrep_rollback); - mysql_mutex_unlock(&LOCK_wsrep_rollback); -} + /* from handle_one_connection... */ + pthread_detach_this_thread(); -my_bool wsrep_aborting_thd_contains(THD *thd) -{ - mysql_mutex_assert_owner(&LOCK_wsrep_rollback); - wsrep_aborting_thd_t abortees = wsrep_aborting_thd; - while (abortees) + mysql_thread_set_psi_id(thd->thread_id); + thd->thr_create_utime= microsecond_interval_timer(); + if (MYSQL_CALLBACK_ELSE(thread_scheduler, init_new_connection_thread, (), 0)) { - if (abortees->aborting_thd == thd) - return true; - abortees = abortees->next; + close_connection(thd, ER_OUT_OF_RESOURCES); + statistic_increment(aborted_connects,&LOCK_status); + MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0)); + goto error; } - return false; + +// </5.1.17> + /* + handle_one_connection() is normally the only way a thread would + start and would always be on the very high end of the stack , + therefore, the thread stack always starts at the address of the + first local variable of handle_one_connection, which is thd. We + need to know the start of the stack so that we could check for + stack overruns. + */ + DBUG_PRINT("wsrep", ("handle_one_connection called by thread %lld\n", + (long long)thd->thread_id)); + /* now that we've called my_thread_init(), it is safe to call DBUG_* */ + + thd->thread_stack= (char*) &thd; + if (thd->store_globals()) + { + close_connection(thd, ER_OUT_OF_RESOURCES); + statistic_increment(aborted_connects,&LOCK_status); + MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0)); + delete thd; + delete thd_args; + goto error; + } + + thd->system_thread= SYSTEM_THREAD_SLAVE_SQL; + thd->security_ctx->skip_grants(); + + /* handle_one_connection() again... */ + thd->proc_info= 0; + thd->set_command(COM_SLEEP); + thd->init_for_queries(); + mysql_mutex_lock(&LOCK_thread_count); + wsrep_running_threads++; + mysql_cond_broadcast(&COND_thread_count); + mysql_mutex_unlock(&LOCK_thread_count); + + WSREP_DEBUG("wsrep system thread %llu, %p starting", + thd->thread_id, thd); + thd_args->fun()(thd, thd_args->args()); + + WSREP_DEBUG("wsrep system thread: %llu, %p closing", + thd->thread_id, thd); + + /* Wsrep may reset globals during thread context switches, store globals + before cleanup. */ + thd->store_globals(); + + close_connection(thd, 0); + + delete thd_args; + + mysql_mutex_lock(&LOCK_thread_count); + wsrep_running_threads--; + WSREP_DEBUG("wsrep running threads now: %lu", wsrep_running_threads); + mysql_cond_broadcast(&COND_thread_count); + mysql_mutex_unlock(&LOCK_thread_count); + /* + Note: We can't call THD destructor without crashing + if plugins have not been initialized. However, in most of the + cases this means that pre SE initialization SST failed and + we are going to exit anyway. + */ + if (plugins_are_initialized) + { + net_end(&thd->net); + MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 1)); + } + else + { + /* + TODO: lightweight cleanup to get rid of: + 'Error in my_thread_global_end(): 2 threads didn't exit' + at server shutdown + */ + } + + unlink_not_visible_thd(thd); + delete thd; + my_thread_end(); + return(NULL); + +error: + WSREP_ERROR("Failed to create/initialize system thread"); + + /* Abort if its the first applier/rollbacker thread. */ + if (!mysqld_server_initialized) + unireg_abort(1); + else + return NULL; } -void wsrep_aborting_thd_enqueue(THD *thd) +enum wsrep::streaming_context::fragment_unit wsrep_fragment_unit(ulong unit) { - mysql_mutex_assert_owner(&LOCK_wsrep_rollback); - wsrep_aborting_thd_t aborting = (wsrep_aborting_thd_t) - my_malloc(sizeof(struct wsrep_aborting_thd), MYF(0)); - aborting->aborting_thd = thd; - aborting->next = wsrep_aborting_thd; - wsrep_aborting_thd = aborting; + switch (unit) + { + case WSREP_FRAG_BYTES: return wsrep::streaming_context::bytes; + case WSREP_FRAG_ROWS: return wsrep::streaming_context::row; + case WSREP_FRAG_STATEMENTS: return wsrep::streaming_context::statement; + default: + DBUG_ASSERT(0); + return wsrep::streaming_context::bytes; + } } -bool wsrep_node_is_donor() +/***** callbacks for wsrep service ************/ + +my_bool get_wsrep_recovery() { - return (WSREP_ON) ? (wsrep_config_state->get_status() == 2) : false; + return wsrep_recovery; } -bool wsrep_node_is_synced() +bool wsrep_consistency_check(THD *thd) { - return (WSREP_ON) ? (wsrep_config_state->get_status() == 4) : false; + return thd->wsrep_consistency_check == CONSISTENCY_CHECK_RUNNING; } |