diff options
author | Seppo Jaakola <seppo.jaakola@codership.com> | 2013-11-06 00:29:37 +0200 |
---|---|---|
committer | Seppo Jaakola <seppo.jaakola@codership.com> | 2013-11-06 00:29:37 +0200 |
commit | 2b4183f10b54a5b3f8c848d897b3107859c23fa4 (patch) | |
tree | 5d48ff3f0a9814926ba59b7adae1d056c57b54c8 | |
parent | 9129c8f1d3e1f8c9daeae559eaf6b9807b4331ec (diff) | |
download | mariadb-git-2b4183f10b54a5b3f8c848d897b3107859c23fa4.tar.gz |
bzr merge -r3890..3891 lp:codership-mysql/5.5
-rw-r--r-- | sql/CMakeLists.txt | 1 | ||||
-rw-r--r-- | sql/mdl.cc | 1 | ||||
-rw-r--r-- | sql/mysqld.cc | 76 | ||||
-rw-r--r-- | sql/mysqld.h | 8 | ||||
-rw-r--r-- | sql/sql_base.cc | 2 | ||||
-rw-r--r-- | sql/sql_parse.cc | 691 | ||||
-rw-r--r-- | sql/sys_vars.cc | 6 | ||||
-rw-r--r-- | sql/wsrep_mysqld.cc | 14 | ||||
-rw-r--r-- | sql/wsrep_mysqld.h | 102 | ||||
-rw-r--r-- | sql/wsrep_notify.cc | 3 | ||||
-rw-r--r-- | sql/wsrep_notify.cc.moved | 107 | ||||
-rw-r--r-- | sql/wsrep_priv.h | 206 | ||||
-rw-r--r-- | sql/wsrep_priv.h.moved | 233 | ||||
-rw-r--r-- | sql/wsrep_sst.cc | 4 | ||||
-rw-r--r-- | sql/wsrep_sst.h | 40 | ||||
-rw-r--r-- | sql/wsrep_thd.cc | 424 | ||||
-rw-r--r-- | sql/wsrep_thd.h | 31 | ||||
-rw-r--r-- | sql/wsrep_utils.cc | 4 | ||||
-rw-r--r-- | sql/wsrep_utils.h | 208 | ||||
-rw-r--r-- | sql/wsrep_var.cc | 7 | ||||
-rw-r--r-- | sql/wsrep_var.h | 81 |
21 files changed, 1202 insertions, 1047 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index 0c17b851f5a..7766b82adff 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -50,6 +50,7 @@ IF(WITH_WSREP) wsrep_sst.cc wsrep_utils.cc wsrep_var.cc + wsrep_thd.cc ) SET(WSREP_LIB wsrep) ENDIF() diff --git a/sql/mdl.cc b/sql/mdl.cc index 7fbbfa74537..a13aeb7904d 100644 --- a/sql/mdl.cc +++ b/sql/mdl.cc @@ -24,6 +24,7 @@ #include <mysql/psi/mysql_stage.h> #ifdef WITH_WSREP #include "wsrep_mysqld.h" +#include "wsrep_thd.h" extern "C" my_thread_id wsrep_thd_thread_id(THD *thd); extern "C" char *wsrep_thd_query(THD *thd); void sql_print_information(const char *format, ...) diff --git a/sql/mysqld.cc b/sql/mysqld.cc index c4f78ebd36f..1c9560ceb9f 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -73,6 +73,8 @@ #include "debug_sync.h" #ifdef WITH_WSREP #include "wsrep_mysqld.h" +#include "wsrep_thd.h" +#include "wsrep_sst.h" ulong wsrep_running_threads = 0; // # of currently running wsrep threads #endif #include "sql_callback.h" @@ -741,7 +743,7 @@ mysql_mutex_t LOCK_wsrep_slave_threads; mysql_mutex_t LOCK_wsrep_desync; int wsrep_replaying= 0; static void wsrep_close_threads(THD* thd); -#endif +#endif /* WITH_WSREP */ /* replication parameters, if master_host is not NULL, we are a slave */ uint report_port= 0; @@ -4467,26 +4469,26 @@ static int init_thread_environment() rpl_init_gtid_slave_state(); #endif -#ifdef WITH_WSREP - mysql_mutex_init(key_LOCK_wsrep_ready, - &LOCK_wsrep_ready, MY_MUTEX_INIT_FAST); +#ifdef WITH_WSREP + mysql_mutex_init(key_LOCK_wsrep_ready, + &LOCK_wsrep_ready, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wsrep_ready, &COND_wsrep_ready, NULL); - mysql_mutex_init(key_LOCK_wsrep_sst, - &LOCK_wsrep_sst, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_wsrep_sst, + &LOCK_wsrep_sst, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wsrep_sst, &COND_wsrep_sst, NULL); - mysql_mutex_init(key_LOCK_wsrep_sst_init, - &LOCK_wsrep_sst_init, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_wsrep_sst_init, + &LOCK_wsrep_sst_init, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wsrep_sst_init, &COND_wsrep_sst_init, NULL); - mysql_mutex_init(key_LOCK_wsrep_rollback, - &LOCK_wsrep_rollback, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_wsrep_rollback, + &LOCK_wsrep_rollback, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wsrep_rollback, &COND_wsrep_rollback, NULL); - mysql_mutex_init(key_LOCK_wsrep_replaying, - &LOCK_wsrep_replaying, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_wsrep_replaying, + &LOCK_wsrep_replaying, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wsrep_replaying, &COND_wsrep_replaying, NULL); - mysql_mutex_init(key_LOCK_wsrep_slave_threads, - &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST); - mysql_mutex_init(key_LOCK_wsrep_desync, - &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_wsrep_slave_threads, + &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_wsrep_desync, + &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST); #endif DBUG_RETURN(0); @@ -5109,7 +5111,7 @@ pthread_handler_t start_wsrep_THD(void *arg) THD *thd; wsrep_thd_processor_fun processor= (wsrep_thd_processor_fun)arg; - if (my_thread_init()) + if (my_thread_init()) { WSREP_ERROR("Could not initialize thread"); return(NULL); @@ -5172,7 +5174,7 @@ pthread_handler_t start_wsrep_THD(void *arg) statistic_increment(aborted_connects,&LOCK_status); MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0)); delete thd; - + return(NULL); } @@ -5230,42 +5232,6 @@ pthread_handler_t start_wsrep_THD(void *arg) return(NULL); } -void wsrep_create_rollbacker() -{ - if (wsrep_provider && strcasecmp(wsrep_provider, "none")) - { - pthread_t hThread; - /* create rollbacker */ - if (pthread_create( &hThread, &connection_attrib, - start_wsrep_THD, (void*)wsrep_rollback_process)) - WSREP_WARN("Can't create thread to manage wsrep rollback"); - } -} - -void wsrep_create_appliers(long threads) -{ - if (!wsrep_connected) - { - /* see wsrep_replication_start() for the logic */ - if (wsrep_cluster_address && strlen(wsrep_cluster_address) && - wsrep_provider && strcasecmp(wsrep_provider, "none")) - { - WSREP_ERROR("Trying to launch slave threads before creating " - "connection at '%s'", wsrep_cluster_address); - assert(0); - } - return; - } - - long wsrep_threads=0; - pthread_t hThread; - while (wsrep_threads++ < threads) { - if (pthread_create( - &hThread, &connection_attrib, - start_wsrep_THD, (void*)wsrep_replication_process)) - WSREP_WARN("Can't create thread to manage wsrep replication"); - } -} /**/ static bool abort_replicated(THD *thd) { @@ -5290,7 +5256,7 @@ WSREP_WARN("applier has wsrep_exec_mode = %d", thd->wsrep_exec_mode); if ( thd->slave_thread || /* declared as mysql slave */ thd->system_thread || /* declared as system thread */ - !thd->vio_ok() || /* server internal thread */ + !thd->vio_ok() || /* server internal thread */ thd->wsrep_exec_mode==REPL_RECV || /* applier or replaying thread */ thd->wsrep_applier || /* wsrep slave applier */ !thd->variables.wsrep_on) /* client, but fenced outside wsrep */ diff --git a/sql/mysqld.h b/sql/mysqld.h index 522a7820533..c9cfb8d1094 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -240,7 +240,7 @@ extern PSI_mutex_key key_PAGE_lock, key_LOCK_sync, key_LOCK_active, #ifdef WITH_WSREP extern PSI_mutex_key key_LOCK_wsrep_thd; extern PSI_cond_key key_COND_wsrep_thd; -#endif /* HAVE_MMAP */ +#endif /* HAVE_WSREP */ #ifdef HAVE_OPENSSL extern PSI_mutex_key key_LOCK_des_key_file; @@ -580,8 +580,8 @@ enum options_mysqld OPT_WSREP_START_POSITION, OPT_WSREP_SST_AUTH, OPT_WSREP_RECOVER, -#endif OPT_which_is_always_the_last +#endif /* WITH_WSREP */ }; #endif @@ -724,5 +724,9 @@ extern uint internal_tmp_table_max_key_segments; extern uint volatile global_disable_checkpoint; extern my_bool opt_help; +#ifdef WITH_WSREP +#include "my_pthread.h" +pthread_handler_t start_wsrep_THD(void*); +#endif /* WITH_WSREP */ #endif /* MYSQLD_INCLUDED */ diff --git a/sql/sql_base.cc b/sql/sql_base.cc index 325c79043a3..31a51fefc00 100644 --- a/sql/sql_base.cc +++ b/sql/sql_base.cc @@ -63,7 +63,7 @@ #ifdef WITH_WSREP #include "wsrep_mysqld.h" - +#include "wsrep_thd.h" #endif // WITH_WSREP bool diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 750613326d2..8f208ee66b1 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -105,7 +105,7 @@ #ifdef WITH_WSREP #include "wsrep_mysqld.h" -static void wsrep_client_rollback(THD *thd); +#include "wsrep_thd.h" static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length, Parser_state *parser_state); #endif /* WITH_WSREP */ @@ -6571,90 +6571,6 @@ void mysql_init_multi_delete(LEX *lex) } #ifdef WITH_WSREP -static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow*); -static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow*); -void wsrep_replay_transaction(THD *thd) -{ - /* checking if BF trx must be replayed */ - if (thd->wsrep_conflict_state== MUST_REPLAY) - { - if (thd->wsrep_exec_mode!= REPL_RECV) - { - if (thd->get_stmt_da()->is_sent()) - { - WSREP_ERROR("replay issue, thd has reported status already"); - } - thd->get_stmt_da()->reset_diagnostics_area(); - - thd->wsrep_conflict_state= REPLAYING; - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - - mysql_reset_thd_for_next_command(thd, opt_userstat_running); - thd->killed= NOT_KILLED; - close_thread_tables(thd); - if (thd->locked_tables_mode && thd->lock) - { - WSREP_DEBUG("releasing table lock for replaying (%ld)", - thd->thread_id); - thd->locked_tables_list.unlock_locked_tables(thd); - thd->variables.option_bits&= ~(OPTION_TABLE_LOCK); - } - thd->mdl_context.release_transactional_locks(); - - thd_proc_info(thd, "wsrep replaying trx"); - WSREP_DEBUG("replay trx: %s %lld", - thd->query() ? thd->query() : "void", - (long long)wsrep_thd_trx_seqno(thd)); - struct wsrep_thd_shadow shadow; - wsrep_prepare_bf_thd(thd, &shadow); - int rcode = wsrep->replay_trx(wsrep, - &thd->wsrep_ws_handle, - (void *)thd); - - wsrep_return_from_bf_mode(thd, &shadow); - if (thd->wsrep_conflict_state!= REPLAYING) - WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state ); - - mysql_mutex_lock(&thd->LOCK_wsrep_thd); - - switch (rcode) - { - case WSREP_OK: - thd->wsrep_conflict_state= NO_CONFLICT; - wsrep->post_commit(wsrep, &thd->wsrep_ws_handle); - WSREP_DEBUG("trx_replay successful for: %ld %llu", - thd->thread_id, (long long)thd->real_id); - break; - case WSREP_TRX_FAIL: - if (thd->stmt_da->is_sent) - { - WSREP_ERROR("replay failed, thd has reported status"); - } - else - { - WSREP_DEBUG("replay failed, rolling back"); - my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction"); - } - thd->wsrep_conflict_state= ABORTED; - wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle); - break; - default: - WSREP_ERROR("trx_replay failed for: %d, query: %s", - rcode, thd->query() ? thd->query() : "void"); - /* we're now in inconsistent state, must abort */ - unireg_abort(1); - break; - } - mysql_mutex_lock(&LOCK_wsrep_replaying); - wsrep_replaying--; - WSREP_DEBUG("replaying decreased: %d, thd: %lu", - wsrep_replaying, thd->thread_id); - mysql_cond_broadcast(&COND_wsrep_replaying); - mysql_mutex_unlock(&LOCK_wsrep_replaying); - } - } -} - static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length, Parser_state *parser_state) { @@ -8597,611 +8513,6 @@ LEX_USER *create_definer(THD *thd, LEX_STRING *user_name, LEX_STRING *host_name) return definer; } -#ifdef WITH_WSREP -/* must have (&thd->LOCK_wsrep_thd) */ -static void wsrep_client_rollback(THD *thd) -{ - WSREP_DEBUG("client rollback due to BF abort for (%ld), query: %s", - thd->thread_id, thd->query()); - - thd->wsrep_conflict_state= ABORTING; - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - trans_rollback(thd); - - if (thd->locked_tables_mode && thd->lock) - { - WSREP_DEBUG("unlocking tables for BF abort (%ld)", thd->thread_id); - thd->locked_tables_list.unlock_locked_tables(thd); - thd->variables.option_bits&= ~(OPTION_TABLE_LOCK); - } - - if (thd->global_read_lock.is_acquired()) - { - WSREP_DEBUG("unlocking GRL for BF abort (%ld)", thd->thread_id); - thd->global_read_lock.unlock_global_read_lock(thd); - } - - /* Release transactional metadata locks. */ - thd->mdl_context.release_transactional_locks(); - - /* release explicit MDL locks */ - thd->mdl_context.release_explicit_locks(); - - if (thd->get_binlog_table_maps()) - { - WSREP_DEBUG("clearing binlog table map for BF abort (%ld)", thd->thread_id); - thd->clear_binlog_table_maps(); - } - mysql_mutex_lock(&thd->LOCK_wsrep_thd); - thd->wsrep_conflict_state= ABORTED; - thd->wsrep_bf_thd = NULL; -} - -static enum wsrep_status wsrep_apply_sql( - THD *thd, const char *sql, size_t sql_len, time_t timeval, uint32 randseed) -{ - int error; - enum wsrep_status ret_code= WSREP_OK; - - DBUG_ENTER("wsrep_bf_execute_cb"); - thd->wsrep_exec_mode= REPL_RECV; - thd->net.vio= 0; - thd->start_time= timeval; - thd->wsrep_rand= randseed; - - thd->variables.option_bits |= OPTION_NOT_AUTOCOMMIT; - - DBUG_PRINT("wsrep", ("SQL: %s", sql)); - - mysql_mutex_lock(&thd->LOCK_wsrep_thd); - thd->wsrep_query_state= QUERY_EXEC; - /* preserve replaying mode */ - if (thd->wsrep_conflict_state!= REPLAYING) - thd->wsrep_conflict_state= NO_CONFLICT; - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - - if ((error= dispatch_command(COM_QUERY, thd, (char*)sql, sql_len))) { - WSREP_WARN("BF SQL apply failed: %d, %lld", - thd->wsrep_conflict_state, (long long)thd->wsrep_trx_seqno); - DBUG_RETURN(WSREP_FATAL); - } - - mysql_mutex_lock(&thd->LOCK_wsrep_thd); - if (thd->wsrep_conflict_state!= NO_CONFLICT && - thd->wsrep_conflict_state!= REPLAYING) { - ret_code= WSREP_FATAL; - WSREP_DEBUG("BF thd ending, with: %d, %lld", - thd->wsrep_conflict_state, (long long)thd->wsrep_trx_seqno); - } - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - - assert(thd->wsrep_exec_mode== REPL_RECV); - DBUG_RETURN(ret_code); -} - -void wsrep_write_rbr_buf( - THD *thd, const void* rbr_buf, size_t buf_len) -{ - char filename[PATH_MAX]= {0}; - int len= snprintf(filename, PATH_MAX, "%s/GRA_%ld_%lld.log", - wsrep_data_home_dir, thd->thread_id, - (long long)thd->wsrep_trx_seqno); - if (len >= PATH_MAX) - { - WSREP_ERROR("RBR dump path too long: %d, skipping dump.", len); - return; - } - - FILE *of= fopen(filename, "wb"); - if (of) - { - fwrite (rbr_buf, buf_len, 1, of); - fclose(of); - } - else - { - WSREP_ERROR("Failed to open file '%s': %d (%s)", - filename, errno, strerror(errno)); - } -} - -static inline wsrep_status_t wsrep_apply_rbr( - THD *thd, const uchar *rbr_buf, size_t buf_len) -{ - char *buf= (char *)rbr_buf; - int rcode= 0; - int event= 1; - Format_description_log_event *description_event = wsrep_format_desc; - DBUG_ENTER("wsrep_apply_rbr"); - - if (thd->killed == KILL_CONNECTION) - { - WSREP_INFO("applier has been aborted, skipping apply_rbr: %lld", - (long long) thd->wsrep_trx_seqno); - DBUG_RETURN(WSREP_FATAL); - } - - mysql_mutex_lock(&thd->LOCK_wsrep_thd); - thd->wsrep_query_state= QUERY_EXEC; - if (thd->wsrep_conflict_state!= REPLAYING) - thd->wsrep_conflict_state= NO_CONFLICT; - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - - if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld", - (long long) thd->wsrep_trx_seqno); - - if ((rcode= trans_begin(thd))) - WSREP_WARN("begin for rbr apply failed: %lld, code: %d", - (long long) thd->wsrep_trx_seqno, rcode); - - while(buf_len) - { - int exec_res; - int error = 0; - Log_event* ev= wsrep_read_log_event(&buf, &buf_len, description_event); - - if (!ev) - { - WSREP_ERROR("applier could not read binlog event, seqno: %lld, len: %ld", - (long long)thd->wsrep_trx_seqno, buf_len); - rcode= 1; - goto error; - } - switch (ev->get_type_code()) { - case WRITE_ROWS_EVENT: - case UPDATE_ROWS_EVENT: - case DELETE_ROWS_EVENT: - DBUG_ASSERT(buf_len != 0 || - ((Rows_log_event *) ev)->get_flags(Rows_log_event::STMT_END_F)); - break; - case FORMAT_DESCRIPTION_EVENT: - description_event = (Format_description_log_event *)ev; - break; - default: - break; - } - - thd->variables.server_id = ev->server_id; // use the original server id for logging - thd->set_time(); // time the query - wsrep_xid_init(&thd->transaction.xid_state.xid, - wsrep_cluster_uuid(), - thd->wsrep_trx_seqno); - thd->lex->current_select= 0; - if (!ev->when) - ev->when = time(NULL); - ev->thd = thd; - exec_res = ev->apply_event(thd->wsrep_rli); - DBUG_PRINT("info", ("exec_event result: %d", exec_res)); - - if (exec_res) - { - WSREP_WARN("RBR event %d %s apply warning: %d, %lld", - event, ev->get_type_str(), exec_res, (long long) thd->wsrep_trx_seqno); - rcode= exec_res; - /* stop processing for the first error */ - delete ev; - goto error; - } - event++; - - if (thd->wsrep_conflict_state!= NO_CONFLICT && - thd->wsrep_conflict_state!= REPLAYING) - WSREP_WARN("conflict state after RBR event applying: %d, %lld", - thd->wsrep_query_state, (long long)thd->wsrep_trx_seqno); - - if (thd->wsrep_conflict_state == MUST_ABORT) { - WSREP_WARN("RBR event apply failed, rolling back: %lld", - (long long) thd->wsrep_trx_seqno); - trans_rollback(thd); - thd->locked_tables_list.unlock_locked_tables(thd); - /* Release transactional metadata locks. */ - thd->mdl_context.release_transactional_locks(); - thd->wsrep_conflict_state= NO_CONFLICT; - DBUG_RETURN(WSREP_FATAL); - } - - if ((ev->get_type_code() == WRITE_ROWS_EVENT || - ev->get_type_code() == UPDATE_ROWS_EVENT || - ev->get_type_code() == DELETE_ROWS_EVENT) && - ((Rows_log_event *) ev)->get_flags(Rows_log_event::STMT_END_F)) - { - thd->wsrep_rli->cleanup_context(thd, 0); - - if (error == 0) - { - thd->clear_error(); - } - else - WSREP_ERROR("Error in %s event: commit of row events failed: %lld", - ev->get_type_str(), (long long)thd->wsrep_trx_seqno); - } - - if (description_event != ev) - delete ev; - } - - error: - mysql_mutex_lock(&thd->LOCK_wsrep_thd); - thd->wsrep_query_state= QUERY_IDLE; - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - - assert(thd->wsrep_exec_mode== REPL_RECV); - - if (thd->killed == KILL_CONNECTION) - WSREP_INFO("applier aborted: %lld", (long long)thd->wsrep_trx_seqno); - - if (rcode) DBUG_RETURN(WSREP_FATAL); - DBUG_RETURN(WSREP_OK); -} - -wsrep_status_t wsrep_apply_cb(void* const ctx, - const void* const buf, size_t const buf_len, - wsrep_seqno_t const global_seqno) -{ - THD* const thd((THD*)ctx); - - thd->wsrep_trx_seqno= global_seqno; - -#ifdef WSREP_PROC_INFO - snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "applying write set %lld: %p, %zu", - (long long)thd->wsrep_trx_seqno, buf, buf_len); - thd_proc_info(thd, thd->wsrep_info); -#else - thd_proc_info(thd, "applying write set"); -#endif /* WSREP_PROC_INFO */ - - wsrep_status_t const rcode(wsrep_apply_rbr(thd, (const uchar*)buf, buf_len)); - -#ifdef WSREP_PROC_INFO - snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "applied write set %lld", (long long)thd->wsrep_trx_seqno); - thd_proc_info(thd, thd->wsrep_info); -#else - thd_proc_info(thd, "applied write set"); -#endif /* WSREP_PROC_INFO */ - - if (WSREP_OK != rcode) wsrep_write_rbr_buf(thd, buf, buf_len); - TABLE *tmp; - while ((tmp = thd->temporary_tables)) - { - WSREP_DEBUG("Applier %lu, has temporary tables: %s.%s", - thd->thread_id, - (tmp->s) ? tmp->s->db.str : "void", - (tmp->s) ? tmp->s->table_name.str : "void"); - close_temporary_table(thd, tmp, 1, 1); - } - - return rcode; -} - -#if DELETE // this does not work in 5.5 -/* a common wrapper for end_trans() function - to put all necessary stuff */ -static inline wsrep_status_t -wsrep_end_trans (THD* const thd, enum enum_mysql_completiontype const end) -{ - if (0 == end_trans(thd, end)) - { - return WSREP_OK; - } - else - { - return WSREP_FATAL; - } -} -#endif - -wsrep_status_t wsrep_commit(THD* const thd, wsrep_seqno_t const global_seqno) -{ -#ifdef WSREP_PROC_INFO - snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "committing %lld", (long long)thd->wsrep_trx_seqno); - thd_proc_info(thd, thd->wsrep_info); -#else - thd_proc_info(thd, "committing"); -#endif /* WSREP_PROC_INFO */ - - wsrep_status_t const rcode(wsrep_apply_sql(thd, "COMMIT", 6, 0, 0)); -// wsrep_status_t const rcode(wsrep_end_trans (thd, COMMIT)); - -#ifdef WSREP_PROC_INFO - snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "committed %lld", (long long)thd->wsrep_trx_seqno); - thd_proc_info(thd, thd->wsrep_info); -#else - thd_proc_info(thd, "committed"); -#endif /* WSREP_PROC_INFO */ - - if (WSREP_OK == rcode) - { - // TODO: mark snapshot with global_seqno. - } - - return rcode; -} - -wsrep_status_t wsrep_rollback(THD* const thd, wsrep_seqno_t const global_seqno) -{ -#ifdef WSREP_PROC_INFO - snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "rolling back %lld", (long long)thd->wsrep_trx_seqno); - thd_proc_info(thd, thd->wsrep_info); -#else - thd_proc_info(thd, "rolling back"); -#endif /* WSREP_PROC_INFO */ - - wsrep_status_t const rcode(wsrep_apply_sql(thd, "ROLLBACK", 8, 0, 0)); -// wsrep_status_t const rcode(wsrep_end_trans (thd, ROLLBACK)); - -#ifdef WSREP_PROC_INFO - snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "rolled back %lld", (long long)thd->wsrep_trx_seqno); - thd_proc_info(thd, thd->wsrep_info); -#else - thd_proc_info(thd, "rolled back"); -#endif /* WSREP_PROC_INFO */ - - return rcode; -} - -wsrep_status_t wsrep_commit_cb(void* const ctx, - wsrep_seqno_t const global_seqno, - bool const commit) -{ - THD* const thd((THD*)ctx); - - assert(global_seqno == thd->wsrep_trx_seqno); - - if (commit) - return wsrep_commit(thd, global_seqno); - else - return wsrep_rollback(thd, global_seqno); -} - -Relay_log_info* wsrep_relay_log_init(const char* log_fname) -{ - Relay_log_info* rli= new Relay_log_info(false); - LEX_STRING conn = {"wsrep",5}; - - /* - * problem is that mariaDB requires master info for rli, and wsrep replication - * really should not have it. Allocating empty mi here just for the sake of - * getting rpl_filter pointer initialized for mi, rpl_filter will be needed in - * several places - */ - rli->mi= new Master_info(&conn, false); - - rli->no_storage= true; - if (!rli->relay_log.description_event_for_exec) - { - rli->relay_log.description_event_for_exec= - new Format_description_log_event(4); - } - - rli->sql_thd= current_thd; - return rli; -} - -static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow) -{ - shadow->options = thd->variables.option_bits; - shadow->wsrep_exec_mode = thd->wsrep_exec_mode; - shadow->vio = thd->net.vio; - - if (opt_log_slave_updates) - thd->variables.option_bits|= OPTION_BIN_LOG; - else - thd->variables.option_bits&= ~(OPTION_BIN_LOG); - - if (!thd->wsrep_rli) thd->wsrep_rli= wsrep_relay_log_init("wsrep_relay"); - - thd->wsrep_exec_mode= REPL_RECV; - thd->net.vio= 0; - thd->clear_error(); - - thd->variables.option_bits|= OPTION_NOT_AUTOCOMMIT; - - shadow->tx_isolation = thd->variables.tx_isolation; - thd->variables.tx_isolation = ISO_READ_COMMITTED; - thd->tx_isolation = ISO_READ_COMMITTED; -} - -static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow) -{ - thd->variables.option_bits = shadow->options; - thd->wsrep_exec_mode = shadow->wsrep_exec_mode; - thd->net.vio = shadow->vio; - thd->variables.tx_isolation = shadow->tx_isolation; -} - -void wsrep_replication_process(THD *thd) -{ - int rcode; - DBUG_ENTER("wsrep_replication_process"); - - struct wsrep_thd_shadow shadow; - wsrep_prepare_bf_thd(thd, &shadow); - - rcode = wsrep->recv(wsrep, (void *)thd); - DBUG_PRINT("wsrep",("wsrep_repl returned: %d", rcode)); - - WSREP_INFO("applier thread exiting (code:%d)", rcode); - - switch (rcode) { - case WSREP_OK: - case WSREP_NOT_IMPLEMENTED: - case WSREP_CONN_FAIL: - /* provider does not support slave operations / disconnected from group, - * just close applier thread */ - break; - case WSREP_NODE_FAIL: - /* data inconsistency => SST is needed */ - /* Note: we cannot just blindly restart replication here, - * SST might require server restart if storage engines must be - * initialized after SST */ - WSREP_ERROR("node consistency compromised, aborting"); - wsrep_kill_mysql(thd); - break; - case WSREP_WARNING: - case WSREP_TRX_FAIL: - case WSREP_TRX_MISSING: - /* these suggests a bug in provider code */ - WSREP_WARN("bad return from recv() call: %d", rcode); - /* fall through to node shutdown */ - case WSREP_FATAL: - /* Cluster connectivity is lost. - * - * If applier was killed on purpose (KILL_CONNECTION), we - * avoid mysql shutdown. This is because the killer will then handle - * shutdown processing (or replication restarting) - */ - if (thd->killed != KILL_CONNECTION) - { - wsrep_kill_mysql(thd); - } - break; - } - - mysql_mutex_lock(&LOCK_thread_count); - wsrep_close_applier(thd); - mysql_cond_broadcast(&COND_thread_count); - mysql_mutex_unlock(&LOCK_thread_count); - - if (thd->temporary_tables) - { - WSREP_DEBUG("Applier %lu, has temporary tables at exit", thd->thread_id); - } - wsrep_return_from_bf_mode(thd, &shadow); - DBUG_VOID_RETURN; -} - -void wsrep_rollback_process(THD *thd) -{ - DBUG_ENTER("wsrep_rollback_process"); - - mysql_mutex_lock(&LOCK_wsrep_rollback); - wsrep_aborting_thd= NULL; - - while (thd->killed == NOT_KILLED) { - thd_proc_info(thd, "wsrep aborter idle"); - thd->mysys_var->current_mutex= &LOCK_wsrep_rollback; - thd->mysys_var->current_cond= &COND_wsrep_rollback; - - mysql_cond_wait(&COND_wsrep_rollback,&LOCK_wsrep_rollback); - - WSREP_DEBUG("WSREP rollback thread wakes for signal"); - - mysql_mutex_lock(&thd->mysys_var->mutex); - thd_proc_info(thd, "wsrep aborter active"); - thd->mysys_var->current_mutex= 0; - thd->mysys_var->current_cond= 0; - mysql_mutex_unlock(&thd->mysys_var->mutex); - - /* check for false alarms */ - if (!wsrep_aborting_thd) - { - WSREP_DEBUG("WSREP rollback thread has empty abort queue"); - } - /* process all entries in the queue */ - while (wsrep_aborting_thd) { - THD *aborting; - wsrep_aborting_thd_t next = wsrep_aborting_thd->next; - aborting = wsrep_aborting_thd->aborting_thd; - my_free(wsrep_aborting_thd); - wsrep_aborting_thd= next; - /* - * must release mutex, appliers my want to add more - * aborting thds in our work queue, while we rollback - */ - mysql_mutex_unlock(&LOCK_wsrep_rollback); - - mysql_mutex_lock(&aborting->LOCK_wsrep_thd); - if (aborting->wsrep_conflict_state== ABORTED) - { - WSREP_DEBUG("WSREP, thd already aborted: %llu state: %d", - (long long)aborting->real_id, - aborting->wsrep_conflict_state); - - mysql_mutex_unlock(&aborting->LOCK_wsrep_thd); - mysql_mutex_lock(&LOCK_wsrep_rollback); - continue; - } - aborting->wsrep_conflict_state= ABORTING; - - mysql_mutex_unlock(&aborting->LOCK_wsrep_thd); - - aborting->store_globals(); - - mysql_mutex_lock(&aborting->LOCK_wsrep_thd); - wsrep_client_rollback(aborting); - WSREP_DEBUG("WSREP rollbacker aborted thd: (%lu %llu)", - aborting->thread_id, (long long)aborting->real_id); - mysql_mutex_unlock(&aborting->LOCK_wsrep_thd); - - mysql_mutex_lock(&LOCK_wsrep_rollback); - } - } - - mysql_mutex_unlock(&LOCK_wsrep_rollback); - sql_print_information("WSREP: rollbacker thread exiting"); - - DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting")); - DBUG_VOID_RETURN; -} - -extern -int wsrep_thd_is_brute_force(void *thd_ptr) -{ - if (thd_ptr) { - switch (((THD *)thd_ptr)->wsrep_exec_mode) { - case LOCAL_STATE: - { - if (((THD *)thd_ptr)->wsrep_conflict_state== REPLAYING) - { - return 1; - } - return 0; - } - case REPL_RECV: return 1; - case TOTAL_ORDER: return 2; - case LOCAL_COMMIT: return 3; - } - } - return 0; -} -extern "C" -int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal) -{ - THD *victim_thd = (THD *) victim_thd_ptr; - THD *bf_thd = (THD *) bf_thd_ptr; - DBUG_ENTER("wsrep_abort_thd"); - - if ( (WSREP(bf_thd) || - ( (WSREP_ON || wsrep_OSU_method_options == WSREP_OSU_RSU) && - bf_thd->wsrep_exec_mode == TOTAL_ORDER) ) && - victim_thd) - { - WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ? - (long long)bf_thd->real_id : 0, (long long)victim_thd->real_id); - ha_wsrep_abort_transaction(bf_thd, victim_thd, signal); - } - else - { - WSREP_DEBUG("wsrep_abort_thd not effective: %p %p", bf_thd, victim_thd); - } - - DBUG_RETURN(1); -} -extern "C" -int wsrep_thd_in_locking_session(void *thd_ptr) -{ - if (thd_ptr && ((THD *)thd_ptr)->in_lock_tables) { - return 1; - } - return 0; -} -#endif - /** Retuns information about user or current user. diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 00be8bfbc21..e1b0dc50c65 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -4063,7 +4063,8 @@ static Sys_var_tz Sys_time_zone( DEFAULT(&default_tz), NO_MUTEX_GUARD, IN_BINLOG); #ifdef WITH_WSREP -#include "wsrep_mysqld.h" +#include "wsrep_var.h" +#include "wsrep_sst.h" static Sys_var_charptr Sys_wsrep_provider( "wsrep_provider", "Path to replication provider library", @@ -4244,8 +4245,7 @@ static Sys_var_mybool Sys_wsrep_certify_nonPK( static Sys_var_mybool Sys_wsrep_causal_reads( "wsrep_causal_reads", "Enable \"strictly synchronous\" semantics for read operations", SESSION_VAR(wsrep_causal_reads), - CMD_LINE(OPT_ARG), DEFAULT(FALSE)); - // ON_UPDATE(wsrep_causal_reads_update)); + CMD_LINE(OPT_ARG), DEFAULT(FALSE)); static const char *wsrep_OSU_method_names[]= { "TOI", "RSU", NullS }; static Sys_var_enum Sys_wsrep_OSU_method( diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index 8a04c5cfd79..af8b025cfd7 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -17,6 +17,10 @@ #include <sql_class.h> #include <sql_parse.h> #include "wsrep_priv.h" +#include "wsrep_thd.h" +#include "wsrep_sst.h" +#include "wsrep_utils.h" +#include "wsrep_var.h" #include <cstdio> #include <cstdlib> #include "log_event.h" @@ -88,7 +92,6 @@ const char* wsrep_provider_version = provider_version; const char* wsrep_provider_vendor = provider_vendor; /* End wsrep status variables */ - wsrep_uuid_t local_uuid = WSREP_UUID_UNDEFINED; wsrep_seqno_t local_seqno = WSREP_SEQNO_UNDEFINED; wsp::node_status local_status; @@ -99,14 +102,7 @@ long wsrep_protocol_version = 2; // if there was no state gap on receiving first view event. static my_bool wsrep_startup = TRUE; -// action execute callback -extern wsrep_status_t wsrep_apply_cb(void *ctx, - const void* buf, size_t buf_len, - wsrep_seqno_t global_seqno); - -extern wsrep_status_t wsrep_commit_cb (void *ctx, - wsrep_seqno_t global_seqno, - bool commit); +/* wsrep callbacks */ static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) { switch (level) { diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index 0df368c562d..67bc9d0ea15 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -79,11 +79,6 @@ extern ulong wsrep_retry_autocommit; extern my_bool wsrep_auto_increment_control; extern my_bool wsrep_drupal_282555_workaround; extern my_bool wsrep_incremental_data_collection; -extern const char* wsrep_sst_method; -extern const char* wsrep_sst_receive_address; -extern char* wsrep_sst_auth; -extern const char* wsrep_sst_donor; -extern my_bool wsrep_sst_donor_rejects_queries; extern const char* wsrep_start_position; extern long long wsrep_max_ws_size; extern long wsrep_max_ws_rows; @@ -117,71 +112,21 @@ extern const char* wsrep_provider_vendor; extern int wsrep_show_status(THD *thd, SHOW_VAR *var, char *buff); extern void wsrep_free_status(THD *thd); -#define WSREP_SST_ADDRESS_AUTO "AUTO" -#define WSREP_NODE_INCOMING_AUTO "AUTO" -// MySQL variables funcs +extern int wsrep_init_vars(); +extern void wsrep_provider_init (const char* provider); +extern void wsrep_start_position_init (const char* position); +extern void wsrep_sst_auth_init (const char* auth); -#define CHECK_ARGS (sys_var *self, THD* thd, set_var *var) -#define UPDATE_ARGS (sys_var *self, THD* thd, enum_var_type type) -#define DEFAULT_ARGS (THD* thd, enum_var_type var_type) -#define INIT_ARGS (const char* opt) - -extern int wsrep_init_vars(); - -extern bool wsrep_on_update UPDATE_ARGS; -extern void wsrep_causal_reads_update UPDATE_ARGS; -extern bool wsrep_start_position_check CHECK_ARGS; -extern bool wsrep_start_position_update UPDATE_ARGS; -extern void wsrep_start_position_init INIT_ARGS; - -extern bool wsrep_provider_check CHECK_ARGS; -extern bool wsrep_provider_update UPDATE_ARGS; -extern void wsrep_provider_init INIT_ARGS; - -extern bool wsrep_provider_options_check CHECK_ARGS; -extern bool wsrep_provider_options_update UPDATE_ARGS; -extern void wsrep_provider_options_init INIT_ARGS; - -extern bool wsrep_cluster_address_check CHECK_ARGS; -extern bool wsrep_cluster_address_update UPDATE_ARGS; -extern void wsrep_cluster_address_init INIT_ARGS; - -extern bool wsrep_cluster_name_check CHECK_ARGS; -extern bool wsrep_cluster_name_update UPDATE_ARGS; - -extern bool wsrep_node_name_check CHECK_ARGS; -extern bool wsrep_node_name_update UPDATE_ARGS; - -extern bool wsrep_node_address_check CHECK_ARGS; -extern bool wsrep_node_address_update UPDATE_ARGS; -extern void wsrep_node_address_init INIT_ARGS; - -extern bool wsrep_sst_method_check CHECK_ARGS; -extern bool wsrep_sst_method_update UPDATE_ARGS; -extern void wsrep_sst_method_init INIT_ARGS; - -extern bool wsrep_sst_receive_address_check CHECK_ARGS; -extern bool wsrep_sst_receive_address_update UPDATE_ARGS; - -extern bool wsrep_sst_auth_check CHECK_ARGS; -extern bool wsrep_sst_auth_update UPDATE_ARGS; -extern void wsrep_sst_auth_init INIT_ARGS; - -extern bool wsrep_sst_donor_check CHECK_ARGS; -extern bool wsrep_sst_donor_update UPDATE_ARGS; - -extern bool wsrep_slave_threads_check CHECK_ARGS; -extern bool wsrep_slave_threads_update UPDATE_ARGS; - -extern bool wsrep_desync_check CHECK_ARGS; -extern bool wsrep_desync_update UPDATE_ARGS; - -extern bool wsrep_before_SE(); // initialize wsrep before storage - // engines (true) or after (false) extern int wsrep_init(); extern void wsrep_deinit(); extern void wsrep_recover(); +extern bool wsrep_before_SE(); // initialize wsrep before storage + // engines (true) or after (false) +/* wsrep initialization sequence at startup + * @param before wsrep_before_SE() value */ +extern void wsrep_init_startup(bool before); + @@ -215,17 +160,11 @@ extern "C" void wsrep_thd_awake(THD* bf_thd, THD *thd, my_bool signal); -/* wsrep initialization sequence at startup - * @param first wsrep_before_SE() value */ -extern void wsrep_init_startup(bool before); - extern void wsrep_close_client_connections(my_bool wait_to_end); extern int wsrep_wait_committing_connections_close(int wait_time); extern void wsrep_close_applier(THD *thd); extern void wsrep_wait_appliers_close(THD *thd); extern void wsrep_close_applier_threads(int count); -extern void wsrep_create_appliers(long threads = wsrep_slave_threads); -extern void wsrep_create_rollbacker(); extern void wsrep_kill_mysql(THD *thd); /* new defines */ @@ -286,18 +225,6 @@ extern wsrep_seqno_t wsrep_locked_seqno; if (victim_thd) WSREP_LOG_CONFLICT_THD(victim_thd, "Victim thread"); \ } -/*! Synchronizes applier thread start with init thread */ -extern void wsrep_sst_grab(); -/*! Init thread waits for SST completion */ -extern bool wsrep_sst_wait(); -/*! Signals wsrep that initialization is complete, writesets can be applied */ -extern void wsrep_sst_continue(); - -extern void wsrep_SE_init_grab(); /*! grab init critical section */ -extern void wsrep_SE_init_wait(); /*! wait for SE init to complete */ -extern void wsrep_SE_init_done(); /*! signal that SE init is complte */ -extern void wsrep_SE_initialized(); /*! mark SE initialization complete */ - extern void wsrep_ready_wait(); enum wsrep_trx_status { @@ -311,17 +238,10 @@ wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all); class Ha_trx_info; struct THD_TRANS; void wsrep_register_hton(THD* thd, bool all); - -void wsrep_replication_process(THD *thd); -void wsrep_rollback_process(THD *thd); void wsrep_brute_force_killer(THD *thd); int wsrep_hire_brute_force_killer(THD *thd, uint64_t trx_id); + extern "C" bool wsrep_consistency_check(void *thd_ptr); -//extern "C" int wsrep_thd_is_brute_force(void *thd_ptr); -extern int wsrep_thd_is_brute_force(void *thd_ptr); -extern "C" int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, - my_bool signal); -extern "C" int wsrep_thd_in_locking_session(void *thd_ptr); /* this is visible for client build so that innodb plugin gets this */ typedef struct wsrep_aborting_thd { diff --git a/sql/wsrep_notify.cc b/sql/wsrep_notify.cc index ff997d01183..291cdbb7c75 100644 --- a/sql/wsrep_notify.cc +++ b/sql/wsrep_notify.cc @@ -15,6 +15,7 @@ #include <mysqld.h> #include "wsrep_priv.h" +#include "wsrep_utils.h" const char* wsrep_notify_cmd=""; @@ -64,7 +65,7 @@ void wsrep_notify_status (wsrep_member_status_t status, { char uuid_str[40]; - wsrep_uuid_print (&view->uuid, uuid_str, sizeof(uuid_str)); + wsrep_uuid_print (&view->state_id.uuid, uuid_str, sizeof(uuid_str)); cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, " --uuid %s", uuid_str); diff --git a/sql/wsrep_notify.cc.moved b/sql/wsrep_notify.cc.moved new file mode 100644 index 00000000000..ff997d01183 --- /dev/null +++ b/sql/wsrep_notify.cc.moved @@ -0,0 +1,107 @@ +/* Copyright 2010 Codership Oy <http://www.codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include <mysqld.h> +#include "wsrep_priv.h" + +const char* wsrep_notify_cmd=""; + +static const char* _status_str(wsrep_member_status_t status) +{ + switch (status) + { + case WSREP_MEMBER_UNDEFINED: return "Undefined"; + case WSREP_MEMBER_JOINER: return "Joiner"; + case WSREP_MEMBER_DONOR: return "Donor"; + case WSREP_MEMBER_JOINED: return "Joined"; + case WSREP_MEMBER_SYNCED: return "Synced"; + default: return "Error(?)"; + } +} + +void wsrep_notify_status (wsrep_member_status_t status, + const wsrep_view_info_t* view) +{ + if (!wsrep_notify_cmd || 0 == strlen(wsrep_notify_cmd)) + { + WSREP_INFO("wsrep_notify_cmd is not defined, skipping notification."); + return; + } + + char cmd_buf[1 << 16]; // this can be long + long cmd_len = sizeof(cmd_buf) - 1; + char* cmd_ptr = cmd_buf; + long cmd_off = 0; + + cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, "%s", + wsrep_notify_cmd); + + if (status >= WSREP_MEMBER_UNDEFINED && status < WSREP_MEMBER_ERROR) + { + cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, " --status %s", + _status_str(status)); + } + else + { + /* here we preserve provider error codes */ + cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, + " --status 'Error(%d)'", status); + } + + if (0 != view) + { + char uuid_str[40]; + + wsrep_uuid_print (&view->uuid, uuid_str, sizeof(uuid_str)); + cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, + " --uuid %s", uuid_str); + + cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, + " --primary %s", view->view >= 0 ? "yes" : "no"); + + cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, + " --index %d", view->my_idx); + + cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, " --members"); + + for (int i = 0; i < view->memb_num; i++) + { + wsrep_uuid_print (&view->members[i].id, uuid_str, sizeof(uuid_str)); + cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, + "%c%s/%s/%s", i > 0 ? ',' : ' ', + uuid_str, view->members[i].name, + view->members[i].incoming); + } + } + + if (cmd_off == cmd_len) + { + WSREP_ERROR("Notification buffer too short (%ld). Aborting notification.", + cmd_len); + return; + } + + wsp::process p(cmd_ptr, "r"); + + p.wait(); + int err = p.error(); + + if (err) + { + WSREP_ERROR("Notification command failed: %d (%s): \"%s\"", + err, strerror(err), cmd_ptr); + } +} + diff --git a/sql/wsrep_priv.h b/sql/wsrep_priv.h index 700639ebcb1..291823d773e 100644 --- a/sql/wsrep_priv.h +++ b/sql/wsrep_priv.h @@ -26,208 +26,30 @@ #include <pthread.h> #include <cstdio> -extern void wsrep_ready_set (my_bool x); +void wsrep_ready_set (my_bool x); -extern ssize_t wsrep_sst_prepare (void** msg); -extern int wsrep_sst_donate_cb (void* app_ctx, - void* recv_ctx, - const void* msg, size_t msg_len, - const wsrep_uuid_t* current_uuid, - wsrep_seqno_t current_seqno, - const char* state, size_t state_len, - bool bypass); - -extern size_t guess_ip (char* buf, size_t buf_len); -extern size_t guess_address(char* buf, size_t buf_len); +ssize_t wsrep_sst_prepare (void** msg); +wsrep_cb_status wsrep_sst_donate_cb (void* app_ctx, + void* recv_ctx, + const void* msg, size_t msg_len, + const wsrep_gtid_t* state_id, + const char* state, size_t state_len, + bool bypass); extern wsrep_uuid_t local_uuid; extern wsrep_seqno_t local_seqno; +// a helper function +void wsrep_sst_received(wsrep_t*, const wsrep_uuid_t*, wsrep_seqno_t, + const void*, size_t); /*! SST thread signals init thread about sst completion */ -extern void wsrep_sst_complete(const wsrep_uuid_t* uuid, wsrep_seqno_t, bool); +void wsrep_sst_complete(const wsrep_uuid_t*, wsrep_seqno_t, bool); extern void wsrep_notify_status (wsrep_member_status_t new_status, const wsrep_view_info_t* view = 0); -namespace wsp { -class node_status -{ -public: - node_status() : status(WSREP_MEMBER_UNDEFINED) {} - void set(wsrep_member_status_t new_status, - const wsrep_view_info_t* view = 0) - { - if (status != new_status || 0 != view) - { - wsrep_notify_status(new_status, view); - status = new_status; - } - } - wsrep_member_status_t get() const { return status; } -private: - wsrep_member_status_t status; -}; -} /* namespace wsp */ - -extern wsp::node_status local_status; - -namespace wsp { -/* A small class to run external programs. */ -class process -{ -private: - const char* const str_; - FILE* io_; - int err_; - pid_t pid_; - -public: -/*! @arg type is a pointer to a null-terminated string which must contain - either the letter 'r' for reading or the letter 'w' for writing. - */ - process (const char* cmd, const char* type); - ~process (); - - FILE* pipe () { return io_; } - int error() { return err_; } - int wait (); - const char* cmd() { return str_; } -}; -#ifdef REMOVED -class lock -{ - pthread_mutex_t* const mtx_; - -public: - - lock (pthread_mutex_t* mtx) : mtx_(mtx) - { - int err = pthread_mutex_lock (mtx_); - - if (err) - { - WSREP_ERROR("Mutex lock failed: %s", strerror(err)); - abort(); - } - } - - virtual ~lock () - { - int err = pthread_mutex_unlock (mtx_); - - if (err) - { - WSREP_ERROR("Mutex unlock failed: %s", strerror(err)); - abort(); - } - } - - inline void wait (pthread_cond_t* cond) - { - pthread_cond_wait (cond, mtx_); - } - -private: - - lock (const lock&); - lock& operator=(const lock&); - -}; - -class monitor -{ - int mutable refcnt; - pthread_mutex_t mutable mtx; - pthread_cond_t mutable cond; - -public: - - monitor() : refcnt(0) - { - pthread_mutex_init (&mtx, NULL); - pthread_cond_init (&cond, NULL); - } - - ~monitor() - { - pthread_mutex_destroy (&mtx); - pthread_cond_destroy (&cond); - } - - void enter() const - { - lock l(&mtx); - - while (refcnt) - { - l.wait(&cond); - } - refcnt++; - } - - void leave() const - { - lock l(&mtx); - - refcnt--; - if (refcnt == 0) - { - pthread_cond_signal (&cond); - } - } - -private: - - monitor (const monitor&); - monitor& operator= (const monitor&); -}; - -class critical -{ - const monitor& mon; - -public: - - critical(const monitor& m) : mon(m) { mon.enter(); } - - ~critical() { mon.leave(); } - -private: - - critical (const critical&); - critical& operator= (const critical&); -}; -#endif - -class thd -{ - class thd_init - { - public: - thd_init() { my_thread_init(); } - ~thd_init() { my_thread_end(); } - } - init; - - thd (const thd&); - thd& operator= (const thd&); - -public: - - thd(my_bool wsrep_on); - ~thd(); - THD* const ptr; -}; +/* binlog-related stuff */ +int wsrep_write_cache(IO_CACHE *cache, uchar **buf, size_t *buf_len); -class string -{ -public: - string() : string_(0) {} - void set(char* str) { if (string_) free (string_); string_ = str; } - ~string() { set (0); } -private: - char* string_; -}; -} // namespace wsrep #endif /* WSREP_PRIV_H */ diff --git a/sql/wsrep_priv.h.moved b/sql/wsrep_priv.h.moved new file mode 100644 index 00000000000..700639ebcb1 --- /dev/null +++ b/sql/wsrep_priv.h.moved @@ -0,0 +1,233 @@ +/* Copyright 2010 Codership Oy <http://www.codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +//! @file declares symbols private to wsrep integration layer + +#ifndef WSREP_PRIV_H +#define WSREP_PRIV_H + +#include "wsrep_mysqld.h" +#include "../wsrep/wsrep_api.h" + +#include <log.h> +#include <pthread.h> +#include <cstdio> + +extern void wsrep_ready_set (my_bool x); + +extern ssize_t wsrep_sst_prepare (void** msg); +extern int wsrep_sst_donate_cb (void* app_ctx, + void* recv_ctx, + const void* msg, size_t msg_len, + const wsrep_uuid_t* current_uuid, + wsrep_seqno_t current_seqno, + const char* state, size_t state_len, + bool bypass); + +extern size_t guess_ip (char* buf, size_t buf_len); +extern size_t guess_address(char* buf, size_t buf_len); + +extern wsrep_uuid_t local_uuid; +extern wsrep_seqno_t local_seqno; + +/*! SST thread signals init thread about sst completion */ +extern void wsrep_sst_complete(const wsrep_uuid_t* uuid, wsrep_seqno_t, bool); + +extern void wsrep_notify_status (wsrep_member_status_t new_status, + const wsrep_view_info_t* view = 0); + +namespace wsp { +class node_status +{ +public: + node_status() : status(WSREP_MEMBER_UNDEFINED) {} + void set(wsrep_member_status_t new_status, + const wsrep_view_info_t* view = 0) + { + if (status != new_status || 0 != view) + { + wsrep_notify_status(new_status, view); + status = new_status; + } + } + wsrep_member_status_t get() const { return status; } +private: + wsrep_member_status_t status; +}; +} /* namespace wsp */ + +extern wsp::node_status local_status; + +namespace wsp { +/* A small class to run external programs. */ +class process +{ +private: + const char* const str_; + FILE* io_; + int err_; + pid_t pid_; + +public: +/*! @arg type is a pointer to a null-terminated string which must contain + either the letter 'r' for reading or the letter 'w' for writing. + */ + process (const char* cmd, const char* type); + ~process (); + + FILE* pipe () { return io_; } + int error() { return err_; } + int wait (); + const char* cmd() { return str_; } +}; +#ifdef REMOVED +class lock +{ + pthread_mutex_t* const mtx_; + +public: + + lock (pthread_mutex_t* mtx) : mtx_(mtx) + { + int err = pthread_mutex_lock (mtx_); + + if (err) + { + WSREP_ERROR("Mutex lock failed: %s", strerror(err)); + abort(); + } + } + + virtual ~lock () + { + int err = pthread_mutex_unlock (mtx_); + + if (err) + { + WSREP_ERROR("Mutex unlock failed: %s", strerror(err)); + abort(); + } + } + + inline void wait (pthread_cond_t* cond) + { + pthread_cond_wait (cond, mtx_); + } + +private: + + lock (const lock&); + lock& operator=(const lock&); + +}; + +class monitor +{ + int mutable refcnt; + pthread_mutex_t mutable mtx; + pthread_cond_t mutable cond; + +public: + + monitor() : refcnt(0) + { + pthread_mutex_init (&mtx, NULL); + pthread_cond_init (&cond, NULL); + } + + ~monitor() + { + pthread_mutex_destroy (&mtx); + pthread_cond_destroy (&cond); + } + + void enter() const + { + lock l(&mtx); + + while (refcnt) + { + l.wait(&cond); + } + refcnt++; + } + + void leave() const + { + lock l(&mtx); + + refcnt--; + if (refcnt == 0) + { + pthread_cond_signal (&cond); + } + } + +private: + + monitor (const monitor&); + monitor& operator= (const monitor&); +}; + +class critical +{ + const monitor& mon; + +public: + + critical(const monitor& m) : mon(m) { mon.enter(); } + + ~critical() { mon.leave(); } + +private: + + critical (const critical&); + critical& operator= (const critical&); +}; +#endif + +class thd +{ + class thd_init + { + public: + thd_init() { my_thread_init(); } + ~thd_init() { my_thread_end(); } + } + init; + + thd (const thd&); + thd& operator= (const thd&); + +public: + + thd(my_bool wsrep_on); + ~thd(); + THD* const ptr; +}; + +class string +{ +public: + string() : string_(0) {} + void set(char* str) { if (string_) free (string_); string_ = str; } + ~string() { set (0); } +private: + char* string_; +}; + +} // namespace wsrep +#endif /* WSREP_PRIV_H */ diff --git a/sql/wsrep_sst.cc b/sql/wsrep_sst.cc index 4afe269cfe2..d651e1ed0a9 100644 --- a/sql/wsrep_sst.cc +++ b/sql/wsrep_sst.cc @@ -13,6 +13,8 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ +#include "wsrep_sst.h" + #include <mysqld.h> #include <sql_class.h> #include <set_var.h> @@ -20,6 +22,7 @@ #include <sql_reload.h> #include <sql_parse.h> #include "wsrep_priv.h" +#include "wsrep_utils.h" #include <cstdio> #include <cstdlib> @@ -58,7 +61,6 @@ const char* wsrep_sst_donor = ""; // container for real auth string static const char* sst_auth_real = NULL; - my_bool wsrep_sst_donor_rejects_queries = FALSE; bool wsrep_sst_method_check (sys_var *self, THD* thd, set_var* var) diff --git a/sql/wsrep_sst.h b/sql/wsrep_sst.h new file mode 100644 index 00000000000..b7f0e26f226 --- /dev/null +++ b/sql/wsrep_sst.h @@ -0,0 +1,40 @@ +/* Copyright (C) 2013 Codership Oy <info@codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ + +#ifndef WSREP_SST_H +#define WSREP_SST_H + +#include <mysql.h> // my_bool + +/* system variables */ +extern const char* wsrep_sst_method; +extern const char* wsrep_sst_receive_address; +extern const char* wsrep_sst_donor; +extern char* wsrep_sst_auth; +extern my_bool wsrep_sst_donor_rejects_queries; + +/*! Synchronizes applier thread start with init thread */ +extern void wsrep_sst_grab(); +/*! Init thread waits for SST completion */ +extern bool wsrep_sst_wait(); +/*! Signals wsrep that initialization is complete, writesets can be applied */ +extern void wsrep_sst_continue(); + +extern void wsrep_SE_init_grab(); /*! grab init critical section */ +extern void wsrep_SE_init_wait(); /*! wait for SE init to complete */ +extern void wsrep_SE_init_done(); /*! signal that SE init is complte */ +extern void wsrep_SE_initialized(); /*! mark SE initialization complete */ + +#endif /* WSREP_SST_H */ diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc new file mode 100644 index 00000000000..9cbd32cac73 --- /dev/null +++ b/sql/wsrep_thd.cc @@ -0,0 +1,424 @@ +/* Copyright (C) 2013 Codership Oy <info@codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ + +#include "wsrep_thd.h" + +#include "transaction.h" +#include "rpl_rli.h" +#include "log_event.h" +#include "sql_parse.h" +#include "slave.h" // opt_log_slave_updates +#include "sql_base.h" // close_thread_tables() +#include "mysqld.h" // start_wsrep_THD(); + +/* must have (&thd->LOCK_wsrep_thd) */ +void wsrep_client_rollback(THD *thd) +{ + WSREP_DEBUG("client rollback due to BF abort for (%ld), query: %s", + thd->thread_id, thd->query()); + + thd->wsrep_conflict_state= ABORTING; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + trans_rollback(thd); + + if (thd->locked_tables_mode && thd->lock) + { + WSREP_DEBUG("unlocking tables for BF abort (%ld)", thd->thread_id); + thd->locked_tables_list.unlock_locked_tables(thd); + thd->variables.option_bits&= ~(OPTION_TABLE_LOCK); + } + + if (thd->global_read_lock.is_acquired()) + { + WSREP_DEBUG("unlocking GRL for BF abort (%ld)", thd->thread_id); + thd->global_read_lock.unlock_global_read_lock(thd); + } + + /* Release transactional metadata locks. */ + thd->mdl_context.release_transactional_locks(); + + /* release explicit MDL locks */ + thd->mdl_context.release_explicit_locks(); + + if (thd->get_binlog_table_maps()) + { + WSREP_DEBUG("clearing binlog table map for BF abort (%ld)", thd->thread_id); + thd->clear_binlog_table_maps(); + } + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + thd->wsrep_conflict_state= ABORTED; +} + +static Relay_log_info* wsrep_relay_log_init(const char* log_fname) +{ + Relay_log_info* rli= new Relay_log_info(false); + + rli->no_storage= true; + if (!rli->relay_log.description_event_for_exec) + { + rli->relay_log.description_event_for_exec= + new Format_description_log_event(4); + } + + rli->sql_thd= current_thd; + return rli; +} + +static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow) +{ + shadow->options = thd->variables.option_bits; + shadow->wsrep_exec_mode = thd->wsrep_exec_mode; + shadow->vio = thd->net.vio; + + if (opt_log_slave_updates) + thd->variables.option_bits|= OPTION_BIN_LOG; + else + thd->variables.option_bits&= ~(OPTION_BIN_LOG); + + if (!thd->wsrep_rli) thd->wsrep_rli= wsrep_relay_log_init("wsrep_relay"); + + thd->wsrep_exec_mode= REPL_RECV; + thd->net.vio= 0; + thd->clear_error(); + + thd->variables.option_bits|= OPTION_NOT_AUTOCOMMIT; + + shadow->tx_isolation = thd->variables.tx_isolation; + thd->variables.tx_isolation = ISO_READ_COMMITTED; + thd->tx_isolation = ISO_READ_COMMITTED; +} + +static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow) +{ + thd->variables.option_bits = shadow->options; + thd->wsrep_exec_mode = shadow->wsrep_exec_mode; + thd->net.vio = shadow->vio; + thd->variables.tx_isolation = shadow->tx_isolation; +} + +void wsrep_replay_transaction(THD *thd) +{ + /* checking if BF trx must be replayed */ + if (thd->wsrep_conflict_state== MUST_REPLAY) { + if (thd->wsrep_exec_mode!= REPL_RECV) { + if (thd->stmt_da->is_sent) + { + WSREP_ERROR("replay issue, thd has reported status already"); + } + thd->stmt_da->reset_diagnostics_area(); + + thd->wsrep_conflict_state= REPLAYING; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + + mysql_reset_thd_for_next_command(thd); + thd->killed= THD::NOT_KILLED; + close_thread_tables(thd); + if (thd->locked_tables_mode && thd->lock) + { + WSREP_DEBUG("releasing table lock for replaying (%ld)", + thd->thread_id); + thd->locked_tables_list.unlock_locked_tables(thd); + thd->variables.option_bits&= ~(OPTION_TABLE_LOCK); + } + thd->mdl_context.release_transactional_locks(); + + thd_proc_info(thd, "wsrep replaying trx"); + WSREP_DEBUG("replay trx: %s %lld", + thd->query() ? thd->query() : "void", + (long long)wsrep_thd_trx_seqno(thd)); + struct wsrep_thd_shadow shadow; + wsrep_prepare_bf_thd(thd, &shadow); + int rcode = wsrep->replay_trx(wsrep, + &thd->wsrep_ws_handle, + (void *)thd); + + wsrep_return_from_bf_mode(thd, &shadow); + if (thd->wsrep_conflict_state!= REPLAYING) + WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state ); + + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + + switch (rcode) + { + case WSREP_OK: + thd->wsrep_conflict_state= NO_CONFLICT; + wsrep->post_commit(wsrep, &thd->wsrep_ws_handle); + WSREP_DEBUG("trx_replay successful for: %ld %llu", + thd->thread_id, (long long)thd->real_id); + if (thd->stmt_da->is_sent) + { + WSREP_WARN("replay ok, thd has reported status"); + } + else + { + my_ok(thd); + } + break; + case WSREP_TRX_FAIL: + if (thd->stmt_da->is_sent) + { + WSREP_ERROR("replay failed, thd has reported status"); + } + else + { + WSREP_DEBUG("replay failed, rolling back"); + my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction"); + } + thd->wsrep_conflict_state= ABORTED; + wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle); + break; + default: + WSREP_ERROR("trx_replay failed for: %d, query: %s", + rcode, thd->query() ? thd->query() : "void"); + /* we're now in inconsistent state, must abort */ + unireg_abort(1); + break; + } + mysql_mutex_lock(&LOCK_wsrep_replaying); + wsrep_replaying--; + WSREP_DEBUG("replaying decreased: %d, thd: %lu", + wsrep_replaying, thd->thread_id); + mysql_cond_broadcast(&COND_wsrep_replaying); + mysql_mutex_unlock(&LOCK_wsrep_replaying); + } + } +} + +static void wsrep_replication_process(THD *thd) +{ + int rcode; + DBUG_ENTER("wsrep_replication_process"); + + struct wsrep_thd_shadow shadow; + wsrep_prepare_bf_thd(thd, &shadow); + + rcode = wsrep->recv(wsrep, (void *)thd); + DBUG_PRINT("wsrep",("wsrep_repl returned: %d", rcode)); + + WSREP_INFO("applier thread exiting (code:%d)", rcode); + + switch (rcode) { + case WSREP_OK: + case WSREP_NOT_IMPLEMENTED: + case WSREP_CONN_FAIL: + /* provider does not support slave operations / disconnected from group, + * just close applier thread */ + break; + case WSREP_NODE_FAIL: + /* data inconsistency => SST is needed */ + /* Note: we cannot just blindly restart replication here, + * SST might require server restart if storage engines must be + * initialized after SST */ + WSREP_ERROR("node consistency compromised, aborting"); + wsrep_kill_mysql(thd); + break; + case WSREP_WARNING: + case WSREP_TRX_FAIL: + case WSREP_TRX_MISSING: + /* these suggests a bug in provider code */ + WSREP_WARN("bad return from recv() call: %d", rcode); + /* fall through to node shutdown */ + case WSREP_FATAL: + /* Cluster connectivity is lost. + * + * If applier was killed on purpose (KILL_CONNECTION), we + * avoid mysql shutdown. This is because the killer will then handle + * shutdown processing (or replication restarting) + */ + if (thd->killed != THD::KILL_CONNECTION) + { + wsrep_kill_mysql(thd); + } + break; + } + + mysql_mutex_lock(&LOCK_thread_count); + wsrep_close_applier(thd); + mysql_cond_broadcast(&COND_thread_count); + mysql_mutex_unlock(&LOCK_thread_count); + + if (thd->temporary_tables) + { + WSREP_DEBUG("Applier %lu, has temporary tables at exit", thd->thread_id); + } + wsrep_return_from_bf_mode(thd, &shadow); + DBUG_VOID_RETURN; +} + +void wsrep_create_appliers(long threads) +{ + if (!wsrep_connected) + { + /* see wsrep_replication_start() for the logic */ + if (wsrep_cluster_address && strlen(wsrep_cluster_address) && + wsrep_provider && strcasecmp(wsrep_provider, "none")) + { + WSREP_ERROR("Trying to launch slave threads before creating " + "connection at '%s'", wsrep_cluster_address); + assert(0); + } + return; + } + + long wsrep_threads=0; + pthread_t hThread; + while (wsrep_threads++ < threads) { + if (pthread_create( + &hThread, &connection_attrib, + start_wsrep_THD, (void*)wsrep_replication_process)) + WSREP_WARN("Can't create thread to manage wsrep replication"); + } +} + +static void wsrep_rollback_process(THD *thd) +{ + DBUG_ENTER("wsrep_rollback_process"); + + mysql_mutex_lock(&LOCK_wsrep_rollback); + wsrep_aborting_thd= NULL; + + while (thd->killed == THD::NOT_KILLED) { + thd_proc_info(thd, "wsrep aborter idle"); + thd->mysys_var->current_mutex= &LOCK_wsrep_rollback; + thd->mysys_var->current_cond= &COND_wsrep_rollback; + + mysql_cond_wait(&COND_wsrep_rollback,&LOCK_wsrep_rollback); + + WSREP_DEBUG("WSREP rollback thread wakes for signal"); + + mysql_mutex_lock(&thd->mysys_var->mutex); + thd_proc_info(thd, "wsrep aborter active"); + thd->mysys_var->current_mutex= 0; + thd->mysys_var->current_cond= 0; + mysql_mutex_unlock(&thd->mysys_var->mutex); + + /* check for false alarms */ + if (!wsrep_aborting_thd) + { + WSREP_DEBUG("WSREP rollback thread has empty abort queue"); + } + /* process all entries in the queue */ + while (wsrep_aborting_thd) { + THD *aborting; + wsrep_aborting_thd_t next = wsrep_aborting_thd->next; + aborting = wsrep_aborting_thd->aborting_thd; + my_free(wsrep_aborting_thd); + wsrep_aborting_thd= next; + /* + * must release mutex, appliers my want to add more + * aborting thds in our work queue, while we rollback + */ + mysql_mutex_unlock(&LOCK_wsrep_rollback); + + mysql_mutex_lock(&aborting->LOCK_wsrep_thd); + if (aborting->wsrep_conflict_state== ABORTED) + { + WSREP_DEBUG("WSREP, thd already aborted: %llu state: %d", + (long long)aborting->real_id, + aborting->wsrep_conflict_state); + + mysql_mutex_unlock(&aborting->LOCK_wsrep_thd); + mysql_mutex_lock(&LOCK_wsrep_rollback); + continue; + } + aborting->wsrep_conflict_state= ABORTING; + + mysql_mutex_unlock(&aborting->LOCK_wsrep_thd); + + aborting->store_globals(); + + mysql_mutex_lock(&aborting->LOCK_wsrep_thd); + wsrep_client_rollback(aborting); + WSREP_DEBUG("WSREP rollbacker aborted thd: (%lu %llu)", + aborting->thread_id, (long long)aborting->real_id); + mysql_mutex_unlock(&aborting->LOCK_wsrep_thd); + + mysql_mutex_lock(&LOCK_wsrep_rollback); + } + } + + mysql_mutex_unlock(&LOCK_wsrep_rollback); + sql_print_information("WSREP: rollbacker thread exiting"); + + DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting")); + DBUG_VOID_RETURN; +} + +void wsrep_create_rollbacker() +{ + if (wsrep_provider && strcasecmp(wsrep_provider, "none")) + { + pthread_t hThread; + /* create rollbacker */ + if (pthread_create( &hThread, &connection_attrib, + start_wsrep_THD, (void*)wsrep_rollback_process)) + WSREP_WARN("Can't create thread to manage wsrep rollback"); + } +} + +extern "C" +int wsrep_thd_is_brute_force(void *thd_ptr) +{ + if (thd_ptr) { + switch (((THD *)thd_ptr)->wsrep_exec_mode) { + case LOCAL_STATE: + { + if (((THD *)thd_ptr)->wsrep_conflict_state== REPLAYING) + { + return 1; + } + return 0; + } + case REPL_RECV: return 1; + case TOTAL_ORDER: return 2; + case LOCAL_COMMIT: return 3; + } + } + return 0; +} + +extern "C" +int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal) +{ + THD *victim_thd = (THD *) victim_thd_ptr; + THD *bf_thd = (THD *) bf_thd_ptr; + DBUG_ENTER("wsrep_abort_thd"); + + if ( (WSREP(bf_thd) || + ( (WSREP_ON || wsrep_OSU_method_options == WSREP_OSU_RSU) && + bf_thd->wsrep_exec_mode == TOTAL_ORDER) ) && + victim_thd) + { + WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ? + (long long)bf_thd->real_id : 0, (long long)victim_thd->real_id); + ha_wsrep_abort_transaction(bf_thd, victim_thd, signal); + } + else + { + WSREP_DEBUG("wsrep_abort_thd not effective: %p %p", bf_thd, victim_thd); + } + + DBUG_RETURN(1); +} + +extern "C" +int wsrep_thd_in_locking_session(void *thd_ptr) +{ + if (thd_ptr && ((THD *)thd_ptr)->in_lock_tables) { + return 1; + } + return 0; +} + diff --git a/sql/wsrep_thd.h b/sql/wsrep_thd.h new file mode 100644 index 00000000000..2397230b0a2 --- /dev/null +++ b/sql/wsrep_thd.h @@ -0,0 +1,31 @@ +/* Copyright (C) 2013 Codership Oy <info@codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ + +#ifndef WSREP_THD_H +#define WSREP_THD_H + +#include "sql_class.h" + +void wsrep_client_rollback(THD *thd); +void wsrep_replay_transaction(THD *thd); +void wsrep_create_appliers(long threads); +void wsrep_create_rollbacker(); + +extern "C" int wsrep_thd_is_brute_force(void *thd_ptr); +extern "C" int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, + my_bool signal); +extern "C" int wsrep_thd_in_locking_session(void *thd_ptr); + +#endif /* WSREP_THD_H */ diff --git a/sql/wsrep_utils.cc b/sql/wsrep_utils.cc index 53d0f7c449e..37e537c62e4 100644 --- a/sql/wsrep_utils.cc +++ b/sql/wsrep_utils.cc @@ -20,6 +20,10 @@ #define _GNU_SOURCE // POSIX_SPAWN_USEVFORK flag #endif +#include "wsrep_utils.h" +#include "wsrep_mysqld.h" +//#include "wsrep_api.h" +//#include "wsrep_priv.h" #include <spawn.h> // posix_spawn() #include <unistd.h> // pipe() #include <errno.h> // errno diff --git a/sql/wsrep_utils.h b/sql/wsrep_utils.h new file mode 100644 index 00000000000..337678238f8 --- /dev/null +++ b/sql/wsrep_utils.h @@ -0,0 +1,208 @@ +/* Copyright (C) 2013 Codership Oy <info@codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ + +#ifndef WSREP_UTILS_H +#define WSREP_UTILS_H + +#include "wsrep_priv.h" + +unsigned int wsrep_check_ip (const char* addr); +size_t wsrep_guess_ip (char* buf, size_t buf_len); +size_t wsrep_guess_address(char* buf, size_t buf_len); + +namespace wsp { +class node_status +{ +public: + node_status() : status(WSREP_MEMBER_UNDEFINED) {} + void set(wsrep_member_status_t new_status, + const wsrep_view_info_t* view = 0) + { + if (status != new_status || 0 != view) + { + wsrep_notify_status(new_status, view); + status = new_status; + } + } + wsrep_member_status_t get() const { return status; } +private: + wsrep_member_status_t status; +}; +} /* namespace wsp */ + +extern wsp::node_status local_status; + +namespace wsp { +/* A small class to run external programs. */ +class process +{ +private: + const char* const str_; + FILE* io_; + int err_; + pid_t pid_; + +public: +/*! @arg type is a pointer to a null-terminated string which must contain + either the letter 'r' for reading or the letter 'w' for writing. + */ + process (const char* cmd, const char* type); + ~process (); + + FILE* pipe () { return io_; } + int error() { return err_; } + int wait (); + const char* cmd() { return str_; } +}; + +class thd +{ + class thd_init + { + public: + thd_init() { my_thread_init(); } + ~thd_init() { my_thread_end(); } + } + init; + + thd (const thd&); + thd& operator= (const thd&); + +public: + + thd(my_bool wsrep_on); + ~thd(); + THD* const ptr; +}; + +class string +{ +public: + string() : string_(0) {} + void set(char* str) { if (string_) free (string_); string_ = str; } + ~string() { set (0); } +private: + char* string_; +}; + +#ifdef REMOVED +class lock +{ + pthread_mutex_t* const mtx_; + +public: + + lock (pthread_mutex_t* mtx) : mtx_(mtx) + { + int err = pthread_mutex_lock (mtx_); + + if (err) + { + WSREP_ERROR("Mutex lock failed: %s", strerror(err)); + abort(); + } + } + + virtual ~lock () + { + int err = pthread_mutex_unlock (mtx_); + + if (err) + { + WSREP_ERROR("Mutex unlock failed: %s", strerror(err)); + abort(); + } + } + + inline void wait (pthread_cond_t* cond) + { + pthread_cond_wait (cond, mtx_); + } + +private: + + lock (const lock&); + lock& operator=(const lock&); + +}; + +class monitor +{ + int mutable refcnt; + pthread_mutex_t mutable mtx; + pthread_cond_t mutable cond; + +public: + + monitor() : refcnt(0) + { + pthread_mutex_init (&mtx, NULL); + pthread_cond_init (&cond, NULL); + } + + ~monitor() + { + pthread_mutex_destroy (&mtx); + pthread_cond_destroy (&cond); + } + + void enter() const + { + lock l(&mtx); + + while (refcnt) + { + l.wait(&cond); + } + refcnt++; + } + + void leave() const + { + lock l(&mtx); + + refcnt--; + if (refcnt == 0) + { + pthread_cond_signal (&cond); + } + } + +private: + + monitor (const monitor&); + monitor& operator= (const monitor&); +}; + +class critical +{ + const monitor& mon; + +public: + + critical(const monitor& m) : mon(m) { mon.enter(); } + + ~critical() { mon.leave(); } + +private: + + critical (const critical&); + critical& operator= (const critical&); +}; +#endif + +} // namespace wsrep + +#endif /* WSREP_UTILS_H */ diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc index 17e8de9ba6b..bd041ed51ff 100644 --- a/sql/wsrep_var.cc +++ b/sql/wsrep_var.cc @@ -13,12 +13,15 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ +#include "wsrep_var.h" + #include <mysqld.h> #include <sql_class.h> #include <sql_plugin.h> #include <set_var.h> #include <sql_acl.h> #include "wsrep_priv.h" +#include "wsrep_thd.h" #include <my_dir.h> #include <cstdio> #include <cstdlib> @@ -157,7 +160,7 @@ void wsrep_start_position_init (const char* val) if (NULL == val || wsrep_start_position_verify (val)) { WSREP_ERROR("Bad initial value for wsrep_start_position: %s", - (val ? val : "")); + (val ? val : "")); return; } @@ -173,7 +176,7 @@ static bool refresh_provider_options() { if (wsrep_provider_options) my_free((void *)wsrep_provider_options); wsrep_provider_options = (char*)my_memdup(opts, strlen(opts) + 1, - MYF(MY_WME)); + MYF(MY_WME)); } else { diff --git a/sql/wsrep_var.h b/sql/wsrep_var.h new file mode 100644 index 00000000000..1dd0beac3e3 --- /dev/null +++ b/sql/wsrep_var.h @@ -0,0 +1,81 @@ +/* Copyright (C) 2013 Codership Oy <info@codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ + +#ifndef WSREP_VAR_H +#define WSREP_VAR_H + +#define WSREP_NODE_INCOMING_AUTO "AUTO" + +// MySQL variables funcs + +#include "sql_priv.h" +class sys_var; +class set_var; +class THD; + +#define CHECK_ARGS (sys_var *self, THD* thd, set_var *var) +#define UPDATE_ARGS (sys_var *self, THD* thd, enum_var_type type) +#define DEFAULT_ARGS (THD* thd, enum_var_type var_type) +#define INIT_ARGS (const char* opt) + +extern bool wsrep_on_update UPDATE_ARGS; +extern void wsrep_causal_reads_update UPDATE_ARGS; +extern bool wsrep_start_position_check CHECK_ARGS; +extern bool wsrep_start_position_update UPDATE_ARGS; +extern void wsrep_start_position_init INIT_ARGS; + +extern bool wsrep_provider_check CHECK_ARGS; +extern bool wsrep_provider_update UPDATE_ARGS; +extern void wsrep_provider_init INIT_ARGS; + +extern bool wsrep_provider_options_check CHECK_ARGS; +extern bool wsrep_provider_options_update UPDATE_ARGS; +extern void wsrep_provider_options_init INIT_ARGS; + +extern bool wsrep_cluster_address_check CHECK_ARGS; +extern bool wsrep_cluster_address_update UPDATE_ARGS; +extern void wsrep_cluster_address_init INIT_ARGS; + +extern bool wsrep_cluster_name_check CHECK_ARGS; +extern bool wsrep_cluster_name_update UPDATE_ARGS; + +extern bool wsrep_node_name_check CHECK_ARGS; +extern bool wsrep_node_name_update UPDATE_ARGS; + +extern bool wsrep_node_address_check CHECK_ARGS; +extern bool wsrep_node_address_update UPDATE_ARGS; +extern void wsrep_node_address_init INIT_ARGS; + +extern bool wsrep_sst_method_check CHECK_ARGS; +extern bool wsrep_sst_method_update UPDATE_ARGS; +extern void wsrep_sst_method_init INIT_ARGS; + +extern bool wsrep_sst_receive_address_check CHECK_ARGS; +extern bool wsrep_sst_receive_address_update UPDATE_ARGS; + +extern bool wsrep_sst_auth_check CHECK_ARGS; +extern bool wsrep_sst_auth_update UPDATE_ARGS; +extern void wsrep_sst_auth_init INIT_ARGS; + +extern bool wsrep_sst_donor_check CHECK_ARGS; +extern bool wsrep_sst_donor_update UPDATE_ARGS; + +extern bool wsrep_slave_threads_check CHECK_ARGS; +extern bool wsrep_slave_threads_update UPDATE_ARGS; + +extern bool wsrep_desync_check CHECK_ARGS; +extern bool wsrep_desync_update UPDATE_ARGS; + +#endif /* WSREP_VAR_H */ |