diff options
author | Nirbhay Choubey <nirbhay@mariadb.com> | 2015-07-14 16:05:29 -0400 |
---|---|---|
committer | Nirbhay Choubey <nirbhay@mariadb.com> | 2015-07-14 16:05:29 -0400 |
commit | dced5146bdfc46e200ba35a86c3c55fb60972e33 (patch) | |
tree | 22e3dd18f4edec2a585341fee607765f96b2744f /sql | |
parent | 75931feabe99595e9659a423e299c4229d3c02ba (diff) | |
download | mariadb-git-dced5146bdfc46e200ba35a86c3c55fb60972e33.tar.gz |
Merge branch '10.0-galera' into 10.1
Diffstat (limited to 'sql')
-rw-r--r-- | sql/CMakeLists.txt | 1 | ||||
-rw-r--r-- | sql/handler.cc | 9 | ||||
-rw-r--r-- | sql/log.cc | 49 | ||||
-rw-r--r-- | sql/log_event.cc | 1 | ||||
-rw-r--r-- | sql/mdl.cc | 12 | ||||
-rw-r--r-- | sql/mysqld.cc | 16 | ||||
-rw-r--r-- | sql/sql_class.cc | 11 | ||||
-rw-r--r-- | sql/sql_class.h | 3 | ||||
-rw-r--r-- | sql/sql_insert.cc | 74 | ||||
-rw-r--r-- | sql/sql_parse.cc | 21 | ||||
-rw-r--r-- | sql/sql_plugin_services.ic | 1 | ||||
-rw-r--r-- | sql/sql_table.cc | 11 | ||||
-rw-r--r-- | sql/sys_vars.cc | 6 | ||||
-rw-r--r-- | sql/wsrep_applier.cc | 53 | ||||
-rw-r--r-- | sql/wsrep_applier.h | 3 | ||||
-rw-r--r-- | sql/wsrep_dummy.cc | 3 | ||||
-rw-r--r-- | sql/wsrep_hton.cc | 36 | ||||
-rw-r--r-- | sql/wsrep_mysqld.cc | 188 | ||||
-rw-r--r-- | sql/wsrep_mysqld.h | 20 | ||||
-rw-r--r-- | sql/wsrep_priv.h | 12 | ||||
-rw-r--r-- | sql/wsrep_sst.cc | 59 | ||||
-rw-r--r-- | sql/wsrep_thd.cc | 26 | ||||
-rw-r--r-- | sql/wsrep_utils.cc | 86 | ||||
-rw-r--r-- | sql/wsrep_var.cc | 39 | ||||
-rw-r--r-- | sql/wsrep_xid.cc | 147 | ||||
-rw-r--r-- | sql/wsrep_xid.h | 35 |
26 files changed, 630 insertions, 292 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index 169f260a91b..033a0e04ccf 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -28,6 +28,7 @@ IF(WITH_WSREP AND NOT EMBEDDED_LIBRARY) wsrep_binlog.cc wsrep_applier.cc wsrep_thd.cc + wsrep_xid.cc ) SET(WSREP_LIB wsrep) ELSE() diff --git a/sql/handler.cc b/sql/handler.cc index f85575fa208..ee433949d33 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -53,6 +53,7 @@ #include "wsrep_mysqld.h" #include "wsrep.h" +#include "wsrep_xid.h" /* While we have legacy_db_type, we have this array to @@ -1437,7 +1438,7 @@ int ha_commit_trans(THD *thd, bool all) if (!error && WSREP_ON && wsrep_is_wsrep_xid(&thd->transaction.xid_state.xid)) { // xid was rewritten by wsrep - xid= wsrep_xid_seqno(&thd->transaction.xid_state.xid); + xid= wsrep_xid_seqno(thd->transaction.xid_state.xid); } if (!is_real_trans) @@ -1830,8 +1831,8 @@ static my_bool xarecover_handlerton(THD *unused, plugin_ref plugin, for (int i=0; i < got; i ++) { my_xid x= WSREP_ON && wsrep_is_wsrep_xid(&info->list[i]) ? 
- wsrep_xid_seqno(&info->list[i]) : - info->list[i].get_my_xid(); + wsrep_xid_seqno(info->list[i]) : + info->list[i].get_my_xid(); if (!x) // not "mine" - that is generated by external TM { #ifndef DBUG_OFF @@ -6013,7 +6014,7 @@ int ha_abort_transaction(THD *bf_thd, THD *victim_thd, my_bool signal) { DBUG_ENTER("ha_abort_transaction"); if (!WSREP(bf_thd) && - !(wsrep_OSU_method_options == WSREP_OSU_RSU && + !(bf_thd->variables.wsrep_OSU_method == WSREP_OSU_RSU && bf_thd->wsrep_exec_mode == TOTAL_ORDER)) { DBUG_RETURN(0); } diff --git a/sql/log.cc b/sql/log.cc index 3010e5c22a3..8550a43a9ac 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -1659,15 +1659,36 @@ int binlog_init(void *p) return 0; } +#ifdef WITH_WSREP +#include "wsrep_binlog.h" +#endif /* WITH_WSREP */ static int binlog_close_connection(handlerton *hton, THD *thd) { + DBUG_ENTER("binlog_close_connection"); binlog_cache_mngr *const cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); +#ifdef WITH_WSREP + if (cache_mngr && !cache_mngr->trx_cache.empty()) { + IO_CACHE* cache= get_trans_log(thd); + uchar *buf; + size_t len=0; + wsrep_write_cache_buf(cache, &buf, &len); + WSREP_WARN("binlog trx cache not empty (%lu bytes) @ connection close %lu", + len, thd->thread_id); + if (len > 0) wsrep_dump_rbr_buf(thd, buf, len); + + cache = cache_mngr->get_binlog_cache_log(false); + wsrep_write_cache_buf(cache, &buf, &len); + WSREP_WARN("binlog stmt cache not empty (%lu bytes) @ connection close %lu", + len, thd->thread_id); + if (len > 0) wsrep_dump_rbr_buf(thd, buf, len); + } +#endif /* WITH_WSREP */ DBUG_ASSERT(cache_mngr->trx_cache.empty() && cache_mngr->stmt_cache.empty()); thd_set_ha_data(thd, binlog_hton, NULL); cache_mngr->~binlog_cache_mngr(); my_free(cache_mngr); - return 0; + DBUG_RETURN(0); } /* @@ -5900,10 +5921,20 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) binlog_cache_data *cache_data= 0; bool is_trans_cache= FALSE; bool using_trans= 
event_info->use_trans_cache(); - bool direct= event_info->use_direct_logging(); + bool direct; ulong UNINIT_VAR(prev_binlog_id); DBUG_ENTER("MYSQL_BIN_LOG::write(Log_event *)"); + /* + When binary logging is not enabled (--log-bin=0), wsrep-patch partially + enables it without opening the binlog file (MYSQL_BIN_LOG::open()). + So, avoid writing directly to binlog file. + */ + if (wsrep_emulate_bin_log) + direct= false; + else + direct= event_info->use_direct_logging(); + if (thd->variables.option_bits & OPTION_GTID_BEGIN) { DBUG_PRINT("info", ("OPTION_GTID_BEGIN was set")); @@ -6891,8 +6922,11 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, Ha_trx_info *ha_info; DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog"); - if (wsrep_emulate_bin_log) - DBUG_RETURN(0); + /* + Control should not be allowed beyond this point in wsrep_emulate_bin_log + mode. + */ + if (wsrep_emulate_bin_log) DBUG_RETURN(0); entry.thd= thd; entry.cache_mngr= cache_mngr; @@ -10096,7 +10130,14 @@ void thd_binlog_trx_reset(THD * thd) binlog_cache_mngr *const cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); if (cache_mngr) + { cache_mngr->reset(false, true); + if (!cache_mngr->stmt_cache.empty()) + { + WSREP_DEBUG("pending events in stmt cache, sql: %s", thd->query()); + cache_mngr->stmt_cache.reset(); + } + } } thd->clear_binlog_table_maps(); } diff --git a/sql/log_event.cc b/sql/log_event.cc index 1ba7dcade44..87fa145d730 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -12573,7 +12573,6 @@ void Incident_log_event::pack_info(THD *thd, Protocol *protocol) #if WITH_WSREP && !defined(MYSQL_CLIENT) -Format_description_log_event *wsrep_format_desc; // TODO: free them at the end /* read the first event from (*buf). The size of the (*buf) is (*buf_len). 
At the end (*buf) is shifted to point to the following event or NULL and diff --git a/sql/mdl.cc b/sql/mdl.cc index 383cc042432..ec380aa7432 100644 --- a/sql/mdl.cc +++ b/sql/mdl.cc @@ -1063,6 +1063,13 @@ MDL_wait::timed_wait(MDL_context_owner *owner, struct timespec *abs_timeout, while (!m_wait_status && !owner->is_killed() && wait_result != ETIMEDOUT && wait_result != ETIME) { +#ifdef WITH_WSREP + if (wsrep_thd_is_BF(owner->get_thd(), true)) + { + wait_result= mysql_cond_wait(&m_COND_wait_status, &m_LOCK_wait_status); + } + else +#endif wait_result= mysql_cond_timedwait(&m_COND_wait_status, &m_LOCK_wait_status, abs_timeout); } @@ -1148,12 +1155,15 @@ void MDL_lock::Ticket_list::add_ticket(MDL_ticket *ticket) WSREP_DEBUG("MDL add_ticket inserted before: %lu %s", thd_get_thread_id(waiting->get_ctx()->get_thd()), wsrep_thd_query(waiting->get_ctx()->get_thd())); + /* Insert the ticket before the first non-BF waiting thd. */ m_list.insert_after(prev, ticket); added= true; } prev= waiting; } - if (!added) m_list.push_back(ticket); + + /* Otherwise, insert the ticket at the back of the waiting list. */ + if (!added) m_list.push_back(ticket); while ((granted= itg++)) { diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 6bd8fdebf10..692af10052f 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -4994,8 +4994,6 @@ static int init_server_components() } #endif - DBUG_ASSERT(!opt_bin_log || opt_bin_logname); - if (opt_bin_log) { /* Reports an error and aborts, if the --log-bin's path @@ -5087,6 +5085,11 @@ static int init_server_components() { set_ports(); // this is also called in network_init() later but we need // to know mysqld_port now - lp:1071882 + /* + Plugin initialization (plugin_init()) hasn't happened yet, set + maria_hton to 0. 
+ */ + maria_hton= 0; wsrep_init_startup(true); } } @@ -5856,7 +5859,14 @@ int mysqld_main(int argc, char **argv) (char*) "" : mysqld_unix_port), mysqld_port, MYSQL_COMPILATION_COMMENT); - fclose(stdin); + + // try to keep fd=0 busy + if (!freopen(IF_WIN("NUL","/dev/null"), "r", stdin)) + { + // fall back on failure + fclose(stdin); + } + #if defined(_WIN32) && !defined(EMBEDDED_LIBRARY) Service.SetRunning(); #endif diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 8e4d35c4688..781a4fd92d6 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -1427,12 +1427,10 @@ void THD::init(void) wsrep_trx_meta.depends_on= WSREP_SEQNO_UNDEFINED; wsrep_converted_lock_session= false; wsrep_retry_counter= 0; - wsrep_rli= NULL; wsrep_rgi= NULL; wsrep_PA_safe= true; wsrep_consistency_check = NO_CONSISTENCY_CHECK; wsrep_mysql_replicated = 0; - wsrep_TOI_pre_query = NULL; wsrep_TOI_pre_query_len = 0; #endif @@ -1634,7 +1632,6 @@ THD::~THD() mysql_mutex_lock(&LOCK_wsrep_thd); mysql_mutex_unlock(&LOCK_wsrep_thd); mysql_mutex_destroy(&LOCK_wsrep_thd); - if (wsrep_rli) delete wsrep_rli; if (wsrep_rgi) delete wsrep_rgi; #endif /* Close connection */ @@ -6712,14 +6709,6 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, The MYSQL_LOG::write() function will set the STMT_END_F flag and flush the pending rows event if necessary. */ - /* - Even though wsrep only supports ROW binary log format, a user can set - binlog format to STATEMENT (wsrep_forced_binlog_format). In which case - the control might reach here even when binary logging (--log-bin) is - not enabled. This is possible because wsrep patch partially enables - binary logging by setting wsrep_emulate_binlog. 
- */ - if (mysql_bin_log.is_open()) { Query_log_event qinfo(this, query_arg, query_len, is_trans, direct, suppress_use, errcode); diff --git a/sql/sql_class.h b/sql/sql_class.h index d318061c90a..ae9113933d7 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -673,6 +673,7 @@ typedef struct system_variables my_bool wsrep_dirty_reads; uint wsrep_sync_wait; ulong wsrep_retry_autocommit; + ulong wsrep_OSU_method; double long_query_time_double, max_statement_time_double; my_bool pseudo_slave_mode; @@ -4383,6 +4384,8 @@ class select_insert :public select_result_interceptor { virtual int send_data(List<Item> &items); virtual void store_values(List<Item> &values); virtual bool can_rollback_data() { return 0; } + bool prepare_eof(); + bool send_ok_packet(); bool send_eof(); virtual void abort_result_set(); /* not implemented: select_insert is never re-used in prepared statements */ diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index fbdb53a4514..23043840f12 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -3691,14 +3691,14 @@ void select_insert::store_values(List<Item> &values) TRG_EVENT_INSERT); } -bool select_insert::send_eof() +bool select_insert::prepare_eof() { int error; bool const trans_table= table->file->has_transactions(); - ulonglong id, row_count; bool changed; killed_state killed_status= thd->killed; - DBUG_ENTER("select_insert::send_eof"); + + DBUG_ENTER("select_insert::prepare_eof"); DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'", trans_table, table->file->table_type())); @@ -3749,7 +3749,7 @@ bool select_insert::send_eof() trans_table, FALSE, FALSE, errcode)) { table->file->ha_release_auto_increment(); - DBUG_RETURN(1); + DBUG_RETURN(true); } } table->file->ha_release_auto_increment(); @@ -3757,31 +3757,49 @@ bool select_insert::send_eof() if (error) { table->file->print_error(error,MYF(0)); - DBUG_RETURN(1); + DBUG_RETURN(true); } - if (suppress_my_ok) - DBUG_RETURN(0); + DBUG_RETURN(false); +} + +bool 
select_insert::send_ok_packet() { + char message[160]; /* status message */ + ulong row_count; /* rows affected */ + ulong id; /* last insert-id */ + + DBUG_ENTER("select_insert::send_ok_packet"); - char buff[160]; if (info.ignore) - sprintf(buff, ER_THD(thd, ER_INSERT_INFO), (ulong) info.records, - (ulong) (info.records - info.copied), - (long) thd->get_stmt_da()->current_statement_warn_count()); + my_snprintf(message, sizeof(message), ER(ER_INSERT_INFO), + (ulong) info.records, (ulong) (info.records - info.copied), + (long) thd->get_stmt_da()->current_statement_warn_count()); else - sprintf(buff, ER_THD(thd, ER_INSERT_INFO), (ulong) info.records, - (ulong) (info.deleted+info.updated), - (long) thd->get_stmt_da()->current_statement_warn_count()); + my_snprintf(message, sizeof(message), ER(ER_INSERT_INFO), + (ulong) info.records, (ulong) (info.deleted + info.updated), + (long) thd->get_stmt_da()->current_statement_warn_count()); + row_count= info.copied + info.deleted + - ((thd->client_capabilities & CLIENT_FOUND_ROWS) ? - info.touched : info.updated); + ((thd->client_capabilities & CLIENT_FOUND_ROWS) ? + info.touched : info.updated); + id= (thd->first_successful_insert_id_in_cur_stmt > 0) ? thd->first_successful_insert_id_in_cur_stmt : (thd->arg_of_last_insert_id_function ? thd->first_successful_insert_id_in_prev_stmt : (info.copied ? 
autoinc_value_of_last_inserted_row : 0)); - ::my_ok(thd, row_count, id, buff); - DBUG_RETURN(0); + + ::my_ok(thd, row_count, id, message); + + DBUG_RETURN(false); +} + +bool select_insert::send_eof() +{ + bool res; + DBUG_ENTER("select_insert::send_eof"); + res= (prepare_eof() || send_ok_packet()); + DBUG_RETURN(res); } void select_insert::abort_result_set() { @@ -4256,13 +4274,13 @@ void select_create::store_values(List<Item> &values) bool select_create::send_eof() { - if (select_insert::send_eof()) + DBUG_ENTER("select_create::send_eof"); + if (prepare_eof()) { abort_result_set(); - return 1; + DBUG_RETURN(true); } - exit_done= 1; // Avoid double calls /* Do an implicit commit at end of statement for non-temporary tables. This can fail, but we should unlock the table @@ -4283,7 +4301,7 @@ bool select_create::send_eof() thd->thread_id, thd->wsrep_conflict_state, thd->query()); mysql_mutex_unlock(&thd->LOCK_wsrep_thd); abort_result_set(); - return TRUE; + DBUG_RETURN(true); } mysql_mutex_unlock(&thd->LOCK_wsrep_thd); } @@ -4292,9 +4310,17 @@ bool select_create::send_eof() else if (!thd->is_current_stmt_binlog_format_row()) table->s->table_creation_was_logged= 1; + /* + exit_done must only be set after last potential call to + abort_result_set(). + */ + exit_done= 1; // Avoid double calls + table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE); + send_ok_packet(); + if (m_plock) { MYSQL_LOCK *lock= *m_plock; @@ -4315,12 +4341,12 @@ bool select_create::send_eof() create_info-> pos_in_locked_tables, table, lock)) - return 0; // ok + DBUG_RETURN(false); // ok /* Fail. 
Continue without locking the table */ } mysql_unlock_tables(thd, lock); } - return 0; + DBUG_RETURN(false); } diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index d5cfb1e6eca..f1ae9287080 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -916,7 +916,11 @@ bool do_command(THD *thd) { bool return_value; char *packet= 0; +#ifdef WITH_WSREP + ulong packet_length= 0; // just to avoid (false positive) compiler warning +#else ulong packet_length; +#endif /* WITH_WSREP */ NET *net= &thd->net; enum enum_server_command command; DBUG_ENTER("do_command"); @@ -3386,6 +3390,7 @@ mysql_execute_command(THD *thd) /* select_create is currently not re-execution friendly and needs to be created for every execution of a PS/SP. + Note: In wsrep-patch, CTAS is handled like a regular transaction. */ if ((result= new (thd->mem_root) select_create(thd, create_table, &create_info, @@ -4721,6 +4726,7 @@ end_with_restore_list: case SQLCOM_REVOKE_ROLE: case SQLCOM_GRANT_ROLE: { + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL) if (!(res= mysql_grant_role(thd, lex->users_list, lex->sql_command != SQLCOM_GRANT_ROLE))) my_ok(thd); @@ -4753,6 +4759,21 @@ end_with_restore_list: break; } +#ifdef WITH_WSREP + if (lex->type & ( + REFRESH_GRANT | + REFRESH_HOSTS | + REFRESH_DES_KEY_FILE | +#ifdef HAVE_QUERY_CACHE + REFRESH_QUERY_CACHE_FREE | +#endif /* HAVE_QUERY_CACHE */ + REFRESH_STATUS | + REFRESH_USER_RESOURCES)) + { + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL) + } +#endif /* WITH_WSREP*/ + /* reload_acl_and_cache() will tell us if we are allowed to write to the binlog or not. 
diff --git a/sql/sql_plugin_services.ic b/sql/sql_plugin_services.ic index 4bb1861281e..0570e2fcdd9 100644 --- a/sql/sql_plugin_services.ic +++ b/sql/sql_plugin_services.ic @@ -105,6 +105,7 @@ static struct wsrep_service_st wsrep_handler = { get_wsrep_certify_nonPK, get_wsrep_debug, get_wsrep_drupal_282555_workaround, + get_wsrep_recovery, get_wsrep_load_data_splitting, get_wsrep_log_conflicts, get_wsrep_protocol_version, diff --git a/sql/sql_table.cc b/sql/sql_table.cc index d359727fa47..512c5a355c5 100644 --- a/sql/sql_table.cc +++ b/sql/sql_table.cc @@ -8276,6 +8276,17 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name, DEBUG_SYNC(thd, "alter_opened_table"); +#ifdef WITH_WSREP + DBUG_EXECUTE_IF("sync.alter_opened_table", + { + const char act[]= + "now " + "wait_for signal.alter_opened_table"; + DBUG_ASSERT(!debug_sync_set_action(thd, + STRING_WITH_LEN(act))); + };); +#endif // WITH_WSREP + if (error) DBUG_RETURN(true); diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index cda8b5cc327..2cc87a6913f 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -4823,8 +4823,10 @@ static Sys_var_uint Sys_wsrep_sync_wait( static const char *wsrep_OSU_method_names[]= { "TOI", "RSU", NullS }; static Sys_var_enum Sys_wsrep_OSU_method( "wsrep_OSU_method", "Method for Online Schema Upgrade", - GLOBAL_VAR(wsrep_OSU_method_options), CMD_LINE(OPT_ARG), - wsrep_OSU_method_names, DEFAULT(WSREP_OSU_TOI)); + SESSION_VAR(wsrep_OSU_method), CMD_LINE(OPT_ARG), + wsrep_OSU_method_names, DEFAULT(WSREP_OSU_TOI), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), + ON_UPDATE(0)); static PolyLock_mutex PLock_wsrep_desync(&LOCK_wsrep_desync); static Sys_var_mybool Sys_wsrep_desync ( diff --git a/sql/wsrep_applier.cc b/sql/wsrep_applier.cc index 789c6b19dd6..c1c6a90e614 100644 --- a/sql/wsrep_applier.cc +++ b/sql/wsrep_applier.cc @@ -1,4 +1,4 @@ -/* Copyright (C) 2013 Codership Oy <info@codership.com> +/* Copyright (C) 2013-2015 Codership Oy <info@codership.com> This program is free 
software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -14,9 +14,10 @@ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include "wsrep_priv.h" -#include "wsrep_binlog.h" +#include "wsrep_binlog.h" // wsrep_dump_rbr_buf() +#include "wsrep_xid.h" -#include "log_event.h" // EVENT_LEN_OFFSET, etc. +#include "log_event.h" // class THD, EVENT_LEN_OFFSET, etc. #include "wsrep_applier.h" /* @@ -74,13 +75,10 @@ void wsrep_set_apply_format(THD* thd, Format_description_log_event* ev) Format_description_log_event* wsrep_get_apply_format(THD* thd) { if (thd->wsrep_apply_format) - return (Format_description_log_event*) thd->wsrep_apply_format; - /* TODO: mariadb does not support rli->get_rli_description_event() - * => look for alternative way to remember last FDE in replication - */ - //return thd->wsrep_rli->get_rli_description_event(); - thd->wsrep_apply_format = new Format_description_log_event(4); - return (Format_description_log_event*) thd->wsrep_apply_format; + { + return (Format_description_log_event*) thd->wsrep_apply_format; + } + return thd->wsrep_rgi->rli->relay_log.description_event_for_exec; } static wsrep_cb_status_t wsrep_apply_events(THD* thd, @@ -90,6 +88,7 @@ static wsrep_cb_status_t wsrep_apply_events(THD* thd, char *buf= (char *)events_buf; int rcode= 0; int event= 1; + Log_event_type typ; DBUG_ENTER("wsrep_apply_events"); @@ -124,7 +123,9 @@ static wsrep_cb_status_t wsrep_apply_events(THD* thd, goto error; } - switch (ev->get_type_code()) { + typ= ev->get_type_code(); + + switch (typ) { case FORMAT_DESCRIPTION_EVENT: wsrep_set_apply_format(thd, (Format_description_log_event*)ev); continue; @@ -144,10 +145,11 @@ static wsrep_cb_status_t wsrep_apply_events(THD* thd, break; } - thd->set_server_id(ev->server_id); // use the original server id for logging - thd->set_time(); // time the query + /* Use the original server id for logging. 
*/ + thd->set_server_id(ev->server_id); + thd->set_time(); // time the query wsrep_xid_init(&thd->transaction.xid_state.xid, - &thd->wsrep_trx_meta.gtid.uuid, + thd->wsrep_trx_meta.gtid.uuid, thd->wsrep_trx_meta.gtid.seqno); thd->lex->current_select= 0; if (!ev->when) @@ -157,8 +159,11 @@ static wsrep_cb_status_t wsrep_apply_events(THD* thd, ev->when_sec_part= hrtime_sec_part(hrtime); } + thd->variables.option_bits= + (thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) | + (ev->flags & LOG_EVENT_SKIP_REPLICATION_F ? OPTION_SKIP_REPLICATION : 0); + ev->thd = thd; - //exec_res = ev->apply_event(thd->wsrep_rli); exec_res = ev->apply_event(thd->wsrep_rgi); DBUG_PRINT("info", ("exec_event result: %d", exec_res)); @@ -190,7 +195,7 @@ static wsrep_cb_status_t wsrep_apply_events(THD* thd, DBUG_RETURN(WSREP_CB_FAILURE); } - delete ev; + delete_or_keep_event_post_apply(thd->wsrep_rgi, typ, ev); } error: @@ -288,14 +293,6 @@ static wsrep_cb_status_t wsrep_commit(THD* const thd, wsrep_cb_status_t const rcode(trans_commit(thd) ? WSREP_CB_FAILURE : WSREP_CB_SUCCESS); -#ifdef WSREP_PROC_INFO - snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "committed %lld", (long long)wsrep_thd_trx_seqno(thd)); - thd_proc_info(thd, thd->wsrep_info); -#else - thd_proc_info(thd, "committed"); -#endif /* WSREP_PROC_INFO */ - if (WSREP_CB_SUCCESS == rcode) { thd->wsrep_rgi->cleanup_context(thd, false); @@ -305,6 +302,14 @@ static wsrep_cb_status_t wsrep_commit(THD* const thd, // TODO: mark snapshot with global_seqno. 
} +#ifdef WSREP_PROC_INFO + snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, + "committed %lld", (long long) wsrep_thd_trx_seqno(thd)); + thd_proc_info(thd, thd->wsrep_info); +#else + thd_proc_info(thd, "committed"); +#endif /* WSREP_PROC_INFO */ + return rcode; } diff --git a/sql/wsrep_applier.h b/sql/wsrep_applier.h index c3892e2dac4..b6497776e87 100644 --- a/sql/wsrep_applier.h +++ b/sql/wsrep_applier.h @@ -16,7 +16,8 @@ #ifndef WSREP_APPLIER_H #define WSREP_APPLIER_H -#include <sys/types.h> +#include <my_config.h> +#include "../wsrep/wsrep_api.h" void wsrep_set_apply_format(THD* thd, Format_description_log_event* ev); Format_description_log_event* wsrep_get_apply_format(THD* thd); diff --git a/sql/wsrep_dummy.cc b/sql/wsrep_dummy.cc index 1a6f5128e9e..5cbf6821e7c 100644 --- a/sql/wsrep_dummy.cc +++ b/sql/wsrep_dummy.cc @@ -47,6 +47,9 @@ my_bool get_wsrep_drupal_282555_workaround() my_bool get_wsrep_load_data_splitting() { return 0; } +my_bool get_wsrep_recovery() +{ return 0; } + my_bool get_wsrep_log_conflicts() { return 0; } diff --git a/sql/wsrep_hton.cc b/sql/wsrep_hton.cc index 8ef51548e79..f45ba5e5a39 100644 --- a/sql/wsrep_hton.cc +++ b/sql/wsrep_hton.cc @@ -1,4 +1,4 @@ -/* Copyright 2008 Codership Oy <http://www.codership.com> +/* Copyright 2008-2015 Codership Oy <http://www.codership.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -19,6 +19,7 @@ #include <sql_class.h> #include "wsrep_mysqld.h" #include "wsrep_binlog.h" +#include "wsrep_xid.h" #include <cstdio> #include <cstdlib> @@ -96,17 +97,40 @@ void wsrep_register_hton(THD* thd, bool all) */ void wsrep_post_commit(THD* thd, bool all) { - if (thd->wsrep_exec_mode == LOCAL_COMMIT) + /* + TODO: It can perhaps be fixed in a more elegant fashion by turning off + wsrep_emulate_binlog if wsrep_on=0 on server start. 
+ https://github.com/codership/mysql-wsrep/issues/112 + */ + if (!WSREP_ON) + return; + + switch (thd->wsrep_exec_mode) { - DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno != WSREP_SEQNO_UNDEFINED); - if (wsrep->post_commit(wsrep, &thd->wsrep_ws_handle)) + case LOCAL_COMMIT: { + DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno != WSREP_SEQNO_UNDEFINED); + if (wsrep->post_commit(wsrep, &thd->wsrep_ws_handle)) + { DBUG_PRINT("wsrep", ("set committed fail")); WSREP_WARN("set committed fail: %llu %d", (long long)thd->real_id, thd->get_stmt_da()->status()); + } + wsrep_cleanup_transaction(thd); + break; } - wsrep_cleanup_transaction(thd); + case LOCAL_STATE: + { + /* + Non-InnoDB statements may have populated events in stmt cache => cleanup + */ + WSREP_DEBUG("cleanup transaction for LOCAL_STATE: %s", thd->query()); + wsrep_cleanup_transaction(thd); + break; + } + default: break; } + } /* @@ -489,7 +513,7 @@ wsrep_run_wsrep_commit(THD *thd, bool all) if (thd->transaction.xid_state.xid.get_my_xid()) { wsrep_xid_init(&thd->transaction.xid_state.xid, - &thd->wsrep_trx_meta.gtid.uuid, + thd->wsrep_trx_meta.gtid.uuid, thd->wsrep_trx_meta.gtid.seqno); } DBUG_PRINT("wsrep", ("replicating commit success")); diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index 10a138a1519..9f725977d06 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -1,4 +1,4 @@ -/* Copyright 2008-2014 Codership Oy <http://www.codership.com> +/* Copyright 2008-2015 Codership Oy <http://www.codership.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -31,6 +31,7 @@ #include "wsrep_var.h" #include "wsrep_binlog.h" #include "wsrep_applier.h" +#include "wsrep_xid.h" #include <cstdio> #include <cstdlib> #include "log_event.h" @@ -248,62 +249,22 @@ static void wsrep_log_states (wsrep_log_level_t const level, wsrep_log_cb (level, msg); } -static my_bool set_SE_checkpoint(THD* unused, plugin_ref plugin, void* arg) -{ 
- XID* xid= reinterpret_cast<XID*>(arg); - handlerton* hton= plugin_data(plugin, handlerton *); - if (hton->set_checkpoint) - { - const wsrep_uuid_t* uuid(wsrep_xid_uuid(xid)); - char uuid_str[40] = {0, }; - wsrep_uuid_print(uuid, uuid_str, sizeof(uuid_str)); - WSREP_DEBUG("Set WSREPXid for InnoDB: %s:%lld", - uuid_str, (long long)wsrep_xid_seqno(xid)); - hton->set_checkpoint(hton, xid); - } - return FALSE; -} - -void wsrep_set_SE_checkpoint(XID* xid) -{ - plugin_foreach(NULL, set_SE_checkpoint, MYSQL_STORAGE_ENGINE_PLUGIN, xid); -} - -static my_bool get_SE_checkpoint(THD* unused, plugin_ref plugin, void* arg) -{ - XID* xid= reinterpret_cast<XID*>(arg); - handlerton* hton= plugin_data(plugin, handlerton *); - if (hton->get_checkpoint) - { - hton->get_checkpoint(hton, xid); - const wsrep_uuid_t* uuid(wsrep_xid_uuid(xid)); - char uuid_str[40] = {0, }; - wsrep_uuid_print(uuid, uuid_str, sizeof(uuid_str)); - WSREP_DEBUG("Read WSREPXid from InnoDB: %s:%lld", - uuid_str, (long long)wsrep_xid_seqno(xid)); - } - return FALSE; -} - -void wsrep_get_SE_checkpoint(XID* xid) -{ - plugin_foreach(NULL, get_SE_checkpoint, MYSQL_STORAGE_ENGINE_PLUGIN, xid); -} - #ifdef GTID_SUPPORT -void wsrep_init_sidno(const wsrep_uuid_t& uuid) +void wsrep_init_sidno(const wsrep_uuid_t& wsrep_uuid) { /* generate new Sid map entry from inverted uuid */ rpl_sid sid; wsrep_uuid_t ltid_uuid; + for (size_t i= 0; i < sizeof(ltid_uuid.data); ++i) { - ltid_uuid.data[i] = ~local_uuid.data[i]; + ltid_uuid.data[i] = ~wsrep_uuid.data[i]; } + sid.copy_from(ltid_uuid.data); global_sid_lock->wrlock(); wsrep_sidno= global_sid_map->add_sid(sid); - WSREP_INFO("inited wsrep sidno %d", wsrep_sidno); + WSREP_INFO("Initialized wsrep sidno %d", wsrep_sidno); global_sid_lock->unlock(); } #endif /* GTID_SUPPORT */ @@ -444,13 +405,11 @@ wsrep_view_handler_cb (void* app_ctx, local_seqno= view->state_id.seqno; } /* Init storage engine XIDs from first view */ - XID xid; - wsrep_xid_init(&xid, &local_uuid, local_seqno); - 
wsrep_set_SE_checkpoint(&xid); - memb_status= WSREP_MEMBER_JOINED; + wsrep_set_SE_checkpoint(local_uuid, local_seqno); #ifdef GTID_SUPPORT wsrep_init_sidno(local_uuid); #endif /* GTID_SUPPORT */ + memb_status= WSREP_MEMBER_JOINED; } // just some sanity check @@ -560,38 +519,28 @@ static void wsrep_synced_cb(void* app_ctx) static void wsrep_init_position() { /* read XIDs from storage engines */ - XID xid; - memset(&xid, 0, sizeof(xid)); - xid.formatID= -1; - wsrep_get_SE_checkpoint(&xid); + wsrep_uuid_t uuid; + wsrep_seqno_t seqno; + wsrep_get_SE_checkpoint(uuid, seqno); - if (xid.formatID == -1) + if (!memcmp(&uuid, &WSREP_UUID_UNDEFINED, sizeof(wsrep_uuid_t))) { WSREP_INFO("Read nil XID from storage engines, skipping position init"); return; } - else if (!wsrep_is_wsrep_xid(&xid)) - { - WSREP_WARN("Read non-wsrep XID from storage engines, skipping position init"); - return; - } - - const wsrep_uuid_t* uuid= wsrep_xid_uuid(&xid); - const wsrep_seqno_t seqno= wsrep_xid_seqno(&xid); char uuid_str[40] = {0, }; - wsrep_uuid_print(uuid, uuid_str, sizeof(uuid_str)); + wsrep_uuid_print(&uuid, uuid_str, sizeof(uuid_str)); WSREP_INFO("Initial position: %s:%lld", uuid_str, (long long)seqno); - if (!memcmp(&local_uuid, &WSREP_UUID_UNDEFINED, sizeof(local_uuid)) && local_seqno == WSREP_SEQNO_UNDEFINED) { // Initial state - local_uuid= *uuid; + local_uuid= uuid; local_seqno= seqno; } - else if (memcmp(&local_uuid, uuid, sizeof(local_uuid)) || + else if (memcmp(&local_uuid, &uuid, sizeof(local_uuid)) || local_seqno != seqno) { WSREP_WARN("Initial position was provided by configuration or SST, " @@ -861,6 +810,7 @@ void wsrep_deinit(bool free_options) provider_name[0]= '\0'; provider_version[0]= '\0'; provider_vendor[0]= '\0'; + wsrep_inited= 0; if (free_options) @@ -899,13 +849,11 @@ void wsrep_recover() uuid_str, (long long)local_seqno); return; } - XID xid; - memset(&xid, 0, sizeof(xid)); - xid.formatID= -1; - wsrep_get_SE_checkpoint(&xid); - 
wsrep_uuid_print(wsrep_xid_uuid(&xid), uuid_str, sizeof(uuid_str)); - WSREP_INFO("Recovered position: %s:%lld", uuid_str, - (long long)wsrep_xid_seqno(&xid)); + wsrep_uuid_t uuid; + wsrep_seqno_t seqno; + wsrep_get_SE_checkpoint(uuid, seqno); + wsrep_uuid_print(&uuid, uuid_str, sizeof(uuid_str)); + WSREP_INFO("Recovered position: %s:%lld", uuid_str, (long long)seqno); } @@ -1271,6 +1219,11 @@ int wsrep_to_buf_helper( return 1; int ret(0); + Format_description_log_event *tmp_fd= new Format_description_log_event(4); + tmp_fd->checksum_alg= binlog_checksum_options; + tmp_fd->write(&tmp_io_cache); + delete tmp_fd; + #ifdef GTID_SUPPORT if (thd->variables.gtid_next.type == GTID_GROUP) { @@ -1400,6 +1353,12 @@ create_view_query(THD *thd, uchar** buf, size_t* buf_len) return wsrep_to_buf_helper(thd, buff.ptr(), buff.length(), buf, buf_len); } +/* + returns: + 0: statement was replicated as TOI + 1: TOI replication was skipped + -1: TOI replication failed + */ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_, const TABLE_LIST* table_list) { @@ -1436,31 +1395,39 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_, wsrep_key_arr_t key_arr= {0, 0}; struct wsrep_buf buff = { buf, buf_len }; - if (!buf_err && + if (!buf_err && wsrep_prepare_keys_for_isolation(thd, db_, table_, table_list, &key_arr)&& + key_arr.keys_len > 0 && WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id, - key_arr.keys, key_arr.keys_len, - &buff, 1, - &thd->wsrep_trx_meta))) + key_arr.keys, key_arr.keys_len, + &buff, 1, + &thd->wsrep_trx_meta))) { thd->wsrep_exec_mode= TOTAL_ORDER; wsrep_to_isolation++; - my_free(buf); + if (buf) my_free(buf); wsrep_keys_free(&key_arr); WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)wsrep_thd_trx_seqno(thd), - thd->wsrep_exec_mode); + thd->wsrep_exec_mode); } - else { + else if (key_arr.keys_len > 0) { /* jump to error handler in mysql_execute_command() */ WSREP_WARN("TO isolation failed for: %d, sql: %s. 
Check wsrep " "connection state and retry the query.", ret, (thd->query()) ? thd->query() : "void"); my_error(ER_LOCK_DEADLOCK, MYF(0), "WSREP replication failed. Check " - "your wsrep connection state and retry the query."); + "your wsrep connection state and retry the query."); if (buf) my_free(buf); wsrep_keys_free(&key_arr); return -1; } + else { + /* non replicated DDL, affecting temporary tables only */ + WSREP_DEBUG("TO isolation skipped for: %d, sql: %s." + "Only temporary tables affected.", + ret, (thd->query()) ? thd->query() : "void"); + return 1; + } return 0; } @@ -1470,11 +1437,9 @@ static void wsrep_TOI_end(THD *thd) { WSREP_DEBUG("TO END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd), thd->wsrep_exec_mode, (thd->query()) ? thd->query() : "void"); - - XID xid; - wsrep_xid_init(&xid, &thd->wsrep_trx_meta.gtid.uuid, - thd->wsrep_trx_meta.gtid.seqno); - wsrep_set_SE_checkpoint(&xid); + + wsrep_set_SE_checkpoint(thd->wsrep_trx_meta.gtid.uuid, + thd->wsrep_trx_meta.gtid.seqno); WSREP_DEBUG("TO END: %lld, update seqno", (long long)wsrep_thd_trx_seqno(thd)); @@ -1610,14 +1575,25 @@ int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_, if (thd->variables.wsrep_on && thd->wsrep_exec_mode==LOCAL_STATE) { - switch (wsrep_OSU_method_options) { + switch (thd->variables.wsrep_OSU_method) { case WSREP_OSU_TOI: ret = wsrep_TOI_begin(thd, db_, table_, table_list); break; case WSREP_OSU_RSU: ret = wsrep_RSU_begin(thd, db_, table_); break; + default: + WSREP_ERROR("Unsupported OSU method: %lu", + thd->variables.wsrep_OSU_method); + ret= -1; + break; } - if (!ret) - { - thd->wsrep_exec_mode= TOTAL_ORDER; + switch (ret) { + case 0: thd->wsrep_exec_mode= TOTAL_ORDER; break; + case 1: + /* TOI replication skipped, treat as success */ + ret = 0; + break; + case -1: + /* TOI replication failed, treat as error */ + break; } } return ret; @@ -1627,10 +1603,14 @@ void wsrep_to_isolation_end(THD *thd) { if (thd->wsrep_exec_mode == TOTAL_ORDER) { - 
switch(wsrep_OSU_method_options) + switch(thd->variables.wsrep_OSU_method) { case WSREP_OSU_TOI: wsrep_TOI_end(thd); break; case WSREP_OSU_RSU: wsrep_RSU_end(thd); break; + default: + WSREP_WARN("Unsupported wsrep OSU method at isolation end: %lu", + thd->variables.wsrep_OSU_method); + break; } wsrep_cleanup_transaction(thd); } @@ -1649,10 +1629,21 @@ void wsrep_to_isolation_end(THD *thd) gra->wsrep_exec_mode, gra->wsrep_query_state, gra->wsrep_conflict_state, \ gra->get_command(), gra->lex->sql_command, gra->query()); +/** + Check if request for the metadata lock should be granted to the requester. + + @param requestor_ctx The MDL context of the requestor + @param ticket MDL ticket for the requested lock + + @retval TRUE Lock request can be granted + @retval FALSE Lock request cannot be granted +*/ + bool wsrep_grant_mdl_exception(MDL_context *requestor_ctx, MDL_ticket *ticket ) { + /* Fallback to the non-wsrep behaviour */ if (!WSREP_ON) return FALSE; THD *request_thd = requestor_ctx->get_thd(); @@ -2474,9 +2465,13 @@ int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len) if (lex->definer) { /* SUID trigger. 
*/ + LEX_USER *d= get_current_user(thd, lex->definer); + + if (!d) + return 1; - definer_user= lex->definer->user; - definer_host= lex->definer->host; + definer_user= d->user; + definer_host= d->host; } else { @@ -2527,6 +2522,11 @@ my_bool get_wsrep_drupal_282555_workaround() return wsrep_drupal_282555_workaround; } +my_bool get_wsrep_recovery() +{ + return wsrep_recovery; +} + my_bool get_wsrep_log_conflicts() { return wsrep_log_conflicts; diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index f68b89e7994..6d0fcd6e058 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -1,4 +1,4 @@ -/* Copyright 2008-2013 Codership Oy <http://www.codership.com> +/* Copyright 2008-2015 Codership Oy <http://www.codership.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -77,9 +77,7 @@ extern ulong wsrep_max_ws_rows; extern const char* wsrep_notify_cmd; extern long wsrep_max_protocol_version; extern ulong wsrep_forced_binlog_format; -extern ulong wsrep_OSU_method_options; extern my_bool wsrep_desync; -extern my_bool wsrep_recovery; extern my_bool wsrep_replicate_myisam; extern ulong wsrep_mysql_replication_bundle; extern my_bool wsrep_restart_slave; @@ -91,7 +89,12 @@ extern bool wsrep_new_cluster; extern bool wsrep_gtid_mode; extern uint32 wsrep_gtid_domain_id; -enum enum_wsrep_OSU_method { WSREP_OSU_TOI, WSREP_OSU_RSU }; +enum enum_wsrep_OSU_method { + WSREP_OSU_TOI, + WSREP_OSU_RSU, + WSREP_OSU_NONE, +}; + enum enum_wsrep_sync_wait { WSREP_SYNC_WAIT_NONE = 0x0, // show, select, begin @@ -283,14 +286,6 @@ int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len); int wsrep_create_event_query(THD *thd, uchar** buf, size_t* buf_len); int wsrep_alter_event_query(THD *thd, uchar** buf, size_t* buf_len); -struct xid_t; -void wsrep_get_SE_checkpoint(xid_t*); -void wsrep_set_SE_checkpoint(xid_t*); -void wsrep_init_sidno(const wsrep_uuid_t&); -void 
wsrep_xid_init(xid_t*, const wsrep_uuid_t*, wsrep_seqno_t); -const wsrep_uuid_t* wsrep_xid_uuid(const xid_t*); -wsrep_seqno_t wsrep_xid_seqno(const xid_t*); - extern bool wsrep_grant_mdl_exception(MDL_context *requestor_ctx, MDL_ticket *ticket); @@ -329,7 +324,6 @@ int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len); #define wsrep_emulate_bin_log (0) #define wsrep_xid_seqno(X) (0) #define wsrep_to_isolation (0) -#define wsrep_recovery (0) #define wsrep_init() (1) #define wsrep_prepend_PATH(X) #define wsrep_before_SE() (0) diff --git a/sql/wsrep_priv.h b/sql/wsrep_priv.h index 5c66587d757..30dce78c1a4 100644 --- a/sql/wsrep_priv.h +++ b/sql/wsrep_priv.h @@ -32,22 +32,20 @@ ssize_t wsrep_sst_prepare (void** msg); wsrep_cb_status wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx, const void* msg, size_t msg_len, - const wsrep_gtid_t* current_id, + const wsrep_gtid_t* state_id, const char* state, size_t state_len, bool bypass); -extern unsigned int wsrep_check_ip (const char* addr); -extern size_t wsrep_guess_ip (char* buf, size_t buf_len); -extern size_t wsrep_guess_address(char* buf, size_t buf_len); extern wsrep_uuid_t local_uuid; extern wsrep_seqno_t local_seqno; // a helper function -extern void wsrep_sst_received(wsrep_t*, const wsrep_uuid_t*, wsrep_seqno_t, - const void*, size_t); +void wsrep_sst_received(wsrep_t*, const wsrep_uuid_t&, wsrep_seqno_t, + const void*, size_t); /*! 
SST thread signals init thread about sst completion */ -extern void wsrep_sst_complete(const wsrep_uuid_t*, wsrep_seqno_t, bool); +void wsrep_sst_complete(const wsrep_uuid_t*, wsrep_seqno_t, bool); void wsrep_notify_status (wsrep_member_status_t new_status, const wsrep_view_info_t* view = 0); + #endif /* WSREP_PRIV_H */ diff --git a/sql/wsrep_sst.cc b/sql/wsrep_sst.cc index d37c6c0e96c..7e6bc2123dc 100644 --- a/sql/wsrep_sst.cc +++ b/sql/wsrep_sst.cc @@ -1,4 +1,4 @@ -/* Copyright 2008-2012 Codership Oy <http://www.codership.com> +/* Copyright 2008-2015 Codership Oy <http://www.codership.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -26,6 +26,7 @@ #include <sql_parse.h> #include "wsrep_priv.h" #include "wsrep_utils.h" +#include "wsrep_xid.h" #include <cstdio> #include <cstdlib> @@ -264,19 +265,41 @@ void wsrep_sst_complete (const wsrep_uuid_t* sst_uuid, } void wsrep_sst_received (wsrep_t* const wsrep, - const wsrep_uuid_t* const uuid, + const wsrep_uuid_t& uuid, wsrep_seqno_t const seqno, const void* const state, size_t const state_len) { - int const rcode(seqno < 0 ? seqno : 0); - wsrep_gtid_t const state_id = { - *uuid, (rcode ? WSREP_SEQNO_UNDEFINED : seqno) - }; + wsrep_get_SE_checkpoint(local_uuid, local_seqno); + + if (memcmp(&local_uuid, &uuid, sizeof(wsrep_uuid_t)) || + local_seqno < seqno || seqno < 0) + { + wsrep_set_SE_checkpoint(uuid, seqno); + local_uuid = uuid; + local_seqno = seqno; + } + else if (local_seqno > seqno) + { + WSREP_WARN("SST postion is in the past: %lld, current: %lld. " + "Can't continue.", + (long long)seqno, (long long)local_seqno); + unireg_abort(1); + } + #ifdef GTID_SUPPORT - wsrep_init_sidno(state_id.uuid); + wsrep_init_sidno(uuid); #endif /* GTID_SUPPORT */ - wsrep->sst_received(wsrep, &state_id, state, state_len, rcode); + + if (wsrep) + { + int const rcode(seqno < 0 ? 
seqno : 0); + wsrep_gtid_t const state_id = { + uuid, (rcode ? WSREP_SEQNO_UNDEFINED : seqno) + }; + + wsrep->sst_received(wsrep, &state_id, state, state_len, rcode); + } } // Let applier threads to continue @@ -285,7 +308,7 @@ void wsrep_sst_continue () if (sst_needed) { WSREP_INFO("Signalling provider to continue."); - wsrep_sst_received (wsrep, &local_uuid, local_seqno, NULL, 0); + wsrep_sst_received (wsrep, local_uuid, local_seqno, NULL, 0); } } @@ -781,7 +804,7 @@ static int sst_donate_mysqldump (const char* addr, host_len = strlen (addr) + 1; } - char *host=(char*)alloca(host_len); + char *host= (char *) alloca(host_len); strncpy (host, addr, host_len - 1); host[host_len - 1] = '\0'; @@ -801,7 +824,7 @@ static int sst_donate_mysqldump (const char* addr, user_len = (auth) ? strlen (auth) + 1 : 1; } - char *user=(char*)alloca(user_len); + char *user= (char *) alloca(user_len); strncpy (user, (auth) ? auth : "", user_len - 1); user[user_len - 1] = '\0'; @@ -912,11 +935,13 @@ static int sst_flush_tables(THD* thd) { WSREP_INFO("Tables flushed."); const char base_name[]= "tables_flushed"; + ssize_t const full_len= strlen(mysql_real_data_home) + strlen(base_name)+2; - char *real_name=(char*)alloca(full_len); - sprintf(real_name, "%s/%s", mysql_real_data_home, base_name); - char *tmp_name=(char*)alloca(full_len + 4); - sprintf(tmp_name, "%s.tmp", real_name); + char *real_name= (char *) alloca(full_len); + snprintf(real_name, (size_t) full_len, "%s/%s", mysql_real_data_home, + base_name); + char *tmp_name= (char *) alloca(full_len + 4); + snprintf(tmp_name, (size_t) full_len + 4, "%s.tmp", real_name); FILE* file= fopen(tmp_name, "w+"); if (0 == file) @@ -1078,7 +1103,7 @@ static int sst_donate_other (const char* method, wsrep_seqno_t seqno, bool bypass) { - char cmd_str[4096]; + char cmd_str[4096]; const char* binlog_opt= ""; char* binlog_opt_val= NULL; @@ -1111,7 +1136,7 @@ static int sst_donate_other (const char* method, bypass ? 
" "WSREP_SST_OPT_BYPASS : ""); my_free(binlog_opt_val); - if (ret < 0 || ret >= (int)sizeof(cmd_str)) + if (ret < 0 || ret >= (int) sizeof(cmd_str)) { WSREP_ERROR("sst_donate_other(): snprintf() failed: %d", ret); return (ret < 0 ? ret : -EMSGSIZE); diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc index 2baee3a252d..9e608f94848 100644 --- a/sql/wsrep_thd.cc +++ b/sql/wsrep_thd.cc @@ -122,10 +122,13 @@ static rpl_group_info* wsrep_relay_group_init(const char* log_fname) */ rli->mi = new Master_info(&connection_name, false); - rli->sql_driver_thd= current_thd; - struct rpl_group_info *rgi= new rpl_group_info(rli); - rgi->thd= current_thd; + rgi->thd= rli->sql_driver_thd= current_thd; + + if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on())) + { + rgi->deferred_events= new Deferred_log_events(rli); + } return rgi; } @@ -173,6 +176,8 @@ static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow) delete thd->system_thread_info.rpl_sql_info; delete thd->wsrep_rgi->rli->mi; delete thd->wsrep_rgi->rli; + + thd->wsrep_rgi->cleanup_after_session(); delete thd->wsrep_rgi; thd->wsrep_rgi = NULL; } @@ -567,10 +572,21 @@ int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal) DBUG_ENTER("wsrep_abort_thd"); if ( (WSREP(bf_thd) || - ( (WSREP_ON || wsrep_OSU_method_options == WSREP_OSU_RSU) && - bf_thd->wsrep_exec_mode == TOTAL_ORDER) ) && + ( (WSREP_ON || bf_thd->variables.wsrep_OSU_method == WSREP_OSU_RSU) && + bf_thd->wsrep_exec_mode == TOTAL_ORDER) ) && victim_thd) { + if ((victim_thd->wsrep_conflict_state == MUST_ABORT) || + (victim_thd->wsrep_conflict_state == ABORTED) || + (victim_thd->wsrep_conflict_state == ABORTING)) + { + WSREP_DEBUG("wsrep_abort_thd called by %llu with victim %llu already " + "aborted. Ignoring.", + (bf_thd) ? (long long)bf_thd->real_id : 0, + (long long)victim_thd->real_id); + DBUG_RETURN(1); + } + WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ? 
(long long)bf_thd->real_id : 0, (long long)victim_thd->real_id); ha_abort_transaction(bf_thd, victim_thd, signal); diff --git a/sql/wsrep_utils.cc b/sql/wsrep_utils.cc index 7172f77bfed..7a87d38d430 100644 --- a/sql/wsrep_utils.cc +++ b/sql/wsrep_utils.cc @@ -1,4 +1,4 @@ -/* Copyright 2010 Codership Oy <http://www.codership.com> +/* Copyright 2010-2015 Codership Oy <http://www.codership.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -150,7 +150,35 @@ process::process (const char* cmd, const char* type) goto cleanup_pipe; } - err_ = posix_spawnattr_setflags (&attr, POSIX_SPAWN_SETSIGDEF | + /* make sure that no signlas are masked in child process */ + sigset_t sigmask_empty; sigemptyset(&sigmask_empty); + err_ = posix_spawnattr_setsigmask(&attr, &sigmask_empty); + if (err_) + { + WSREP_ERROR ("posix_spawnattr_setsigmask() failed: %d (%s)", + err_, strerror(err_)); + goto cleanup_attr; + } + + /* make sure the following signals are not ignored in child process */ + sigset_t default_signals; sigemptyset(&default_signals); + sigaddset(&default_signals, SIGHUP); + sigaddset(&default_signals, SIGINT); + sigaddset(&default_signals, SIGQUIT); + sigaddset(&default_signals, SIGPIPE); + sigaddset(&default_signals, SIGTERM); + sigaddset(&default_signals, SIGCHLD); + err_ = posix_spawnattr_setsigdefault(&attr, &default_signals); + if (err_) + { + WSREP_ERROR ("posix_spawnattr_setsigdefault() failed: %d (%s)", + err_, strerror(err_)); + goto cleanup_attr; + } + + err_ = posix_spawnattr_setflags (&attr, POSIX_SPAWN_SETSIGDEF | + POSIX_SPAWN_SETSIGMASK | + /* start a new process group */ POSIX_SPAWN_SETPGROUP | POSIX_SPAWN_USEVFORK); if (err_) { @@ -439,57 +467,3 @@ size_t wsrep_guess_ip (char* buf, size_t buf_len) return 0; } - -/* - * WSREPXid - */ - -#define WSREP_XID_PREFIX "WSREPXid" -#define WSREP_XID_PREFIX_LEN MYSQL_XID_PREFIX_LEN -#define WSREP_XID_UUID_OFFSET 8 -#define 
WSREP_XID_SEQNO_OFFSET (WSREP_XID_UUID_OFFSET + sizeof(wsrep_uuid_t)) -#define WSREP_XID_GTRID_LEN (WSREP_XID_SEQNO_OFFSET + sizeof(wsrep_seqno_t)) - -void wsrep_xid_init(XID* xid, const wsrep_uuid_t* uuid, wsrep_seqno_t seqno) -{ - xid->formatID= 1; - xid->gtrid_length= WSREP_XID_GTRID_LEN; - xid->bqual_length= 0; - memset(xid->data, 0, sizeof(xid->data)); - memcpy(xid->data, WSREP_XID_PREFIX, WSREP_XID_PREFIX_LEN); - memcpy(xid->data + WSREP_XID_UUID_OFFSET, uuid, sizeof(wsrep_uuid_t)); - memcpy(xid->data + WSREP_XID_SEQNO_OFFSET, &seqno, sizeof(wsrep_seqno_t)); -} - -const wsrep_uuid_t* wsrep_xid_uuid(const XID* xid) -{ - if (wsrep_is_wsrep_xid(xid)) - return reinterpret_cast<const wsrep_uuid_t*>(xid->data - + WSREP_XID_UUID_OFFSET); - else - return &WSREP_UUID_UNDEFINED; -} - -wsrep_seqno_t wsrep_xid_seqno(const XID* xid) -{ - - if (wsrep_is_wsrep_xid(xid)) - { - wsrep_seqno_t seqno; - memcpy(&seqno, xid->data + WSREP_XID_SEQNO_OFFSET, sizeof(wsrep_seqno_t)); - return seqno; - } - else - { - return WSREP_SEQNO_UNDEFINED; - } -} - -extern -int wsrep_is_wsrep_xid(const XID* xid) -{ - return (xid->formatID == 1 && - xid->gtrid_length == WSREP_XID_GTRID_LEN && - xid->bqual_length == 0 && - !memcmp(xid->data, WSREP_XID_PREFIX, WSREP_XID_PREFIX_LEN)); -} diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc index f04d84adf4f..d6aa1ca5c79 100644 --- a/sql/wsrep_var.cc +++ b/sql/wsrep_var.cc @@ -1,4 +1,4 @@ -/* Copyright 2008 Codership Oy <http://www.codership.com> +/* Copyright 2008-2015 Codership Oy <http://www.codership.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -21,6 +21,7 @@ #include <sql_acl.h> #include "wsrep_priv.h" #include "wsrep_thd.h" +#include "wsrep_xid.h" #include <my_dir.h> #include <cstdio> #include <cstdlib> @@ -33,7 +34,6 @@ const char* wsrep_node_name = 0; const char* wsrep_node_address = 0; const char* wsrep_node_incoming_address = 0; const char* 
wsrep_start_position = 0; -ulong wsrep_OSU_method_options; int wsrep_init_vars() { @@ -128,29 +128,30 @@ err: return 1; } -void wsrep_set_local_position (const char* value) +static +void wsrep_set_local_position(const char* const value, bool const sst) { - size_t value_len = strlen (value); - size_t uuid_len = wsrep_uuid_scan (value, value_len, &local_uuid); + size_t const value_len = strlen(value); + wsrep_uuid_t uuid; + size_t const uuid_len = wsrep_uuid_scan(value, value_len, &uuid); + wsrep_seqno_t const seqno = strtoll(value + uuid_len + 1, NULL, 10); - local_seqno = strtoll (value + uuid_len + 1, NULL, 10); - - XID xid; - wsrep_xid_init(&xid, &local_uuid, local_seqno); - wsrep_set_SE_checkpoint(&xid); - WSREP_INFO ("wsrep_start_position var submitted: '%s'", wsrep_start_position); + if (sst) { + wsrep_sst_received (wsrep, uuid, seqno, NULL, 0); + } else { + // initialization + local_uuid = uuid; + local_seqno = seqno; + } } bool wsrep_start_position_update (sys_var *self, THD* thd, enum_var_type type) { + WSREP_INFO ("wsrep_start_position var submitted: '%s'", + wsrep_start_position); // since this value passed wsrep_start_position_check, don't check anything // here - wsrep_set_local_position (wsrep_start_position); - - if (wsrep) { - wsrep_sst_received (wsrep, &local_uuid, local_seqno, NULL, 0); - } - + wsrep_set_local_position (wsrep_start_position, true); return 0; } @@ -163,7 +164,7 @@ void wsrep_start_position_init (const char* val) return; } - wsrep_set_local_position (val); + wsrep_set_local_position (val, false); } static bool refresh_provider_options() @@ -200,7 +201,7 @@ static int wsrep_provider_verify (const char* provider_str) return 1; /* check that provider file exists */ - bzero(&f_stat, sizeof(MY_STAT)); + memset(&f_stat, 0, sizeof(MY_STAT)); if (!my_stat(path, &f_stat, MYF(0))) { return 1; diff --git a/sql/wsrep_xid.cc b/sql/wsrep_xid.cc new file mode 100644 index 00000000000..133e9cba825 --- /dev/null +++ b/sql/wsrep_xid.cc @@ -0,0 +1,147 
@@ +/* Copyright 2015 Codership Oy <http://www.codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +//! @file some utility functions and classes not directly related to replication + +#include "wsrep_xid.h" +#include "sql_class.h" +#include "wsrep_mysqld.h" // for logging macros + +/* + * WSREPXid + */ + +#define WSREP_XID_PREFIX "WSREPXid" +#define WSREP_XID_PREFIX_LEN MYSQL_XID_PREFIX_LEN +#define WSREP_XID_UUID_OFFSET 8 +#define WSREP_XID_SEQNO_OFFSET (WSREP_XID_UUID_OFFSET + sizeof(wsrep_uuid_t)) +#define WSREP_XID_GTRID_LEN (WSREP_XID_SEQNO_OFFSET + sizeof(wsrep_seqno_t)) + +void wsrep_xid_init(XID* xid, const wsrep_uuid_t& uuid, wsrep_seqno_t seqno) +{ + xid->formatID= 1; + xid->gtrid_length= WSREP_XID_GTRID_LEN; + xid->bqual_length= 0; + memset(xid->data, 0, sizeof(xid->data)); + memcpy(xid->data, WSREP_XID_PREFIX, WSREP_XID_PREFIX_LEN); + memcpy(xid->data + WSREP_XID_UUID_OFFSET, &uuid, sizeof(wsrep_uuid_t)); + memcpy(xid->data + WSREP_XID_SEQNO_OFFSET, &seqno, sizeof(wsrep_seqno_t)); +} + +int wsrep_is_wsrep_xid(const XID* xid) +{ + return (xid->formatID == 1 && + xid->gtrid_length == WSREP_XID_GTRID_LEN && + xid->bqual_length == 0 && + !memcmp(xid->data, WSREP_XID_PREFIX, WSREP_XID_PREFIX_LEN)); +} + +const wsrep_uuid_t* wsrep_xid_uuid(const XID& xid) +{ + if (wsrep_is_wsrep_xid(&xid)) + return reinterpret_cast<const wsrep_uuid_t*>(xid.data + + 
WSREP_XID_UUID_OFFSET); + else + return &WSREP_UUID_UNDEFINED; +} + +wsrep_seqno_t wsrep_xid_seqno(const XID& xid) +{ + if (wsrep_is_wsrep_xid(&xid)) + { + wsrep_seqno_t seqno; + memcpy(&seqno, xid.data + WSREP_XID_SEQNO_OFFSET, sizeof(wsrep_seqno_t)); + return seqno; + } + else + { + return WSREP_SEQNO_UNDEFINED; + } +} + +static my_bool set_SE_checkpoint(THD* unused, plugin_ref plugin, void* arg) +{ + XID* xid= static_cast<XID*>(arg); + handlerton* hton= plugin_data(plugin, handlerton *); + + if (hton->set_checkpoint) + { + const wsrep_uuid_t* uuid(wsrep_xid_uuid(*xid)); + char uuid_str[40] = {0, }; + wsrep_uuid_print(uuid, uuid_str, sizeof(uuid_str)); + WSREP_DEBUG("Set WSREPXid for InnoDB: %s:%lld", + uuid_str, (long long)wsrep_xid_seqno(*xid)); + hton->set_checkpoint(hton, xid); + } + return FALSE; +} + +void wsrep_set_SE_checkpoint(XID& xid) +{ + plugin_foreach(NULL, set_SE_checkpoint, MYSQL_STORAGE_ENGINE_PLUGIN, &xid); +} + +void wsrep_set_SE_checkpoint(const wsrep_uuid_t& uuid, wsrep_seqno_t seqno) +{ + XID xid; + wsrep_xid_init(&xid, uuid, seqno); + wsrep_set_SE_checkpoint(xid); +} + +static my_bool get_SE_checkpoint(THD* unused, plugin_ref plugin, void* arg) +{ + XID* xid= reinterpret_cast<XID*>(arg); + handlerton* hton= plugin_data(plugin, handlerton *); + + if (hton->get_checkpoint) + { + hton->get_checkpoint(hton, xid); + const wsrep_uuid_t* uuid(wsrep_xid_uuid(*xid)); + char uuid_str[40] = {0, }; + wsrep_uuid_print(uuid, uuid_str, sizeof(uuid_str)); + WSREP_DEBUG("Read WSREPXid from InnoDB: %s:%lld", + uuid_str, (long long)wsrep_xid_seqno(*xid)); + } + return FALSE; +} + +void wsrep_get_SE_checkpoint(XID& xid) +{ + plugin_foreach(NULL, get_SE_checkpoint, MYSQL_STORAGE_ENGINE_PLUGIN, &xid); +} + +void wsrep_get_SE_checkpoint(wsrep_uuid_t& uuid, wsrep_seqno_t& seqno) +{ + uuid= WSREP_UUID_UNDEFINED; + seqno= WSREP_SEQNO_UNDEFINED; + + XID xid; + memset(&xid, 0, sizeof(xid)); + xid.formatID= -1; + + wsrep_get_SE_checkpoint(xid); + + if (xid.formatID == 
-1) return; // nil XID + + if (!wsrep_is_wsrep_xid(&xid)) + { + WSREP_WARN("Read non-wsrep XID from storage engines."); + return; + } + + uuid= *wsrep_xid_uuid(xid); + seqno= wsrep_xid_seqno(xid); +} diff --git a/sql/wsrep_xid.h b/sql/wsrep_xid.h new file mode 100644 index 00000000000..7bd2b063b48 --- /dev/null +++ b/sql/wsrep_xid.h @@ -0,0 +1,35 @@ +/* Copyright (C) 2015 Codership Oy <info@codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ + +#ifndef WSREP_XID_H +#define WSREP_XID_H + +#include <my_config.h> +#include "../wsrep/wsrep_api.h" +#include "handler.h" // XID typedef + +#ifdef WITH_WSREP + +void wsrep_xid_init(xid_t*, const wsrep_uuid_t&, wsrep_seqno_t); +const wsrep_uuid_t* wsrep_xid_uuid(const XID&); +wsrep_seqno_t wsrep_xid_seqno(const XID&); + +//void wsrep_get_SE_checkpoint(XID&); /* uncomment if needed */ +void wsrep_get_SE_checkpoint(wsrep_uuid_t&, wsrep_seqno_t&); +//void wsrep_set_SE_checkpoint(XID&); /* uncomment if needed */ +void wsrep_set_SE_checkpoint(const wsrep_uuid_t&, wsrep_seqno_t); + +#endif /* WITH_WSREP */ +#endif /* WSREP_UTILS_H */ |