summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/handler.cc5
-rw-r--r--sql/mysqld.cc23
-rw-r--r--sql/sql_class.cc4
-rw-r--r--sql/sql_class.h2
-rw-r--r--sql/sql_parse.cc93
-rw-r--r--sql/sql_partition_admin.cc4
-rw-r--r--sql/sql_table.cc14
-rw-r--r--sql/sys_vars.cc12
-rw-r--r--sql/wsrep_applier.cc7
-rw-r--r--sql/wsrep_hton.cc1
-rw-r--r--sql/wsrep_mysqld.cc291
-rw-r--r--sql/wsrep_mysqld.h7
-rw-r--r--sql/wsrep_sst.cc9
-rw-r--r--sql/wsrep_utils.cc14
-rw-r--r--sql/wsrep_utils.h3
-rw-r--r--sql/wsrep_var.cc49
-rw-r--r--sql/wsrep_var.h4
17 files changed, 366 insertions, 176 deletions
diff --git a/sql/handler.cc b/sql/handler.cc
index dae47668410..49e9626cb81 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -1660,6 +1660,11 @@ int ha_rollback_trans(THD *thd, bool all)
{ // cannot happen
my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
error=1;
+#ifdef WITH_WSREP
+ WSREP_WARN("handlerton rollback failed, thd %lu %lld conf %d SQL %s",
+ thd->thread_id, thd->query_id, thd->wsrep_conflict_state,
+ thd->query());
+#endif /* WITH_WSREP */
}
status_var_increment(thd->status_var.ha_rollback_count);
ha_info_next= ha_info->next();
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 3af8750de13..97d63d99458 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -9226,8 +9226,29 @@ mysqld_get_one_option(int optid, const struct my_option *opt, char *argument)
}
#ifdef WITH_WSREP
case OPT_WSREP_CAUSAL_READS:
- wsrep_causal_reads_update(&global_system_variables);
+ {
+ if (global_system_variables.wsrep_causal_reads)
+ {
+ WSREP_WARN("option --wsrep-causal-reads is deprecated");
+ if (!(global_system_variables.wsrep_sync_wait & WSREP_SYNC_WAIT_BEFORE_READ))
+ {
+ WSREP_WARN("--wsrep-causal-reads=ON takes precedence over --wsrep-sync-wait=%u. "
+ "WSREP_SYNC_WAIT_BEFORE_READ is on",
+ global_system_variables.wsrep_sync_wait);
+ global_system_variables.wsrep_sync_wait |= WSREP_SYNC_WAIT_BEFORE_READ;
+ }
+ }
+ else
+ {
+ if (global_system_variables.wsrep_sync_wait & WSREP_SYNC_WAIT_BEFORE_READ) {
+ WSREP_WARN("--wsrep-sync-wait=%u takes precedence over --wsrep-causal-reads=OFF. "
+ "WSREP_SYNC_WAIT_BEFORE_READ is on",
+ global_system_variables.wsrep_sync_wait);
+ global_system_variables.wsrep_causal_reads = 1;
+ }
+ }
break;
+ }
case OPT_WSREP_SYNC_WAIT:
global_system_variables.wsrep_causal_reads=
MY_TEST(global_system_variables.wsrep_sync_wait &
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 4df257b96a4..ce1f69b76a9 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -1023,6 +1023,8 @@ THD::THD(bool is_wsrep_applier)
wsrep_info[sizeof(wsrep_info) - 1] = '\0'; /* make sure it is 0-terminated */
wsrep_sync_wait_gtid = WSREP_GTID_UNDEFINED;
wsrep_affected_rows = 0;
+ wsrep_replicate_GTID = false;
+ wsrep_skip_wsrep_GTID = false;
#endif
/* Call to init() below requires fully initialized Open_tables_state. */
reset_open_tables_state(this);
@@ -1439,6 +1441,8 @@ void THD::init(void)
wsrep_TOI_pre_query_len = 0;
wsrep_sync_wait_gtid = WSREP_GTID_UNDEFINED;
wsrep_affected_rows = 0;
+ wsrep_replicate_GTID = false;
+ wsrep_skip_wsrep_GTID = false;
#endif /* WITH_WSREP */
if (variables.sql_log_bin)
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 44163333425..d8fc9303949 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -4072,6 +4072,8 @@ public:
bool wsrep_ignore_table;
wsrep_gtid_t wsrep_sync_wait_gtid;
ulong wsrep_affected_rows;
+ bool wsrep_replicate_GTID;
+ bool wsrep_skip_wsrep_GTID;
#endif /* WITH_WSREP */
/* Handling of timeouts for commands */
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 65a2382f0d0..81b71f71760 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -2906,6 +2906,7 @@ mysql_execute_command(THD *thd)
#endif
case SQLCOM_SHOW_STATUS:
{
+ WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW);
execute_show_status(thd, all_tables);
break;
}
@@ -2944,21 +2945,21 @@ mysql_execute_command(THD *thd)
case SQLCOM_SHOW_TABLE_STATUS:
case SQLCOM_SHOW_OPEN_TABLES:
case SQLCOM_SHOW_GENERIC:
+ case SQLCOM_SHOW_PLUGINS:
case SQLCOM_SHOW_FIELDS:
case SQLCOM_SHOW_KEYS:
- case SQLCOM_SELECT:
- if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd))
- goto error;
- /* fall through */
- case SQLCOM_SHOW_PLUGINS:
case SQLCOM_SHOW_VARIABLES:
case SQLCOM_SHOW_CHARSETS:
case SQLCOM_SHOW_COLLATIONS:
case SQLCOM_SHOW_STORAGE_ENGINES:
case SQLCOM_SHOW_PROFILE:
- {
+ case SQLCOM_SELECT:
+ {
#ifdef WITH_WSREP
- DBUG_ASSERT(thd->wsrep_exec_mode != REPL_RECV);
+ if (lex->sql_command == SQLCOM_SELECT)
+ WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_READ)
+ else
+ WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW)
#endif /* WITH_WSREP */
thd->status_var.last_query_cost= 0.0;
@@ -3082,6 +3083,7 @@ mysql_execute_command(THD *thd)
case SQLCOM_SHOW_RELAYLOG_EVENTS: /* fall through */
case SQLCOM_SHOW_BINLOG_EVENTS:
{
+ WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW);
if (check_global_access(thd, REPL_SLAVE_ACL))
goto error;
res = mysql_show_binlog_events(thd);
@@ -3653,6 +3655,7 @@ end_with_restore_list:
{
if (check_global_access(thd, SUPER_ACL | REPL_CLIENT_ACL))
goto error;
+ WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW);
res = show_binlogs(thd);
break;
}
@@ -3666,9 +3669,8 @@ end_with_restore_list:
MYF(0)); /* purecov: inspected */
goto error;
#else
-
- if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd))
- goto error;
+ {
+ WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW);
/*
Access check:
@@ -3732,8 +3734,7 @@ end_with_restore_list:
case SQLCOM_CHECKSUM:
{
DBUG_ASSERT(first_table == all_tables && first_table != 0);
- if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd))
- goto error;
+ WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_READ);
if (check_table_access(thd, SELECT_ACL, all_tables,
FALSE, UINT_MAX, FALSE))
@@ -3744,6 +3745,7 @@ end_with_restore_list:
}
case SQLCOM_UPDATE:
{
+ WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE);
ha_rows found= 0, updated= 0;
DBUG_ASSERT(first_table == all_tables && first_table != 0);
if (WSREP_CLIENT(thd) &&
@@ -3786,9 +3788,7 @@ end_with_restore_list:
/* if we switched from normal update, rights are checked */
if (up_result != 2)
{
- if (WSREP_CLIENT(thd) &&
- wsrep_sync_wait(thd, WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE))
- goto error;
+ WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE);
if ((res= multi_update_precheck(thd, all_tables)))
break;
}
@@ -3858,10 +3858,6 @@ end_with_restore_list:
break;
}
case SQLCOM_REPLACE:
- {
- if (WSREP_CLIENT(thd) &&
- wsrep_sync_wait(thd, WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE))
- goto error;
#ifndef DBUG_OFF
if (mysql_bin_log.is_open())
{
@@ -3900,6 +3896,7 @@ end_with_restore_list:
/* fall through */
case SQLCOM_INSERT:
{
+ WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE);
DBUG_ASSERT(first_table == all_tables && first_table != 0);
if (WSREP_CLIENT(thd) &&
@@ -3957,6 +3954,7 @@ end_with_restore_list:
case SQLCOM_REPLACE_SELECT:
case SQLCOM_INSERT_SELECT:
{
+ WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE);
select_result *sel_result;
bool explain= MY_TEST(lex->describe);
DBUG_ASSERT(first_table == all_tables && first_table != 0);
@@ -4079,6 +4077,7 @@ end_with_restore_list:
}
case SQLCOM_DELETE:
{
+ WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE);
select_result *sel_result=lex->result;
DBUG_ASSERT(first_table == all_tables && first_table != 0);
if (WSREP_CLIENT(thd) &&
@@ -4139,6 +4138,7 @@ end_with_restore_list:
}
case SQLCOM_DELETE_MULTI:
{
+ WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE);
DBUG_ASSERT(first_table == all_tables && first_table != 0);
TABLE_LIST *aux_tables= thd->lex->auxiliary_table_list.first;
multi_delete *result;
@@ -4219,19 +4219,6 @@ end_with_restore_list:
/* So that DROP TEMPORARY TABLE gets to binlog at commit/rollback */
thd->variables.option_bits|= OPTION_KEEP_LOG;
}
- if (WSREP(thd))
- {
- for (TABLE_LIST *table= all_tables; table; table= table->next_global)
- {
- if (!lex->tmp_table() &&
- (!thd->is_current_stmt_binlog_format_row() ||
- !find_temporary_table(thd, table)))
- {
- WSREP_TO_ISOLATION_BEGIN(NULL, NULL, all_tables);
- break;
- }
- }
- }
/*
If we are a slave, we should add IF EXISTS if the query executed
on the master without an error. This will help a slave to
@@ -4240,7 +4227,21 @@ end_with_restore_list:
*/
if (thd->slave_thread && !thd->slave_expected_error &&
slave_ddl_exec_mode_options == SLAVE_EXEC_MODE_IDEMPOTENT)
- lex->create_info.set(DDL_options_st::OPT_IF_EXISTS);
+ lex->check_exists= 1;
+
+ if (WSREP(thd))
+ {
+ for (TABLE_LIST *table= all_tables; table; table= table->next_global)
+ {
+ if (!lex->tmp_table() &&
+ (!thd->is_current_stmt_binlog_format_row() ||
+ !find_temporary_table(thd, table)))
+ {
+ WSREP_TO_ISOLATION_BEGIN(NULL, NULL, all_tables);
+ break;
+ }
+ }
+ }
/* DDL and binlog write order are protected by metadata locks. */
res= mysql_rm_table(thd, first_table, lex->if_exists(), lex->tmp_table());
@@ -4546,9 +4547,7 @@ end_with_restore_list:
db_name.length= lex->name.length;
strmov(db_name.str, lex->name.str);
-#ifdef WITH_WSREP
- if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error;
-#endif /* WITH_WSREP */
+ WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW);
if (check_db_name(&db_name))
{
@@ -4603,9 +4602,7 @@ end_with_restore_list:
/* lex->unit.cleanup() is called outside, no need to call it here */
break;
case SQLCOM_SHOW_CREATE_EVENT:
-#ifdef WITH_WSREP
- if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error;
-#endif /* WITH_WSREP */
+ WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW);
res= Events::show_create_event(thd, lex->spname->m_db,
lex->spname->m_name);
break;
@@ -4975,6 +4972,7 @@ end_with_restore_list:
if (!grant_user)
goto error;
+ WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW);
res = mysql_show_grants(thd, grant_user);
break;
}
@@ -5453,18 +5451,14 @@ end_with_restore_list:
}
case SQLCOM_SHOW_CREATE_PROC:
{
-#ifdef WITH_WSREP
- if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error;
-#endif /* WITH_WSREP */
+ WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW);
if (sp_show_create_routine(thd, TYPE_ENUM_PROCEDURE, lex->spname))
goto error;
break;
}
case SQLCOM_SHOW_CREATE_FUNC:
{
-#ifdef WITH_WSREP
- if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error;
-#endif /* WITH_WSREP */
+ WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW);
if (sp_show_create_routine(thd, TYPE_ENUM_FUNCTION, lex->spname))
goto error;
break;
@@ -5477,9 +5471,7 @@ end_with_restore_list:
stored_procedure_type type= (lex->sql_command == SQLCOM_SHOW_PROC_CODE ?
TYPE_ENUM_PROCEDURE : TYPE_ENUM_FUNCTION);
-#ifdef WITH_WSREP
- if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error;
-#endif /* WITH_WSREP */
+ WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW);
if (sp_cache_routine(thd, type, lex->spname, FALSE, &sp))
goto error;
if (!sp || sp->show_routine_code(thd))
@@ -5501,9 +5493,7 @@ end_with_restore_list:
if (check_ident_length(&lex->spname->m_name))
goto error;
-#ifdef WITH_WSREP
- if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error;
-#endif /* WITH_WSREP */
+ WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW);
if (show_create_trigger(thd, lex->spname))
goto error; /* Error has been already logged. */
@@ -5955,6 +5945,7 @@ static bool execute_show_status(THD *thd, TABLE_LIST *all_tables)
if (!(res= check_table_access(thd, SELECT_ACL, all_tables, FALSE,
UINT_MAX, FALSE)))
res= execute_sqlcom_select(thd, all_tables);
+
/* Don't log SHOW STATUS commands to slow query log */
thd->server_status&= ~(SERVER_QUERY_NO_INDEX_USED |
SERVER_QUERY_NO_GOOD_INDEX_USED);
diff --git a/sql/sql_partition_admin.cc b/sql/sql_partition_admin.cc
index a9f179e3a10..e7f5e3cb59e 100644
--- a/sql/sql_partition_admin.cc
+++ b/sql/sql_partition_admin.cc
@@ -540,7 +540,7 @@ bool Sql_cmd_alter_table_exchange_partition::
if ((!thd->is_current_stmt_binlog_format_row() ||
/* TODO: Do we really need to check for temp tables in this case? */
!find_temporary_table(thd, table_list)) &&
- wsrep_to_isolation_begin(thd, table_list->db, table_list->table_name,
+ wsrep_to_isolation_begin(thd, table_list->db, table_list->table_name,
NULL))
{
WSREP_WARN("ALTER TABLE EXCHANGE PARTITION isolation failure");
@@ -789,7 +789,7 @@ bool Sql_cmd_alter_table_truncate_partition::execute(THD *thd)
if (WSREP(thd) && (!thd->is_current_stmt_binlog_format_row() ||
!find_temporary_table(thd, first_table)) &&
wsrep_to_isolation_begin(
- thd, first_table->db, first_table->table_name, NULL)
+ thd, first_table->db, first_table->table_name, NULL)
)
{
WSREP_WARN("ALTER TABLE TRUNCATE PARTITION isolation failure");
diff --git a/sql/sql_table.cc b/sql/sql_table.cc
index 29c382f57d6..e4115b27e44 100644
--- a/sql/sql_table.cc
+++ b/sql/sql_table.cc
@@ -2088,6 +2088,7 @@ bool mysql_rm_table(THD *thd,TABLE_LIST *tables, my_bool if_exists,
DBUG_RETURN(TRUE);
my_ok(thd);
DBUG_RETURN(FALSE);
+
}
@@ -2134,7 +2135,6 @@ static uint32 comment_length(THD *thd, uint32 comment_pos,
return 0;
}
-
/**
Execute the drop of a normal or temporary table.
@@ -2571,6 +2571,9 @@ err:
/* Chop of the last comma */
built_non_trans_tmp_query.chop();
built_non_trans_tmp_query.append(" /* generated by server */");
+#ifdef WITH_WSREP
+ thd->wsrep_skip_wsrep_GTID = true;
+#endif /* WITH_WSREP */
error |= thd->binlog_query(THD::STMT_QUERY_TYPE,
built_non_trans_tmp_query.ptr(),
built_non_trans_tmp_query.length(),
@@ -2583,6 +2586,9 @@ err:
/* Chop of the last comma */
built_trans_tmp_query.chop();
built_trans_tmp_query.append(" /* generated by server */");
+#ifdef WITH_WSREP
+ thd->wsrep_skip_wsrep_GTID = true;
+#endif /* WITH_WSREP */
error |= thd->binlog_query(THD::STMT_QUERY_TYPE,
built_trans_tmp_query.ptr(),
built_trans_tmp_query.length(),
@@ -2597,6 +2603,9 @@ err:
built_query.append(" /* generated by server */");
int error_code = non_tmp_error ? thd->get_stmt_da()->sql_errno()
: 0;
+#ifdef WITH_WSREP
+ thd->wsrep_skip_wsrep_GTID = false;
+#endif /* WITH_WSREP */
error |= thd->binlog_query(THD::STMT_QUERY_TYPE,
built_query.ptr(),
built_query.length(),
@@ -2645,6 +2654,9 @@ err:
}
end:
+#ifdef WITH_WSREP
+ thd->wsrep_skip_wsrep_GTID = false;
+#endif /* WITH_WSREP */
DBUG_RETURN(error);
}
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index e0b8b522610..8eb3d35a96e 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -4817,7 +4817,7 @@ static Sys_var_ulong Sys_wsrep_slave_threads(
GLOBAL_VAR(wsrep_slave_threads), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(1, 512), DEFAULT(1), BLOCK_SIZE(1),
&PLock_wsrep_slave_threads, NOT_IN_BINLOG,
- ON_CHECK(wsrep_slave_threads_check),
+ ON_CHECK(NULL),
ON_UPDATE(wsrep_slave_threads_update));
static Sys_var_charptr Sys_wsrep_dbug_option(
@@ -4929,21 +4929,13 @@ static Sys_var_mybool Sys_wsrep_certify_nonPK(
GLOBAL_VAR(wsrep_certify_nonPK),
CMD_LINE(OPT_ARG), DEFAULT(TRUE));
-static bool fix_wsrep_causal_reads(sys_var *self, THD* thd, enum_var_type var_type)
-{
- if (var_type == OPT_GLOBAL)
- wsrep_causal_reads_update(&global_system_variables);
- else
- wsrep_causal_reads_update(&thd->variables);
- return false;
-}
static Sys_var_mybool Sys_wsrep_causal_reads(
"wsrep_causal_reads", "Setting this variable is equivalent "
"to setting wsrep_sync_wait READ flag",
SESSION_VAR(wsrep_causal_reads),
CMD_LINE(OPT_ARG, OPT_WSREP_CAUSAL_READS), DEFAULT(FALSE),
NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
- ON_UPDATE(fix_wsrep_causal_reads),
+ ON_UPDATE(wsrep_causal_reads_update),
DEPRECATED("'@@wsrep_sync_wait=1'"));
static Sys_var_uint Sys_wsrep_sync_wait(
diff --git a/sql/wsrep_applier.cc b/sql/wsrep_applier.cc
index e10c19287c4..e5c95780df5 100644
--- a/sql/wsrep_applier.cc
+++ b/sql/wsrep_applier.cc
@@ -218,12 +218,15 @@ wsrep_cb_status_t wsrep_apply_cb(void* const ctx,
{
THD* const thd((THD*)ctx);
+ assert(thd->wsrep_apply_toi == false);
+
// Allow tests to block the applier thread using the DBUG facilities.
DBUG_EXECUTE_IF("sync.wsrep_apply_cb",
{
const char act[]=
"now "
- "wait_for signal.wsrep_apply_cb";
+ "SIGNAL sync.wsrep_apply_cb_reached "
+ "WAIT_FOR signal.wsrep_apply_cb";
DBUG_ASSERT(!debug_sync_set_action(thd,
STRING_WITH_LEN(act)));
};);
@@ -383,7 +386,7 @@ wsrep_cb_status_t wsrep_commit_cb(void* const ctx,
mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
}
- if (*exit == false && thd->wsrep_applier)
+ if (thd->wsrep_applier)
{
/* From trans_begin() */
thd->variables.option_bits|= OPTION_BEGIN;
diff --git a/sql/wsrep_hton.cc b/sql/wsrep_hton.cc
index 7e62e820dcd..a8a0d574362 100644
--- a/sql/wsrep_hton.cc
+++ b/sql/wsrep_hton.cc
@@ -42,6 +42,7 @@ void wsrep_cleanup_transaction(THD *thd)
thd->wsrep_trx_meta.depends_on= WSREP_SEQNO_UNDEFINED;
thd->wsrep_exec_mode= LOCAL_STATE;
thd->wsrep_affected_rows= 0;
+ thd->wsrep_skip_wsrep_GTID= false;
return;
}
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
index 617aebae82e..67b64301ff9 100644
--- a/sql/wsrep_mysqld.cc
+++ b/sql/wsrep_mysqld.cc
@@ -2,7 +2,7 @@
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
- the Free Software Foundation; version 2 of the License.
+ the Free Software Foundation; version 2 of the License.x1
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -16,6 +16,7 @@
#include <mysqld.h>
#include <sql_class.h>
#include <sql_parse.h>
+#include <sql_base.h> /* find_temporary_table() */
#include "slave.h"
#include "rpl_mi.h"
#include "sql_repl.h"
@@ -960,8 +961,6 @@ bool wsrep_must_sync_wait (THD* thd, uint mask)
{
return (thd->variables.wsrep_sync_wait & mask) &&
thd->variables.wsrep_on &&
- !(thd->variables.wsrep_dirty_reads &&
- !is_update_query(thd->lex->sql_command)) &&
!thd->in_active_multi_stmt_transaction() &&
thd->wsrep_conflict_state != REPLAYING &&
thd->wsrep_sync_wait_gtid.seqno == WSREP_SEQNO_UNDEFINED;
@@ -1090,85 +1089,70 @@ static bool wsrep_prepare_keys_for_isolation(THD* thd,
const TABLE_LIST* table_list,
wsrep_key_arr_t* ka)
{
- ka->keys= 0;
- ka->keys_len= 0;
+ ka->keys= 0;
+ ka->keys_len= 0;
- extern TABLE* find_temporary_table(THD*, const TABLE_LIST*);
-
- if (db || table)
+ if (db || table)
+ {
+ if (!(ka->keys= (wsrep_key_t*)my_malloc(sizeof(wsrep_key_t), MYF(0))))
+ {
+ WSREP_ERROR("Can't allocate memory for key_array");
+ goto err;
+ }
+ ka->keys_len= 1;
+ if (!(ka->keys[0].key_parts= (wsrep_buf_t*)
+ my_malloc(sizeof(wsrep_buf_t)*2, MYF(0))))
{
- TABLE_LIST tmp_table;
+ WSREP_ERROR("Can't allocate memory for key_parts");
+ goto err;
+ }
+ ka->keys[0].key_parts_num= 2;
+ if (!wsrep_prepare_key_for_isolation(
+ db, table,
+ (wsrep_buf_t*)ka->keys[0].key_parts,
+ &ka->keys[0].key_parts_num))
+ {
+ WSREP_ERROR("Preparing keys for isolation failed (1)");
+ goto err;
+ }
+ }
- memset(&tmp_table, 0, sizeof(tmp_table));
- tmp_table.table_name= (char*)table;
- tmp_table.db= (char*)db;
- tmp_table.mdl_request.init(MDL_key::GLOBAL, (db) ? db : "",
- (table) ? table : "",
- MDL_INTENTION_EXCLUSIVE, MDL_STATEMENT);
+ for (const TABLE_LIST* table= table_list; table; table= table->next_global)
+ {
+ wsrep_key_t* tmp;
+ if (ka->keys)
+ tmp= (wsrep_key_t*)my_realloc(ka->keys,
+ (ka->keys_len + 1) * sizeof(wsrep_key_t),
+ MYF(0));
+ else
+ tmp= (wsrep_key_t*)my_malloc((ka->keys_len + 1) * sizeof(wsrep_key_t), MYF(0));
- if (!table || !find_temporary_table(thd, &tmp_table))
- {
- if (!(ka->keys= (wsrep_key_t*)my_malloc(sizeof(wsrep_key_t), MYF(0))))
- {
- WSREP_ERROR("Can't allocate memory for key_array");
- goto err;
- }
- ka->keys_len= 1;
- if (!(ka->keys[0].key_parts= (wsrep_buf_t*)
- my_malloc(sizeof(wsrep_buf_t)*2, MYF(0))))
- {
- WSREP_ERROR("Can't allocate memory for key_parts");
- goto err;
- }
- ka->keys[0].key_parts_num= 2;
- if (!wsrep_prepare_key_for_isolation(
- db, table,
- (wsrep_buf_t*)ka->keys[0].key_parts,
- &ka->keys[0].key_parts_num))
- {
- WSREP_ERROR("Preparing keys for isolation failed");
- goto err;
- }
- }
+ if (!tmp)
+ {
+ WSREP_ERROR("Can't allocate memory for key_array");
+ goto err;
}
-
- for (const TABLE_LIST* table= table_list; table; table= table->next_global)
+ ka->keys= tmp;
+ if (!(ka->keys[ka->keys_len].key_parts= (wsrep_buf_t*)
+ my_malloc(sizeof(wsrep_buf_t)*2, MYF(0))))
{
- if (!find_temporary_table(thd, table))
- {
- wsrep_key_t* tmp;
- tmp= (wsrep_key_t*)my_realloc(
- ka->keys, (ka->keys_len + 1) * sizeof(wsrep_key_t),
- MYF(MY_ALLOW_ZERO_PTR));
-
- if (!tmp)
- {
- WSREP_ERROR("Can't allocate memory for key_array");
- goto err;
- }
- ka->keys= tmp;
- if (!(ka->keys[ka->keys_len].key_parts= (wsrep_buf_t*)
- my_malloc(sizeof(wsrep_buf_t)*2, MYF(0))))
- {
- WSREP_ERROR("Can't allocate memory for key_parts");
- goto err;
- }
- ka->keys[ka->keys_len].key_parts_num= 2;
- ++ka->keys_len;
- if (!wsrep_prepare_key_for_isolation(
- table->db, table->table_name,
- (wsrep_buf_t*)ka->keys[ka->keys_len - 1].key_parts,
- &ka->keys[ka->keys_len - 1].key_parts_num))
- {
- WSREP_ERROR("Preparing keys for isolation failed");
- goto err;
- }
- }
+ WSREP_ERROR("Can't allocate memory for key_parts");
+ goto err;
}
- return true;
+ ka->keys[ka->keys_len].key_parts_num= 2;
+ ++ka->keys_len;
+ if (!wsrep_prepare_key_for_isolation(table->db, table->table_name,
+ (wsrep_buf_t*)ka->keys[ka->keys_len - 1].key_parts,
+ &ka->keys[ka->keys_len - 1].key_parts_num))
+ {
+ WSREP_ERROR("Preparing keys for isolation failed (2)");
+ goto err;
+ }
+ }
+ return 0;
err:
wsrep_keys_free(ka);
- return false;
+ return 1;
}
@@ -1379,6 +1363,142 @@ static int wsrep_create_sp(THD *thd, uchar** buf, size_t* buf_len);
static int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len);
/*
+ Rewrite DROP TABLE for TOI. Temporary tables are eliminated from
+ the query as they are visible only to client connection.
+
+ TODO: See comments for sql_base.cc:drop_temporary_table() and refine
+ the function to deal with transactional locked tables.
+ */
+static int wsrep_drop_table_query(THD* thd, uchar** buf, size_t* buf_len)
+{
+
+ LEX* lex= thd->lex;
+ SELECT_LEX* select_lex= &lex->select_lex;
+ TABLE_LIST* first_table= select_lex->table_list.first;
+ String buff;
+
+ bool found_temp_table= false;
+ for (TABLE_LIST* table= first_table; table; table= table->next_global)
+ {
+ if (find_temporary_table(thd, table->db, table->table_name))
+ {
+ found_temp_table= true;
+ break;
+ }
+ }
+
+ if (found_temp_table)
+ {
+ buff.append("DROP TABLE ");
+ if (lex->check_exists)
+ buff.append("IF EXISTS ");
+
+ for (TABLE_LIST* table= first_table; table; table= table->next_global)
+ {
+ if (!find_temporary_table(thd, table->db, table->table_name))
+ {
+ append_identifier(thd, &buff, table->db, strlen(table->db));
+ buff.append(".");
+ append_identifier(thd, &buff, table->table_name,
+ strlen(table->table_name));
+ buff.append(",");
+ }
+ }
+
+ /* Chop the last comma */
+ buff.chop();
+ buff.append(" /* generated by wsrep */");
+
+ WSREP_DEBUG("Rewrote '%s' as '%s'", thd->query(), buff.ptr());
+
+ return wsrep_to_buf_helper(thd, buff.ptr(), buff.length(), buf, buf_len);
+ }
+ else
+ {
+ return wsrep_to_buf_helper(thd, thd->query(), thd->query_length(),
+ buf, buf_len);
+ }
+}
+
+/*
+ Decide if statement should run in TOI.
+
+ Look if table or table_list contain temporary tables. If the
+ statement affects only temporary tables, statement should not run
+ in TOI. If the table list contains mix of regular and temporary tables
+ (DROP TABLE, OPTIMIZE, ANALYZE), statement should be run in TOI but
+ should be rewritten at later time for replication to contain only
+ non-temporary tables.
+ */
+static bool wsrep_can_run_in_toi(THD *thd, const char *db, const char *table,
+ const TABLE_LIST *table_list)
+{
+ DBUG_ASSERT(!table || db);
+ DBUG_ASSERT(table_list || db);
+
+ LEX* lex= thd->lex;
+ SELECT_LEX* select_lex= &lex->select_lex;
+ TABLE_LIST* first_table= select_lex->table_list.first;
+
+ switch (lex->sql_command)
+ {
+ case SQLCOM_CREATE_TABLE:
+ DBUG_ASSERT(!table_list);
+ if (thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE)
+ {
+ return false;
+ }
+ return true;
+
+ case SQLCOM_CREATE_VIEW:
+
+ DBUG_ASSERT(!table_list);
+ DBUG_ASSERT(first_table); /* First table is view name */
+ /*
+ If any of the remaining tables refer to temporary table error
+ is returned to client, so TOI can be skipped
+ */
+ for (TABLE_LIST* it= first_table->next_global; it; it= it->next_global)
+ {
+ if (find_temporary_table(thd, it))
+ {
+ return false;
+ }
+ }
+ return true;
+
+ case SQLCOM_CREATE_TRIGGER:
+
+ DBUG_ASSERT(!table_list);
+ DBUG_ASSERT(first_table);
+
+ if (find_temporary_table(thd, first_table))
+ {
+ return false;
+ }
+ return true;
+
+ default:
+ if (table && !find_temporary_table(thd, db, table))
+ {
+ return true;
+ }
+
+ if (table_list)
+ {
+ for (TABLE_LIST* table= first_table; table; table= table->next_global)
+ {
+ if (!find_temporary_table(thd, table->db, table->table_name))
+ {
+ return true;
+ }
+ }
+ }
+ return !(table || table_list);
+ }
+}
+
+/*
returns:
0: statement was replicated as TOI
1: TOI replication was skipped
@@ -1393,6 +1513,12 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_,
int buf_err;
int rc= 0;
+ if (wsrep_can_run_in_toi(thd, db_, table_, table_list) == false)
+ {
+ WSREP_DEBUG("No TOI for %s", WSREP_QUERY(thd));
+ return 1;
+ }
+
WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
thd->wsrep_exec_mode, thd->query() );
switch (thd->lex->sql_command)
@@ -1420,16 +1546,16 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_,
}
/* fallthrough */
default:
- buf_err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(), &buf,
- &buf_len);
+ buf_err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(),
+ &buf, &buf_len);
break;
}
wsrep_key_arr_t key_arr= {0, 0};
struct wsrep_buf buff = { buf, buf_len };
- if (!buf_err &&
- wsrep_prepare_keys_for_isolation(thd, db_, table_, table_list, &key_arr)&&
- key_arr.keys_len > 0 &&
+ if (!buf_err &&
+ !wsrep_prepare_keys_for_isolation(thd, db_, table_, table_list, &key_arr) &&
+ key_arr.keys_len > 0 &&
WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id,
key_arr.keys, key_arr.keys_len,
&buff, 1,
@@ -1626,9 +1752,12 @@ int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_,
if (thd->variables.wsrep_on && thd->wsrep_exec_mode==LOCAL_STATE)
{
switch (thd->variables.wsrep_OSU_method) {
- case WSREP_OSU_TOI: ret = wsrep_TOI_begin(thd, db_, table_,
- table_list); break;
- case WSREP_OSU_RSU: ret = wsrep_RSU_begin(thd, db_, table_); break;
+ case WSREP_OSU_TOI:
+ ret = wsrep_TOI_begin(thd, db_, table_, table_list);
+ break;
+ case WSREP_OSU_RSU:
+ ret = wsrep_RSU_begin(thd, db_, table_);
+ break;
default:
WSREP_ERROR("Unsupported OSU method: %lu",
thd->variables.wsrep_OSU_method);
diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h
index f02a3cd72f3..fd68fab991c 100644
--- a/sql/wsrep_mysqld.h
+++ b/sql/wsrep_mysqld.h
@@ -100,11 +100,12 @@ enum enum_wsrep_OSU_method {
enum enum_wsrep_sync_wait {
WSREP_SYNC_WAIT_NONE = 0x0,
- // show, select, begin
+ // select, begin
WSREP_SYNC_WAIT_BEFORE_READ = 0x1,
WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE = 0x2,
WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE = 0x4,
- WSREP_SYNC_WAIT_MAX = 0x7
+ WSREP_SYNC_WAIT_BEFORE_SHOW = 0x8,
+ WSREP_SYNC_WAIT_MAX = 0xF
};
// MySQL status variables
@@ -224,6 +225,8 @@ extern wsrep_seqno_t wsrep_locked_seqno;
#define WSREP_PROVIDER_EXISTS \
(wsrep_provider && strncasecmp(wsrep_provider, WSREP_NONE, FN_REFLEN))
+#define WSREP_QUERY(thd) (thd->query())
+
extern void wsrep_ready_wait();
class Ha_trx_info;
diff --git a/sql/wsrep_sst.cc b/sql/wsrep_sst.cc
index 7f9a88c3aa6..a187c75baa3 100644
--- a/sql/wsrep_sst.cc
+++ b/sql/wsrep_sst.cc
@@ -697,7 +697,7 @@ ssize_t wsrep_sst_prepare (void** msg)
// Attempt 1: wsrep_sst_receive_address
if (wsrep_sst_receive_address &&
- strcmp (wsrep_sst_receive_address, WSREP_SST_ADDRESS_AUTO))
+ strcmp (wsrep_sst_receive_address, WSREP_SST_ADDRESS_AUTO))
{
addr_in= wsrep_sst_receive_address;
}
@@ -837,16 +837,13 @@ static int sst_donate_mysqldump (const char* addr,
{
char host[256];
wsp::Address address(addr);
-
if (!address.is_valid())
{
WSREP_ERROR("Could not parse SST address : %s", addr);
return 0;
}
-
memcpy(host, address.get_address(), address.get_address_len());
int port= address.get_port();
-
int const cmd_len= 4096;
wsp::string cmd_str(cmd_len);
@@ -863,7 +860,7 @@ static int sst_donate_mysqldump (const char* addr,
int ret= snprintf (cmd_str(), cmd_len,
"wsrep_sst_mysqldump "
- WSREP_SST_OPT_HOST" '%s' "
+ WSREP_SST_OPT_ADDR" '%s' "
WSREP_SST_OPT_PORT" '%d' "
WSREP_SST_OPT_LPORT" '%u' "
WSREP_SST_OPT_SOCKET" '%s' "
@@ -871,7 +868,7 @@ static int sst_donate_mysqldump (const char* addr,
WSREP_SST_OPT_GTID" '%s:%lld' "
WSREP_SST_OPT_GTID_DOMAIN_ID" '%d'"
"%s",
- host, port, mysqld_port, mysqld_unix_port,
+ addr, port, mysqld_port, mysqld_unix_port,
wsrep_defaults_file, uuid_str,
(long long)seqno, wsrep_gtid_domain_id,
bypass ? " " WSREP_SST_OPT_BYPASS : "");
diff --git a/sql/wsrep_utils.cc b/sql/wsrep_utils.cc
index a0111354d02..580c8bbd55c 100644
--- a/sql/wsrep_utils.cc
+++ b/sql/wsrep_utils.cc
@@ -574,3 +574,17 @@ done:
return ret;
}
+/* returns the length of the host part of the address string */
+size_t wsrep_host_len(const char* const addr, size_t const addr_len)
+{
+ // check for IPv6 notation first
+ const char* const bracket= ('[' == addr[0] ? strchr(addr, ']') : NULL);
+
+ if (bracket) { // IPv6
+ return (bracket - addr + 1);
+ }
+ else { // host part ends at ':' or end of string
+ const char* const colon= strchr(addr, ':');
+ return (colon ? colon - addr : addr_len);
+ }
+}
diff --git a/sql/wsrep_utils.h b/sql/wsrep_utils.h
index f20e02d03a2..88a4c1e1a70 100644
--- a/sql/wsrep_utils.h
+++ b/sql/wsrep_utils.h
@@ -22,6 +22,9 @@
unsigned int wsrep_check_ip (const char* const addr, bool *is_ipv6);
size_t wsrep_guess_ip (char* buf, size_t buf_len);
+/* returns the length of the host part of the address string */
+size_t wsrep_host_len(const char* addr, size_t addr_len);
+
namespace wsp {
class Address {
diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc
index 158d0ca847d..1d117766db4 100644
--- a/sql/wsrep_var.cc
+++ b/sql/wsrep_var.cc
@@ -35,6 +35,8 @@ const char* wsrep_node_address = 0;
const char* wsrep_node_incoming_address = 0;
const char* wsrep_start_position = 0;
+static long wsrep_prev_slave_threads = wsrep_slave_threads;
+
int wsrep_init_vars()
{
wsrep_provider = my_strdup(WSREP_NONE, MYF(MY_WME));
@@ -59,12 +61,23 @@ bool wsrep_on_update (sys_var *self, THD* thd, enum_var_type var_type)
return false;
}
-bool wsrep_causal_reads_update (SV *sv)
+bool wsrep_causal_reads_update (sys_var *self, THD* thd, enum_var_type var_type)
{
- if (sv->wsrep_causal_reads) {
- sv->wsrep_sync_wait |= WSREP_SYNC_WAIT_BEFORE_READ;
+ // global setting should not affect session setting.
+ // if (var_type == OPT_GLOBAL) {
+ // thd->variables.wsrep_causal_reads = global_system_variables.wsrep_causal_reads;
+ // }
+ if (thd->variables.wsrep_causal_reads) {
+ thd->variables.wsrep_sync_wait |= WSREP_SYNC_WAIT_BEFORE_READ;
+ } else {
+ thd->variables.wsrep_sync_wait &= ~WSREP_SYNC_WAIT_BEFORE_READ;
+ }
+
+ // update global settings too.
+ if (global_system_variables.wsrep_causal_reads) {
+ global_system_variables.wsrep_sync_wait |= WSREP_SYNC_WAIT_BEFORE_READ;
} else {
- sv->wsrep_sync_wait &= ~WSREP_SYNC_WAIT_BEFORE_READ;
+ global_system_variables.wsrep_sync_wait &= ~WSREP_SYNC_WAIT_BEFORE_READ;
}
return false;
@@ -72,12 +85,17 @@ bool wsrep_causal_reads_update (SV *sv)
bool wsrep_sync_wait_update (sys_var* self, THD* thd, enum_var_type var_type)
{
- if (var_type == OPT_GLOBAL)
- global_system_variables.wsrep_causal_reads =
- MY_TEST(global_system_variables.wsrep_sync_wait & WSREP_SYNC_WAIT_BEFORE_READ);
- else
- thd->variables.wsrep_causal_reads =
- MY_TEST(thd->variables.wsrep_sync_wait & WSREP_SYNC_WAIT_BEFORE_READ);
+ // global setting should not affect session setting.
+ // if (var_type == OPT_GLOBAL) {
+ // thd->variables.wsrep_sync_wait = global_system_variables.wsrep_sync_wait;
+ // }
+ thd->variables.wsrep_causal_reads = thd->variables.wsrep_sync_wait &
+ WSREP_SYNC_WAIT_BEFORE_READ;
+
+ // update global settings too
+ global_system_variables.wsrep_causal_reads = global_system_variables.wsrep_sync_wait &
+ WSREP_SYNC_WAIT_BEFORE_READ;
+
return false;
}
@@ -502,18 +520,15 @@ void wsrep_node_address_init (const char* value)
wsrep_node_address = (value) ? my_strdup(value, MYF(0)) : NULL;
}
-bool wsrep_slave_threads_check (sys_var *self, THD* thd, set_var* var)
+static void wsrep_slave_count_change_update ()
{
- mysql_mutex_lock(&LOCK_wsrep_slave_threads);
- wsrep_slave_count_change += (var->save_result.ulonglong_value -
- wsrep_slave_threads);
- mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
-
- return 0;
+ wsrep_slave_count_change += (wsrep_slave_threads - wsrep_prev_slave_threads);
+ wsrep_prev_slave_threads = wsrep_slave_threads;
}
bool wsrep_slave_threads_update (sys_var *self, THD* thd, enum_var_type type)
{
+ wsrep_slave_count_change_update();
if (wsrep_slave_count_change > 0)
{
wsrep_create_appliers(wsrep_slave_count_change);
diff --git a/sql/wsrep_var.h b/sql/wsrep_var.h
index ca77b5c0039..7530fd98870 100644
--- a/sql/wsrep_var.h
+++ b/sql/wsrep_var.h
@@ -41,9 +41,7 @@ int wsrep_init_vars();
#define DEFAULT_ARGS (THD* thd, enum_var_type var_type)
#define INIT_ARGS (const char* opt)
-struct system_variables;
-bool wsrep_causal_reads_update(struct system_variables *sv);
-
+extern bool wsrep_causal_reads_update UPDATE_ARGS;
extern bool wsrep_on_update UPDATE_ARGS;
extern bool wsrep_sync_wait_update UPDATE_ARGS;
extern bool wsrep_start_position_check CHECK_ARGS;