diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/handler.cc | 5 | ||||
-rw-r--r-- | sql/mysqld.cc | 23 | ||||
-rw-r--r-- | sql/sql_class.cc | 4 | ||||
-rw-r--r-- | sql/sql_class.h | 2 | ||||
-rw-r--r-- | sql/sql_parse.cc | 93 | ||||
-rw-r--r-- | sql/sql_partition_admin.cc | 4 | ||||
-rw-r--r-- | sql/sql_table.cc | 14 | ||||
-rw-r--r-- | sql/sys_vars.cc | 12 | ||||
-rw-r--r-- | sql/wsrep_applier.cc | 7 | ||||
-rw-r--r-- | sql/wsrep_hton.cc | 1 | ||||
-rw-r--r-- | sql/wsrep_mysqld.cc | 291 | ||||
-rw-r--r-- | sql/wsrep_mysqld.h | 7 | ||||
-rw-r--r-- | sql/wsrep_sst.cc | 9 | ||||
-rw-r--r-- | sql/wsrep_utils.cc | 14 | ||||
-rw-r--r-- | sql/wsrep_utils.h | 3 | ||||
-rw-r--r-- | sql/wsrep_var.cc | 49 | ||||
-rw-r--r-- | sql/wsrep_var.h | 4 |
17 files changed, 366 insertions, 176 deletions
diff --git a/sql/handler.cc b/sql/handler.cc index dae47668410..49e9626cb81 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -1660,6 +1660,11 @@ int ha_rollback_trans(THD *thd, bool all) { // cannot happen my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err); error=1; +#ifdef WITH_WSREP + WSREP_WARN("handlerton rollback failed, thd %lu %lld conf %d SQL %s", + thd->thread_id, thd->query_id, thd->wsrep_conflict_state, + thd->query()); +#endif /* WITH_WSREP */ } status_var_increment(thd->status_var.ha_rollback_count); ha_info_next= ha_info->next(); diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 3af8750de13..97d63d99458 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -9226,8 +9226,29 @@ mysqld_get_one_option(int optid, const struct my_option *opt, char *argument) } #ifdef WITH_WSREP case OPT_WSREP_CAUSAL_READS: - wsrep_causal_reads_update(&global_system_variables); + { + if (global_system_variables.wsrep_causal_reads) + { + WSREP_WARN("option --wsrep-causal-reads is deprecated"); + if (!(global_system_variables.wsrep_sync_wait & WSREP_SYNC_WAIT_BEFORE_READ)) + { + WSREP_WARN("--wsrep-causal-reads=ON takes precedence over --wsrep-sync-wait=%u. " + "WSREP_SYNC_WAIT_BEFORE_READ is on", + global_system_variables.wsrep_sync_wait); + global_system_variables.wsrep_sync_wait |= WSREP_SYNC_WAIT_BEFORE_READ; + } + } + else + { + if (global_system_variables.wsrep_sync_wait & WSREP_SYNC_WAIT_BEFORE_READ) { + WSREP_WARN("--wsrep-sync-wait=%u takes precedence over --wsrep-causal-reads=OFF. " + "WSREP_SYNC_WAIT_BEFORE_READ is on", + global_system_variables.wsrep_sync_wait); + global_system_variables.wsrep_causal_reads = 1; + } + } break; + } case OPT_WSREP_SYNC_WAIT: global_system_variables.wsrep_causal_reads= MY_TEST(global_system_variables.wsrep_sync_wait & diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 4df257b96a4..ce1f69b76a9 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -1023,6 +1023,8 @@ THD::THD(bool is_wsrep_applier) wsrep_info[sizeof(wsrep_info) - 1] = '\0'; /* make sure it is 0-terminated */ wsrep_sync_wait_gtid = WSREP_GTID_UNDEFINED; wsrep_affected_rows = 0; + wsrep_replicate_GTID = false; + wsrep_skip_wsrep_GTID = false; #endif /* Call to init() below requires fully initialized Open_tables_state. */ reset_open_tables_state(this); @@ -1439,6 +1441,8 @@ void THD::init(void) wsrep_TOI_pre_query_len = 0; wsrep_sync_wait_gtid = WSREP_GTID_UNDEFINED; wsrep_affected_rows = 0; + wsrep_replicate_GTID = false; + wsrep_skip_wsrep_GTID = false; #endif /* WITH_WSREP */ if (variables.sql_log_bin) diff --git a/sql/sql_class.h b/sql/sql_class.h index 44163333425..d8fc9303949 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -4072,6 +4072,8 @@ public: bool wsrep_ignore_table; wsrep_gtid_t wsrep_sync_wait_gtid; ulong wsrep_affected_rows; + bool wsrep_replicate_GTID; + bool wsrep_skip_wsrep_GTID; #endif /* WITH_WSREP */ /* Handling of timeouts for commands */ diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 65a2382f0d0..81b71f71760 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -2906,6 +2906,7 @@ mysql_execute_command(THD *thd) #endif case SQLCOM_SHOW_STATUS: { + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW); execute_show_status(thd, all_tables); break; } @@ -2944,21 +2945,21 @@ mysql_execute_command(THD *thd) case SQLCOM_SHOW_TABLE_STATUS: case SQLCOM_SHOW_OPEN_TABLES: case SQLCOM_SHOW_GENERIC: + case SQLCOM_SHOW_PLUGINS: case SQLCOM_SHOW_FIELDS: case SQLCOM_SHOW_KEYS: - case SQLCOM_SELECT: - if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) - goto error; - /* fall through */ - case SQLCOM_SHOW_PLUGINS: case SQLCOM_SHOW_VARIABLES: case SQLCOM_SHOW_CHARSETS: case SQLCOM_SHOW_COLLATIONS: case SQLCOM_SHOW_STORAGE_ENGINES: case SQLCOM_SHOW_PROFILE: - { + case SQLCOM_SELECT: + { #ifdef WITH_WSREP - DBUG_ASSERT(thd->wsrep_exec_mode != REPL_RECV); + if (lex->sql_command == SQLCOM_SELECT) + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_READ) + else + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW) #endif /* WITH_WSREP */ thd->status_var.last_query_cost= 0.0; @@ -3082,6 +3083,7 @@ mysql_execute_command(THD *thd) case SQLCOM_SHOW_RELAYLOG_EVENTS: /* fall through */ case SQLCOM_SHOW_BINLOG_EVENTS: { + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW); if (check_global_access(thd, REPL_SLAVE_ACL)) goto error; res = mysql_show_binlog_events(thd); @@ -3653,6 +3655,7 @@ end_with_restore_list: { if (check_global_access(thd, SUPER_ACL | REPL_CLIENT_ACL)) goto error; + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW); res = show_binlogs(thd); break; } @@ -3666,9 +3669,8 @@ end_with_restore_list: MYF(0)); /* purecov: inspected */ goto error; #else - - if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) - goto error; + { + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW); /* Access check: @@ -3732,8 +3734,7 @@ end_with_restore_list: case SQLCOM_CHECKSUM: { DBUG_ASSERT(first_table == all_tables && first_table != 0); - if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) - goto error; + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_READ); if (check_table_access(thd, SELECT_ACL, all_tables, FALSE, UINT_MAX, FALSE)) @@ -3744,6 +3745,7 @@ end_with_restore_list: } case SQLCOM_UPDATE: { + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE); ha_rows found= 0, updated= 0; DBUG_ASSERT(first_table == all_tables && first_table != 0); if (WSREP_CLIENT(thd) && @@ -3786,9 +3788,7 @@ end_with_restore_list: /* if we switched from normal update, rights are checked */ if (up_result != 2) { - if (WSREP_CLIENT(thd) && - wsrep_sync_wait(thd, WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE)) - goto error; + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE); if ((res= multi_update_precheck(thd, all_tables))) break; } @@ -3858,10 +3858,6 @@ end_with_restore_list: break; } case SQLCOM_REPLACE: - { - if (WSREP_CLIENT(thd) && - wsrep_sync_wait(thd, WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE)) - goto error; #ifndef DBUG_OFF if (mysql_bin_log.is_open()) { @@ -3900,6 +3896,7 @@ end_with_restore_list: /* fall through */ case SQLCOM_INSERT: { + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE); DBUG_ASSERT(first_table == all_tables && first_table != 0); if (WSREP_CLIENT(thd) && @@ -3957,6 +3954,7 @@ end_with_restore_list: case SQLCOM_REPLACE_SELECT: case SQLCOM_INSERT_SELECT: { + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE); select_result *sel_result; bool explain= MY_TEST(lex->describe); DBUG_ASSERT(first_table == all_tables && first_table != 0); @@ -4079,6 +4077,7 @@ end_with_restore_list: } case SQLCOM_DELETE: { + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE); select_result *sel_result=lex->result; DBUG_ASSERT(first_table == all_tables && first_table != 0); if (WSREP_CLIENT(thd) && @@ -4139,6 +4138,7 @@ end_with_restore_list: } case SQLCOM_DELETE_MULTI: { + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE); DBUG_ASSERT(first_table == all_tables && first_table != 0); TABLE_LIST *aux_tables= thd->lex->auxiliary_table_list.first; multi_delete *result; @@ -4219,19 +4219,6 @@ end_with_restore_list: /* So that DROP TEMPORARY TABLE gets to binlog at commit/rollback */ thd->variables.option_bits|= OPTION_KEEP_LOG; } - if (WSREP(thd)) - { - for (TABLE_LIST *table= all_tables; table; table= table->next_global) - { - if (!lex->tmp_table() && - (!thd->is_current_stmt_binlog_format_row() || - !find_temporary_table(thd, table))) - { - WSREP_TO_ISOLATION_BEGIN(NULL, NULL, all_tables); - break; - } - } - } /* If we are a slave, we should add IF EXISTS if the query executed on the master without an error. This will help a slave to @@ -4240,7 +4227,21 @@ end_with_restore_list: */ if (thd->slave_thread && !thd->slave_expected_error && slave_ddl_exec_mode_options == SLAVE_EXEC_MODE_IDEMPOTENT) - lex->create_info.set(DDL_options_st::OPT_IF_EXISTS); + lex->check_exists= 1; + + if (WSREP(thd)) + { + for (TABLE_LIST *table= all_tables; table; table= table->next_global) + { + if (!lex->tmp_table() && + (!thd->is_current_stmt_binlog_format_row() || + !find_temporary_table(thd, table))) + { + WSREP_TO_ISOLATION_BEGIN(NULL, NULL, all_tables); + break; + } + } + } /* DDL and binlog write order are protected by metadata locks. */ res= mysql_rm_table(thd, first_table, lex->if_exists(), lex->tmp_table()); @@ -4546,9 +4547,7 @@ end_with_restore_list: db_name.length= lex->name.length; strmov(db_name.str, lex->name.str); -#ifdef WITH_WSREP - if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error; -#endif /* WITH_WSREP */ + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW); if (check_db_name(&db_name)) { @@ -4603,9 +4602,7 @@ end_with_restore_list: /* lex->unit.cleanup() is called outside, no need to call it here */ break; case SQLCOM_SHOW_CREATE_EVENT: -#ifdef WITH_WSREP - if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error; -#endif /* WITH_WSREP */ + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW); res= Events::show_create_event(thd, lex->spname->m_db, lex->spname->m_name); break; @@ -4975,6 +4972,7 @@ end_with_restore_list: if (!grant_user) goto error; + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW); res = mysql_show_grants(thd, grant_user); break; } @@ -5453,18 +5451,14 @@ end_with_restore_list: } case SQLCOM_SHOW_CREATE_PROC: { -#ifdef WITH_WSREP - if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error; -#endif /* WITH_WSREP */ + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW); if (sp_show_create_routine(thd, TYPE_ENUM_PROCEDURE, lex->spname)) goto error; break; } case SQLCOM_SHOW_CREATE_FUNC: { -#ifdef WITH_WSREP - if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error; -#endif /* WITH_WSREP */ + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW); if (sp_show_create_routine(thd, TYPE_ENUM_FUNCTION, lex->spname)) goto error; break; @@ -5477,9 +5471,7 @@ end_with_restore_list: stored_procedure_type type= (lex->sql_command == SQLCOM_SHOW_PROC_CODE ? TYPE_ENUM_PROCEDURE : TYPE_ENUM_FUNCTION); -#ifdef WITH_WSREP - if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error; -#endif /* WITH_WSREP */ + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW); if (sp_cache_routine(thd, type, lex->spname, FALSE, &sp)) goto error; if (!sp || sp->show_routine_code(thd)) @@ -5501,9 +5493,7 @@ end_with_restore_list: if (check_ident_length(&lex->spname->m_name)) goto error; -#ifdef WITH_WSREP - if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error; -#endif /* WITH_WSREP */ + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW); if (show_create_trigger(thd, lex->spname)) goto error; /* Error has been already logged. */ @@ -5955,6 +5945,7 @@ static bool execute_show_status(THD *thd, TABLE_LIST *all_tables) if (!(res= check_table_access(thd, SELECT_ACL, all_tables, FALSE, UINT_MAX, FALSE))) res= execute_sqlcom_select(thd, all_tables); + /* Don't log SHOW STATUS commands to slow query log */ thd->server_status&= ~(SERVER_QUERY_NO_INDEX_USED | SERVER_QUERY_NO_GOOD_INDEX_USED); diff --git a/sql/sql_partition_admin.cc b/sql/sql_partition_admin.cc index a9f179e3a10..e7f5e3cb59e 100644 --- a/sql/sql_partition_admin.cc +++ b/sql/sql_partition_admin.cc @@ -540,7 +540,7 @@ bool Sql_cmd_alter_table_exchange_partition:: if ((!thd->is_current_stmt_binlog_format_row() || /* TODO: Do we really need to check for temp tables in this case? */ !find_temporary_table(thd, table_list)) && - wsrep_to_isolation_begin(thd, table_list->db, table_list->table_name, + wsrep_to_isolation_begin(thd, table_list->db, table_list->table_name, NULL)) { WSREP_WARN("ALTER TABLE EXCHANGE PARTITION isolation failure"); @@ -789,7 +789,7 @@ bool Sql_cmd_alter_table_truncate_partition::execute(THD *thd) if (WSREP(thd) && (!thd->is_current_stmt_binlog_format_row() || !find_temporary_table(thd, first_table)) && wsrep_to_isolation_begin( - thd, first_table->db, first_table->table_name, NULL) + thd, first_table->db, first_table->table_name, NULL) ) { WSREP_WARN("ALTER TABLE TRUNCATE PARTITION isolation failure"); diff --git a/sql/sql_table.cc b/sql/sql_table.cc index 29c382f57d6..e4115b27e44 100644 --- a/sql/sql_table.cc +++ b/sql/sql_table.cc @@ -2088,6 +2088,7 @@ bool mysql_rm_table(THD *thd,TABLE_LIST *tables, my_bool if_exists, DBUG_RETURN(TRUE); my_ok(thd); DBUG_RETURN(FALSE); + } @@ -2134,7 +2135,6 @@ static uint32 comment_length(THD *thd, uint32 comment_pos, return 0; } - /** Execute the drop of a normal or temporary table. @@ -2571,6 +2571,9 @@ err: /* Chop of the last comma */ built_non_trans_tmp_query.chop(); built_non_trans_tmp_query.append(" /* generated by server */"); +#ifdef WITH_WSREP + thd->wsrep_skip_wsrep_GTID = true; +#endif /* WITH_WSREP */ error |= thd->binlog_query(THD::STMT_QUERY_TYPE, built_non_trans_tmp_query.ptr(), built_non_trans_tmp_query.length(), @@ -2583,6 +2586,9 @@ err: /* Chop of the last comma */ built_trans_tmp_query.chop(); built_trans_tmp_query.append(" /* generated by server */"); +#ifdef WITH_WSREP + thd->wsrep_skip_wsrep_GTID = true; +#endif /* WITH_WSREP */ error |= thd->binlog_query(THD::STMT_QUERY_TYPE, built_trans_tmp_query.ptr(), built_trans_tmp_query.length(), @@ -2597,6 +2603,9 @@ err: built_query.append(" /* generated by server */"); int error_code = non_tmp_error ? thd->get_stmt_da()->sql_errno() : 0; +#ifdef WITH_WSREP + thd->wsrep_skip_wsrep_GTID = false; +#endif /* WITH_WSREP */ error |= thd->binlog_query(THD::STMT_QUERY_TYPE, built_query.ptr(), built_query.length(), @@ -2645,6 +2654,9 @@ err: } end: +#ifdef WITH_WSREP + thd->wsrep_skip_wsrep_GTID = false; +#endif /* WITH_WSREP */ DBUG_RETURN(error); } diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index e0b8b522610..8eb3d35a96e 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -4817,7 +4817,7 @@ static Sys_var_ulong Sys_wsrep_slave_threads( GLOBAL_VAR(wsrep_slave_threads), CMD_LINE(REQUIRED_ARG), VALID_RANGE(1, 512), DEFAULT(1), BLOCK_SIZE(1), &PLock_wsrep_slave_threads, NOT_IN_BINLOG, - ON_CHECK(wsrep_slave_threads_check), + ON_CHECK(NULL), ON_UPDATE(wsrep_slave_threads_update)); static Sys_var_charptr Sys_wsrep_dbug_option( @@ -4929,21 +4929,13 @@ static Sys_var_mybool Sys_wsrep_certify_nonPK( GLOBAL_VAR(wsrep_certify_nonPK), CMD_LINE(OPT_ARG), DEFAULT(TRUE)); -static bool fix_wsrep_causal_reads(sys_var *self, THD* thd, enum_var_type var_type) -{ - if (var_type == OPT_GLOBAL) - wsrep_causal_reads_update(&global_system_variables); - else - wsrep_causal_reads_update(&thd->variables); - return false; -} static Sys_var_mybool Sys_wsrep_causal_reads( "wsrep_causal_reads", "Setting this variable is equivalent " "to setting wsrep_sync_wait READ flag", SESSION_VAR(wsrep_causal_reads), CMD_LINE(OPT_ARG, OPT_WSREP_CAUSAL_READS), DEFAULT(FALSE), NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), - ON_UPDATE(fix_wsrep_causal_reads), + ON_UPDATE(wsrep_causal_reads_update), DEPRECATED("'@@wsrep_sync_wait=1'")); static Sys_var_uint Sys_wsrep_sync_wait( diff --git a/sql/wsrep_applier.cc b/sql/wsrep_applier.cc index e10c19287c4..e5c95780df5 100644 --- a/sql/wsrep_applier.cc +++ b/sql/wsrep_applier.cc @@ -218,12 +218,15 @@ wsrep_cb_status_t wsrep_apply_cb(void* const ctx, { THD* const thd((THD*)ctx); + assert(thd->wsrep_apply_toi == false); + // Allow tests to block the applier thread using the DBUG facilities. DBUG_EXECUTE_IF("sync.wsrep_apply_cb", { const char act[]= "now " - "wait_for signal.wsrep_apply_cb"; + "SIGNAL sync.wsrep_apply_cb_reached " + "WAIT_FOR signal.wsrep_apply_cb"; DBUG_ASSERT(!debug_sync_set_action(thd, STRING_WITH_LEN(act))); };); @@ -383,7 +386,7 @@ wsrep_cb_status_t wsrep_commit_cb(void* const ctx, mysql_mutex_unlock(&LOCK_wsrep_slave_threads); } - if (*exit == false && thd->wsrep_applier) + if (thd->wsrep_applier) { /* From trans_begin() */ thd->variables.option_bits|= OPTION_BEGIN; diff --git a/sql/wsrep_hton.cc b/sql/wsrep_hton.cc index 7e62e820dcd..a8a0d574362 100644 --- a/sql/wsrep_hton.cc +++ b/sql/wsrep_hton.cc @@ -42,6 +42,7 @@ void wsrep_cleanup_transaction(THD *thd) thd->wsrep_trx_meta.depends_on= WSREP_SEQNO_UNDEFINED; thd->wsrep_exec_mode= LOCAL_STATE; thd->wsrep_affected_rows= 0; + thd->wsrep_skip_wsrep_GTID= false; return; } diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index 617aebae82e..67b64301ff9 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -2,7 +2,7 @@ This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. + the Free Software Foundation; version 2 of the License.x1 This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of @@ -16,6 +16,7 @@ #include <mysqld.h> #include <sql_class.h> #include <sql_parse.h> +#include <sql_base.h> /* find_temporary_table() */ #include "slave.h" #include "rpl_mi.h" #include "sql_repl.h" @@ -960,8 +961,6 @@ bool wsrep_must_sync_wait (THD* thd, uint mask) { return (thd->variables.wsrep_sync_wait & mask) && thd->variables.wsrep_on && - !(thd->variables.wsrep_dirty_reads && - !is_update_query(thd->lex->sql_command)) && !thd->in_active_multi_stmt_transaction() && thd->wsrep_conflict_state != REPLAYING && thd->wsrep_sync_wait_gtid.seqno == WSREP_SEQNO_UNDEFINED; @@ -1090,85 +1089,70 @@ static bool wsrep_prepare_keys_for_isolation(THD* thd, const TABLE_LIST* table_list, wsrep_key_arr_t* ka) { - ka->keys= 0; - ka->keys_len= 0; + ka->keys= 0; + ka->keys_len= 0; - extern TABLE* find_temporary_table(THD*, const TABLE_LIST*); - - if (db || table) + if (db || table) + { + if (!(ka->keys= (wsrep_key_t*)my_malloc(sizeof(wsrep_key_t), MYF(0)))) + { + WSREP_ERROR("Can't allocate memory for key_array"); + goto err; + } + ka->keys_len= 1; + if (!(ka->keys[0].key_parts= (wsrep_buf_t*) + my_malloc(sizeof(wsrep_buf_t)*2, MYF(0)))) { - TABLE_LIST tmp_table; + WSREP_ERROR("Can't allocate memory for key_parts"); + goto err; + } + ka->keys[0].key_parts_num= 2; + if (!wsrep_prepare_key_for_isolation( + db, table, + (wsrep_buf_t*)ka->keys[0].key_parts, + &ka->keys[0].key_parts_num)) + { + WSREP_ERROR("Preparing keys for isolation failed (1)"); + goto err; + } + } - memset(&tmp_table, 0, sizeof(tmp_table)); - tmp_table.table_name= (char*)table; - tmp_table.db= (char*)db; - tmp_table.mdl_request.init(MDL_key::GLOBAL, (db) ? db : "", - (table) ? table : "", - MDL_INTENTION_EXCLUSIVE, MDL_STATEMENT); + for (const TABLE_LIST* table= table_list; table; table= table->next_global) + { + wsrep_key_t* tmp; + if (ka->keys) + tmp= (wsrep_key_t*)my_realloc(ka->keys, + (ka->keys_len + 1) * sizeof(wsrep_key_t), + MYF(0)); + else + tmp= (wsrep_key_t*)my_malloc((ka->keys_len + 1) * sizeof(wsrep_key_t), MYF(0)); - if (!table || !find_temporary_table(thd, &tmp_table)) - { - if (!(ka->keys= (wsrep_key_t*)my_malloc(sizeof(wsrep_key_t), MYF(0)))) - { - WSREP_ERROR("Can't allocate memory for key_array"); - goto err; - } - ka->keys_len= 1; - if (!(ka->keys[0].key_parts= (wsrep_buf_t*) - my_malloc(sizeof(wsrep_buf_t)*2, MYF(0)))) - { - WSREP_ERROR("Can't allocate memory for key_parts"); - goto err; - } - ka->keys[0].key_parts_num= 2; - if (!wsrep_prepare_key_for_isolation( - db, table, - (wsrep_buf_t*)ka->keys[0].key_parts, - &ka->keys[0].key_parts_num)) - { - WSREP_ERROR("Preparing keys for isolation failed"); - goto err; - } - } + if (!tmp) + { + WSREP_ERROR("Can't allocate memory for key_array"); + goto err; } - - for (const TABLE_LIST* table= table_list; table; table= table->next_global) + ka->keys= tmp; + if (!(ka->keys[ka->keys_len].key_parts= (wsrep_buf_t*) + my_malloc(sizeof(wsrep_buf_t)*2, MYF(0)))) { - if (!find_temporary_table(thd, table)) - { - wsrep_key_t* tmp; - tmp= (wsrep_key_t*)my_realloc( - ka->keys, (ka->keys_len + 1) * sizeof(wsrep_key_t), - MYF(MY_ALLOW_ZERO_PTR)); - - if (!tmp) - { - WSREP_ERROR("Can't allocate memory for key_array"); - goto err; - } - ka->keys= tmp; - if (!(ka->keys[ka->keys_len].key_parts= (wsrep_buf_t*) - my_malloc(sizeof(wsrep_buf_t)*2, MYF(0)))) - { - WSREP_ERROR("Can't allocate memory for key_parts"); - goto err; - } - ka->keys[ka->keys_len].key_parts_num= 2; - ++ka->keys_len; - if (!wsrep_prepare_key_for_isolation( - table->db, table->table_name, - (wsrep_buf_t*)ka->keys[ka->keys_len - 1].key_parts, - &ka->keys[ka->keys_len - 1].key_parts_num)) - { - WSREP_ERROR("Preparing keys for isolation failed"); - goto err; - } - } + WSREP_ERROR("Can't allocate memory for key_parts"); + goto err; } - return true; + ka->keys[ka->keys_len].key_parts_num= 2; + ++ka->keys_len; + if (!wsrep_prepare_key_for_isolation(table->db, table->table_name, + (wsrep_buf_t*)ka->keys[ka->keys_len - 1].key_parts, + &ka->keys[ka->keys_len - 1].key_parts_num)) + { + WSREP_ERROR("Preparing keys for isolation failed (2)"); + goto err; + } + } + return 0; err: wsrep_keys_free(ka); - return false; + return 1; } @@ -1379,6 +1363,142 @@ static int wsrep_create_sp(THD *thd, uchar** buf, size_t* buf_len); static int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len); /* + Rewrite DROP TABLE for TOI. Temporary tables are eliminated from + the query as they are visible only to client connection. + + TODO: See comments for sql_base.cc:drop_temporary_table() and refine + the function to deal with transactional locked tables. + */ +static int wsrep_drop_table_query(THD* thd, uchar** buf, size_t* buf_len) +{ + + LEX* lex= thd->lex; + SELECT_LEX* select_lex= &lex->select_lex; + TABLE_LIST* first_table= select_lex->table_list.first; + String buff; + + bool found_temp_table= false; + for (TABLE_LIST* table= first_table; table; table= table->next_global) + { + if (find_temporary_table(thd, table->db, table->table_name)) + { + found_temp_table= true; + break; + } + } + + if (found_temp_table) + { + buff.append("DROP TABLE "); + if (lex->check_exists) + buff.append("IF EXISTS "); + + for (TABLE_LIST* table= first_table; table; table= table->next_global) + { + if (!find_temporary_table(thd, table->db, table->table_name)) + { + append_identifier(thd, &buff, table->db, strlen(table->db)); + buff.append("."); + append_identifier(thd, &buff, table->table_name, + strlen(table->table_name)); + buff.append(","); + } + } + + /* Chop the last comma */ + buff.chop(); + buff.append(" /* generated by wsrep */"); + + WSREP_DEBUG("Rewrote '%s' as '%s'", thd->query(), buff.ptr()); + + return wsrep_to_buf_helper(thd, buff.ptr(), buff.length(), buf, buf_len); + } + else + { + return wsrep_to_buf_helper(thd, thd->query(), thd->query_length(), + buf, buf_len); + } +} + +/* + Decide if statement should run in TOI. + + Look if table or table_list contain temporary tables. If the + statement affects only temporary tables, statement should not run + in TOI. If the table list contains mix of regular and temporary tables + (DROP TABLE, OPTIMIZE, ANALYZE), statement should be run in TOI but + should be rewritten at later time for replication to contain only + non-temporary tables. + */ +static bool wsrep_can_run_in_toi(THD *thd, const char *db, const char *table, + const TABLE_LIST *table_list) +{ + DBUG_ASSERT(!table || db); + DBUG_ASSERT(table_list || db); + + LEX* lex= thd->lex; + SELECT_LEX* select_lex= &lex->select_lex; + TABLE_LIST* first_table= select_lex->table_list.first; + + switch (lex->sql_command) + { + case SQLCOM_CREATE_TABLE: + DBUG_ASSERT(!table_list); + if (thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) + { + return false; + } + return true; + + case SQLCOM_CREATE_VIEW: + + DBUG_ASSERT(!table_list); + DBUG_ASSERT(first_table); /* First table is view name */ + /* + If any of the remaining tables refer to temporary table error + is returned to client, so TOI can be skipped + */ + for (TABLE_LIST* it= first_table->next_global; it; it= it->next_global) + { + if (find_temporary_table(thd, it)) + { + return false; + } + } + return true; + + case SQLCOM_CREATE_TRIGGER: + + DBUG_ASSERT(!table_list); + DBUG_ASSERT(first_table); + + if (find_temporary_table(thd, first_table)) + { + return false; + } + return true; + + default: + if (table && !find_temporary_table(thd, db, table)) + { + return true; + } + + if (table_list) + { + for (TABLE_LIST* table= first_table; table; table= table->next_global) + { + if (!find_temporary_table(thd, table->db, table->table_name)) + { + return true; + } + } + } + return !(table || table_list); + } +} + +/* returns: 0: statement was replicated as TOI 1: TOI replication was skipped @@ -1393,6 +1513,12 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_, int buf_err; int rc= 0; + if (wsrep_can_run_in_toi(thd, db_, table_, table_list) == false) + { + WSREP_DEBUG("No TOI for %s", WSREP_QUERY(thd)); + return 1; + } + WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd), thd->wsrep_exec_mode, thd->query() ); switch (thd->lex->sql_command) @@ -1420,16 +1546,16 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_, } /* fallthrough */ default: - buf_err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(), &buf, - &buf_len); + buf_err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(), + &buf, &buf_len); break; } wsrep_key_arr_t key_arr= {0, 0}; struct wsrep_buf buff = { buf, buf_len }; - if (!buf_err && - wsrep_prepare_keys_for_isolation(thd, db_, table_, table_list, &key_arr)&& - key_arr.keys_len > 0 && + if (!buf_err && + !wsrep_prepare_keys_for_isolation(thd, db_, table_, table_list, &key_arr) && + key_arr.keys_len > 0 && WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id, key_arr.keys, key_arr.keys_len, &buff, 1, @@ -1626,9 +1752,12 @@ int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_, if (thd->variables.wsrep_on && thd->wsrep_exec_mode==LOCAL_STATE) { switch (thd->variables.wsrep_OSU_method) { - case WSREP_OSU_TOI: ret = wsrep_TOI_begin(thd, db_, table_, - table_list); break; - case WSREP_OSU_RSU: ret = wsrep_RSU_begin(thd, db_, table_); break; + case WSREP_OSU_TOI: + ret = wsrep_TOI_begin(thd, db_, table_, table_list); + break; + case WSREP_OSU_RSU: + ret = wsrep_RSU_begin(thd, db_, table_); + break; default: WSREP_ERROR("Unsupported OSU method: %lu", thd->variables.wsrep_OSU_method); diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index f02a3cd72f3..fd68fab991c 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -100,11 +100,12 @@ enum enum_wsrep_OSU_method { enum enum_wsrep_sync_wait { WSREP_SYNC_WAIT_NONE = 0x0, - // show, select, begin + // select, begin WSREP_SYNC_WAIT_BEFORE_READ = 0x1, WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE = 0x2, WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE = 0x4, - WSREP_SYNC_WAIT_MAX = 0x7 + WSREP_SYNC_WAIT_BEFORE_SHOW = 0x8, + WSREP_SYNC_WAIT_MAX = 0xF }; // MySQL status variables @@ -224,6 +225,8 @@ extern wsrep_seqno_t wsrep_locked_seqno; #define WSREP_PROVIDER_EXISTS \ (wsrep_provider && strncasecmp(wsrep_provider, WSREP_NONE, FN_REFLEN)) +#define WSREP_QUERY(thd) (thd->query()) + extern void wsrep_ready_wait(); class Ha_trx_info; diff --git a/sql/wsrep_sst.cc b/sql/wsrep_sst.cc index 7f9a88c3aa6..a187c75baa3 100644 --- a/sql/wsrep_sst.cc +++ b/sql/wsrep_sst.cc @@ -697,7 +697,7 @@ ssize_t wsrep_sst_prepare (void** msg) // Attempt 1: wsrep_sst_receive_address if (wsrep_sst_receive_address && - strcmp (wsrep_sst_receive_address, WSREP_SST_ADDRESS_AUTO)) + strcmp (wsrep_sst_receive_address, WSREP_SST_ADDRESS_AUTO)) { addr_in= wsrep_sst_receive_address; } @@ -837,16 +837,13 @@ static int sst_donate_mysqldump (const char* addr, { char host[256]; wsp::Address address(addr); - if (!address.is_valid()) { WSREP_ERROR("Could not parse SST address : %s", addr); return 0; } - memcpy(host, address.get_address(), address.get_address_len()); int port= address.get_port(); - int const cmd_len= 4096; wsp::string cmd_str(cmd_len); @@ -863,7 +860,7 @@ static int sst_donate_mysqldump (const char* addr, int ret= snprintf (cmd_str(), cmd_len, "wsrep_sst_mysqldump " - WSREP_SST_OPT_HOST" '%s' " + WSREP_SST_OPT_ADDR" '%s' " WSREP_SST_OPT_PORT" '%d' " WSREP_SST_OPT_LPORT" '%u' " WSREP_SST_OPT_SOCKET" '%s' " @@ -871,7 +868,7 @@ static int sst_donate_mysqldump (const char* addr, WSREP_SST_OPT_GTID" '%s:%lld' " WSREP_SST_OPT_GTID_DOMAIN_ID" '%d'" "%s", - host, port, mysqld_port, mysqld_unix_port, + addr, port, mysqld_port, mysqld_unix_port, wsrep_defaults_file, uuid_str, (long long)seqno, wsrep_gtid_domain_id, bypass ? " " WSREP_SST_OPT_BYPASS : ""); diff --git a/sql/wsrep_utils.cc b/sql/wsrep_utils.cc index a0111354d02..580c8bbd55c 100644 --- a/sql/wsrep_utils.cc +++ b/sql/wsrep_utils.cc @@ -574,3 +574,17 @@ done: return ret; } +/* returns the length of the host part of the address string */ +size_t wsrep_host_len(const char* const addr, size_t const addr_len) +{ + // check for IPv6 notation first + const char* const bracket= ('[' == addr[0] ? strchr(addr, ']') : NULL); + + if (bracket) { // IPv6 + return (bracket - addr + 1); + } + else { // host part ends at ':' or end of string + const char* const colon= strchr(addr, ':'); + return (colon ? colon - addr : addr_len); + } +} diff --git a/sql/wsrep_utils.h b/sql/wsrep_utils.h index f20e02d03a2..88a4c1e1a70 100644 --- a/sql/wsrep_utils.h +++ b/sql/wsrep_utils.h @@ -22,6 +22,9 @@ unsigned int wsrep_check_ip (const char* const addr, bool *is_ipv6); size_t wsrep_guess_ip (char* buf, size_t buf_len); +/* returns the length of the host part of the address string */ +size_t wsrep_host_len(const char* addr, size_t addr_len); + namespace wsp { class Address { diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc index 158d0ca847d..1d117766db4 100644 --- a/sql/wsrep_var.cc +++ b/sql/wsrep_var.cc @@ -35,6 +35,8 @@ const char* wsrep_node_address = 0; const char* wsrep_node_incoming_address = 0; const char* wsrep_start_position = 0; +static long wsrep_prev_slave_threads = wsrep_slave_threads; + int wsrep_init_vars() { wsrep_provider = my_strdup(WSREP_NONE, MYF(MY_WME)); @@ -59,12 +61,23 @@ bool wsrep_on_update (sys_var *self, THD* thd, enum_var_type var_type) return false; } -bool wsrep_causal_reads_update (SV *sv) +bool wsrep_causal_reads_update (sys_var *self, THD* thd, enum_var_type var_type) { - if (sv->wsrep_causal_reads) { - sv->wsrep_sync_wait |= WSREP_SYNC_WAIT_BEFORE_READ; + // global setting should not affect session setting. + // if (var_type == OPT_GLOBAL) { + // thd->variables.wsrep_causal_reads = global_system_variables.wsrep_causal_reads; + // } + if (thd->variables.wsrep_causal_reads) { + thd->variables.wsrep_sync_wait |= WSREP_SYNC_WAIT_BEFORE_READ; + } else { + thd->variables.wsrep_sync_wait &= ~WSREP_SYNC_WAIT_BEFORE_READ; + } + + // update global settings too. + if (global_system_variables.wsrep_causal_reads) { + global_system_variables.wsrep_sync_wait |= WSREP_SYNC_WAIT_BEFORE_READ; } else { - sv->wsrep_sync_wait &= ~WSREP_SYNC_WAIT_BEFORE_READ; + global_system_variables.wsrep_sync_wait &= ~WSREP_SYNC_WAIT_BEFORE_READ; } return false; @@ -72,12 +85,17 @@ bool wsrep_causal_reads_update (SV *sv) bool wsrep_sync_wait_update (sys_var* self, THD* thd, enum_var_type var_type) { - if (var_type == OPT_GLOBAL) - global_system_variables.wsrep_causal_reads = - MY_TEST(global_system_variables.wsrep_sync_wait & WSREP_SYNC_WAIT_BEFORE_READ); - else - thd->variables.wsrep_causal_reads = - MY_TEST(thd->variables.wsrep_sync_wait & WSREP_SYNC_WAIT_BEFORE_READ); + // global setting should not affect session setting. + // if (var_type == OPT_GLOBAL) { + // thd->variables.wsrep_sync_wait = global_system_variables.wsrep_sync_wait; + // } + thd->variables.wsrep_causal_reads = thd->variables.wsrep_sync_wait & + WSREP_SYNC_WAIT_BEFORE_READ; + + // update global settings too + global_system_variables.wsrep_causal_reads = global_system_variables.wsrep_sync_wait & + WSREP_SYNC_WAIT_BEFORE_READ; + return false; } @@ -502,18 +520,15 @@ void wsrep_node_address_init (const char* value) wsrep_node_address = (value) ? my_strdup(value, MYF(0)) : NULL; } -bool wsrep_slave_threads_check (sys_var *self, THD* thd, set_var* var) +static void wsrep_slave_count_change_update () { - mysql_mutex_lock(&LOCK_wsrep_slave_threads); - wsrep_slave_count_change += (var->save_result.ulonglong_value - - wsrep_slave_threads); - mysql_mutex_unlock(&LOCK_wsrep_slave_threads); - - return 0; + wsrep_slave_count_change += (wsrep_slave_threads - wsrep_prev_slave_threads); + wsrep_prev_slave_threads = wsrep_slave_threads; } bool wsrep_slave_threads_update (sys_var *self, THD* thd, enum_var_type type) { + wsrep_slave_count_change_update(); if (wsrep_slave_count_change > 0) { wsrep_create_appliers(wsrep_slave_count_change); diff --git a/sql/wsrep_var.h b/sql/wsrep_var.h index ca77b5c0039..7530fd98870 100644 --- a/sql/wsrep_var.h +++ b/sql/wsrep_var.h @@ -41,9 +41,7 @@ int wsrep_init_vars(); #define DEFAULT_ARGS (THD* thd, enum_var_type var_type) #define INIT_ARGS (const char* opt) -struct system_variables; -bool wsrep_causal_reads_update(struct system_variables *sv); - +extern bool wsrep_causal_reads_update UPDATE_ARGS; extern bool wsrep_on_update UPDATE_ARGS; extern bool wsrep_sync_wait_update UPDATE_ARGS; extern bool wsrep_start_position_check CHECK_ARGS; |