summaryrefslogtreecommitdiff
path: root/sql/sql_class.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_class.cc')
-rw-r--r--sql/sql_class.cc1838
1 files changed, 1392 insertions, 446 deletions
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index a665cd6522e..1ec2d8b3c9e 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -28,13 +28,13 @@
#pragma implementation // gcc: Class implementation
#endif
-#include "my_global.h" /* NO_EMBEDDED_ACCESS_CHECKS */
+#include <my_global.h> /* NO_EMBEDDED_ACCESS_CHECKS */
#include "sql_priv.h"
-#include "unireg.h" // REQUIRED: for other includes
#include "sql_class.h"
#include "sql_cache.h" // query_cache_abort
#include "sql_base.h" // close_thread_tables
#include "sql_time.h" // date_time_format_copy
+#include "tztime.h" // MYSQL_TIME <-> my_time_t
#include "sql_acl.h" // NO_ACCESS,
// acl_getroot_no_password
#include "sql_base.h" // close_temporary_tables
@@ -62,6 +62,7 @@
#include "debug_sync.h"
#include "sql_parse.h" // is_update_query
#include "sql_callback.h"
+#include "lock.h"
#include "sql_connect.h"
#include "repl_failsafe.h"
@@ -74,23 +75,6 @@ char empty_c_string[1]= {0}; /* used for not defined db */
const char * const THD::DEFAULT_WHERE= "field list";
-
-/*****************************************************************************
-** Instansiate templates
-*****************************************************************************/
-
-#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
-/* Used templates */
-template class List<Key>;
-template class List_iterator<Key>;
-template class List<Key_part_spec>;
-template class List_iterator<Key_part_spec>;
-template class List<Alter_drop>;
-template class List_iterator<Alter_drop>;
-template class List<Alter_column>;
-template class List_iterator<Alter_column>;
-#endif
-
/****************************************************************************
** User variables
****************************************************************************/
@@ -130,7 +114,8 @@ Key::Key(const Key &rhs, MEM_ROOT *mem_root)
columns(rhs.columns, mem_root),
name(rhs.name),
option_list(rhs.option_list),
- generated(rhs.generated)
+ generated(rhs.generated),
+ create_if_not_exists(rhs.create_if_not_exists)
{
list_copy_and_replace_each_value(columns, mem_root);
}
@@ -144,6 +129,7 @@ Key::Key(const Key &rhs, MEM_ROOT *mem_root)
Foreign_key::Foreign_key(const Foreign_key &rhs, MEM_ROOT *mem_root)
:Key(rhs,mem_root),
+ ref_db(rhs.ref_db),
ref_table(rhs.ref_table),
ref_columns(rhs.ref_columns,mem_root),
delete_opt(rhs.delete_opt),
@@ -245,7 +231,7 @@ bool Foreign_key::validate(List<Create_field> &table_fields)
sql_field->field_name)) {}
if (!sql_field)
{
- my_error(ER_KEY_COLUMN_DOES_NOT_EXITS, MYF(0), column->field_name);
+ my_error(ER_KEY_COLUMN_DOES_NOT_EXITS, MYF(0), column->field_name.str);
DBUG_RETURN(TRUE);
}
if (type == Key::FOREIGN_KEY && sql_field->vcol_info)
@@ -368,29 +354,6 @@ void thd_set_thread_stack(THD *thd, char *stack_start)
}
/**
- Lock connection data for the set of connections this connection
- belongs to
-
- @param thd THD object
-*/
-void thd_lock_thread_count(THD *)
-{
- mysql_mutex_lock(&LOCK_thread_count);
-}
-
-/**
- Lock connection data for the set of connections this connection
- belongs to
-
- @param thd THD object
-*/
-void thd_unlock_thread_count(THD *)
-{
- mysql_cond_broadcast(&COND_thread_count);
- mysql_mutex_unlock(&LOCK_thread_count);
-}
-
-/**
Close the socket used by this connection
@param thd THD object
@@ -496,7 +459,7 @@ void thd_set_mysys_var(THD *thd, st_my_thread_var *mysys_var)
*/
my_socket thd_get_fd(THD *thd)
{
- return thd->net.vio->sd;
+ return mysql_socket_getfd(thd->net.vio->mysql_socket);
}
#endif
@@ -552,54 +515,103 @@ extern "C" int mysql_tmpfile(const char *prefix)
extern "C"
int thd_in_lock_tables(const THD *thd)
{
- return test(thd->in_lock_tables);
+ return MY_TEST(thd->in_lock_tables);
}
extern "C"
int thd_tablespace_op(const THD *thd)
{
- return test(thd->tablespace_op);
+ return MY_TEST(thd->tablespace_op);
}
-
extern "C"
-const char *set_thd_proc_info(THD *thd, const char *info,
+const char *set_thd_proc_info(THD *thd_arg, const char *info,
const char *calling_function,
const char *calling_file,
const unsigned int calling_line)
{
- if (!thd)
+ PSI_stage_info old_stage;
+ PSI_stage_info new_stage;
+
+ old_stage.m_key= 0;
+ old_stage.m_name= info;
+
+ set_thd_stage_info(thd_arg, & old_stage, & new_stage,
+ calling_function, calling_file, calling_line);
+
+ return new_stage.m_name;
+}
+
+extern "C"
+void set_thd_stage_info(void *thd_arg,
+ const PSI_stage_info *new_stage,
+ PSI_stage_info *old_stage,
+ const char *calling_func,
+ const char *calling_file,
+ const unsigned int calling_line)
+{
+ THD *thd= (THD*) thd_arg;
+ if (thd == NULL)
thd= current_thd;
- const char *old_info= thd->proc_info;
- DBUG_PRINT("proc_info", ("%s:%d %s", calling_file, calling_line, info));
+ thd->enter_stage(new_stage, old_stage, calling_func, calling_file,
+ calling_line);
+}
+
+void THD::enter_stage(const PSI_stage_info *new_stage,
+ PSI_stage_info *old_stage,
+ const char *calling_func,
+ const char *calling_file,
+ const unsigned int calling_line)
+{
+ DBUG_PRINT("THD::enter_stage", ("%s:%d", calling_file, calling_line));
+
+ if (old_stage != NULL)
+ {
+ old_stage->m_key= m_current_stage_key;
+ old_stage->m_name= proc_info;
+ }
+
+ if (new_stage != NULL)
+ {
+ const char *msg= new_stage->m_name;
#if defined(ENABLED_PROFILING)
- thd->profiling.status_change(info,
- calling_function, calling_file, calling_line);
+ profiling.status_change(msg, calling_func, calling_file, calling_line);
#endif
- thd->proc_info= info;
- return old_info;
+
+ m_current_stage_key= new_stage->m_key;
+ proc_info= msg;
+
+#ifdef HAVE_PSI_THREAD_INTERFACE
+ PSI_THREAD_CALL(set_thread_state)(msg);
+ MYSQL_SET_STAGE(m_current_stage_key, calling_file, calling_line);
+#endif
+ }
+ return;
}
-extern "C"
-const char* thd_enter_cond(MYSQL_THD thd, mysql_cond_t *cond,
- mysql_mutex_t *mutex, const char *msg)
+void thd_enter_cond(MYSQL_THD thd, mysql_cond_t *cond, mysql_mutex_t *mutex,
+ const PSI_stage_info *stage, PSI_stage_info *old_stage,
+ const char *src_function, const char *src_file,
+ int src_line)
{
if (!thd)
thd= current_thd;
- return thd->enter_cond(cond, mutex, msg);
+ return thd->enter_cond(cond, mutex, stage, old_stage, src_function, src_file,
+ src_line);
}
-extern "C"
-void thd_exit_cond(MYSQL_THD thd, const char *old_msg)
+void thd_exit_cond(MYSQL_THD thd, const PSI_stage_info *stage,
+ const char *src_function, const char *src_file,
+ int src_line)
{
if (!thd)
thd= current_thd;
- thd->exit_cond(old_msg);
+ thd->exit_cond(stage, src_function, src_file, src_line);
return;
}
@@ -645,6 +657,17 @@ void thd_set_ha_data(THD *thd, const struct handlerton *hton,
}
+/**
+ Allow storage engine to wakeup commits waiting in THD::wait_for_prior_commit.
+ @see thd_wakeup_subsequent_commits() definition in plugin.h
+*/
+extern "C"
+void thd_wakeup_subsequent_commits(THD *thd, int wakeup_error)
+{
+ thd->wakeup_subsequent_commits(wakeup_error);
+}
+
+
extern "C"
long long thd_test_options(const THD *thd, long long test_options)
{
@@ -664,109 +687,51 @@ int thd_tx_isolation(const THD *thd)
}
extern "C"
-void thd_inc_row_count(THD *thd)
+int thd_tx_is_read_only(const THD *thd)
{
- thd->warning_info->inc_current_row_for_warning();
+ return (int) thd->tx_read_only;
}
-/**
- Dumps a text description of a thread, its security context
- (user, host) and the current query.
-
- @param thd thread context
- @param buffer pointer to preferred result buffer
- @param length length of buffer
- @param max_query_len how many chars of query to copy (0 for all)
-
- @req LOCK_thread_count
-
- @note LOCK_thread_count mutex is not necessary when the function is invoked on
- the currently running thread (current_thd) or if the caller in some other
- way guarantees that access to thd->query is serialized.
-
- @return Pointer to string
-*/
-
extern "C"
-char *thd_security_context(THD *thd, char *buffer, unsigned int length,
- unsigned int max_query_len)
-{
- String str(buffer, length, &my_charset_latin1);
- const Security_context *sctx= &thd->main_security_ctx;
- char header[256];
- int len;
- /*
- The pointers thd->query and thd->proc_info might change since they are
- being modified concurrently. This is acceptable for proc_info since its
- values doesn't have to very accurate and the memory it points to is static,
- but we need to attempt a snapshot on the pointer values to avoid using NULL
- values. The pointer to thd->query however, doesn't point to static memory
- and has to be protected by thd->LOCK_thd_data or risk pointing to
- uninitialized memory.
- */
- const char *proc_info= thd->proc_info;
-
- len= my_snprintf(header, sizeof(header),
- "MySQL thread id %lu, OS thread handle 0x%lx, query id %lu",
- thd->thread_id, (ulong) thd->real_id, (ulong) thd->query_id);
- str.length(0);
- str.append(header, len);
-
- if (sctx->host)
- {
- str.append(' ');
- str.append(sctx->host);
- }
+{ /* Functions for thd_error_context_service */
- if (sctx->ip)
+ const char *thd_get_error_message(const THD *thd)
{
- str.append(' ');
- str.append(sctx->ip);
+ return thd->get_stmt_da()->message();
}
- if (sctx->user)
+ uint thd_get_error_number(const THD *thd)
{
- str.append(' ');
- str.append(sctx->user);
+ return thd->get_stmt_da()->sql_errno();
}
- if (proc_info)
+ ulong thd_get_error_row(const THD *thd)
{
- str.append(' ');
- str.append(proc_info);
+ return thd->get_stmt_da()->current_row_for_warning();
}
- /* Don't wait if LOCK_thd_data is used as this could cause a deadlock */
- if (!mysql_mutex_trylock(&thd->LOCK_thd_data))
+ void thd_inc_error_row(THD *thd)
{
- if (thd->query())
- {
- if (max_query_len < 1)
- len= thd->query_length();
- else
- len= min(thd->query_length(), max_query_len);
- str.append('\n');
- str.append(thd->query(), len);
- }
- mysql_mutex_unlock(&thd->LOCK_thd_data);
+ thd->get_stmt_da()->inc_current_row_for_warning();
}
+}
- if (str.c_ptr_safe() == buffer)
- return buffer;
- /*
- We have to copy the new string to the destination buffer because the string
- was reallocated to a larger buffer to be able to fit.
- */
- DBUG_ASSERT(buffer != NULL);
- length= min(str.length(), length-1);
- memcpy(buffer, str.c_ptr_quick(), length);
- /* Make sure that the new string is null terminated */
- buffer[length]= '\0';
- return buffer;
+#if MARIA_PLUGIN_INTERFACE_VERSION < 0x0200
+/**
+ TODO: This function is for API compatibility, remove it eventually.
+ All engines should switch to use thd_get_error_context_description()
+ plugin service function.
+*/
+extern "C"
+char *thd_security_context(THD *thd,
+ char *buffer, unsigned int length,
+ unsigned int max_query_len)
+{
+ return thd_get_error_context_description(thd, buffer, length, max_query_len);
}
-
+#endif
/**
Implementation of Drop_table_error_handler::handle_condition().
@@ -785,9 +750,9 @@ char *thd_security_context(THD *thd, char *buffer, unsigned int length,
bool Drop_table_error_handler::handle_condition(THD *thd,
uint sql_errno,
const char* sqlstate,
- MYSQL_ERROR::enum_warning_level level,
+ Sql_condition::enum_warning_level level,
const char* msg,
- MYSQL_ERROR ** cond_hdl)
+ Sql_condition ** cond_hdl)
{
*cond_hdl= NULL;
return ((sql_errno == EE_DELETE && my_errno == ENOENT) ||
@@ -798,7 +763,7 @@ bool Drop_table_error_handler::handle_condition(THD *thd,
THD::THD()
:Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION,
/* statement id */ 0),
- rli_fake(0), rli_slave(NULL),
+ rli_fake(0), rgi_fake(0), rgi_slave(NULL),
in_sub_stmt(0), log_all_errors(0),
binlog_unsafe_warning_flags(0),
binlog_table_maps(0),
@@ -808,10 +773,12 @@ THD::THD()
first_successful_insert_id_in_prev_stmt_for_binlog(0),
first_successful_insert_id_in_cur_stmt(0),
stmt_depends_on_first_successful_insert_id_in_prev_stmt(FALSE),
- examined_row_count(0),
+ m_examined_row_count(0),
accessed_rows_and_keys(0),
- warning_info(&main_warning_info),
- stmt_da(&main_da),
+ m_digest(NULL),
+ m_statement_psi(NULL),
+ m_idle_psi(NULL),
+ thread_id(0),
global_disable_checkpoint(0),
failed_com_change_user(0),
is_fatal_error(0),
@@ -822,22 +789,36 @@ THD::THD()
in_lock_tables(0),
bootstrap(0),
derived_tables_processing(FALSE),
+ waiting_on_group_commit(FALSE), has_waiter(FALSE),
spcont(NULL),
m_parser_state(NULL),
#if defined(ENABLED_DEBUG_SYNC)
debug_sync_control(0),
#endif /* defined(ENABLED_DEBUG_SYNC) */
- main_warning_info(0, false)
+ wait_for_commit_ptr(0),
+ main_da(0, false, false),
+ m_stmt_da(&main_da)
{
ulong tmp;
mdl_context.init(this);
/*
+ We set THR_THD to temporally point to this THD to register all the
+ variables that allocates memory for this THD
+ */
+ THD *old_THR_THD= current_thd;
+ set_current_thd(this);
+ status_var.memory_used= 0;
+ main_da.init();
+
+ /*
Pass nominal parameters to init_alloc_root only to ensure that
the destructor works OK in case of an error. The main_mem_root
will be re-initialized in init_for_queries().
*/
- init_sql_alloc(&main_mem_root, ALLOC_ROOT_MIN_BLOCK_SIZE, 0);
+ init_sql_alloc(&main_mem_root, ALLOC_ROOT_MIN_BLOCK_SIZE, 0,
+ MYF(MY_THREAD_SPECIFIC));
+
stmt_arena= this;
thread_stack= 0;
scheduler= thread_scheduler; // Will be fixed later
@@ -856,9 +837,10 @@ THD::THD()
col_access=0;
is_slave_error= thread_specific_used= FALSE;
my_hash_clear(&handler_tables_hash);
+ my_hash_clear(&ull_hash);
tmp_table=0;
cuted_fields= 0L;
- sent_row_count= 0L;
+ m_sent_row_count= 0L;
limit_found_rows= 0;
m_row_count_func= -1;
statement_id_counter= 0UL;
@@ -872,9 +854,10 @@ THD::THD()
progress.max_counter= 0;
current_linfo = 0;
slave_thread = 0;
+ connection_name.str= 0;
+ connection_name.length= 0;
+
bzero(&variables, sizeof(variables));
- thread_id= 0;
- one_shot_set= 0;
file_id = 0;
query_id= 0;
query_name_consts= 0;
@@ -884,6 +867,7 @@ THD::THD()
mysys_var=0;
binlog_evt_union.do_union= FALSE;
enable_slow_log= 0;
+ durability_property= HA_REGULAR_DURABILITY;
#ifndef DBUG_OFF
dbug_sentry=THD_SENTRY_MAGIC;
@@ -892,8 +876,8 @@ THD::THD()
mysql_audit_init_thd(this);
#endif
net.vio=0;
+ net.buff= 0;
client_capabilities= 0; // minimalistic client
- ull=0;
system_thread= NON_SYSTEM_THREAD;
cleanup_done= abort_on_warning= 0;
peer_port= 0; // For SHOW PROCESSLIST
@@ -918,9 +902,9 @@ THD::THD()
/* Variables with default values */
proc_info="login";
where= THD::DEFAULT_WHERE;
- server_id = ::server_id;
+ variables.server_id = global_system_variables.server_id;
slave_net = 0;
- command=COM_CONNECT;
+ m_command=COM_CONNECT;
*scramble= '\0';
/* Call to init() below requires fully initialized Open_tables_state. */
@@ -933,7 +917,7 @@ THD::THD()
user_connect=(USER_CONN *)0;
my_hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0,
(my_hash_get_key) get_var_key,
- (my_hash_free_key) free_user_var, 0);
+ (my_hash_free_key) free_user_var, HASH_THREAD_SPECIFIC);
sp_proc_cache= NULL;
sp_func_cache= NULL;
@@ -941,7 +925,7 @@ THD::THD()
/* For user vars replication*/
if (opt_bin_log)
my_init_dynamic_array(&user_var_events,
- sizeof(BINLOG_USER_VAR_EVENT *), 16, 16);
+ sizeof(BINLOG_USER_VAR_EVENT *), 16, 16, MYF(0));
else
bzero((char*) &user_var_events, sizeof(user_var_events));
@@ -951,18 +935,34 @@ THD::THD()
protocol_binary.init(this);
tablespace_op=FALSE;
- tmp= sql_rnd_with_mutex();
+
+ /*
+ Initialize the random generator. We call my_rnd() without a lock as
+ it's not really critical if two threads modifies the structure at the
+ same time. We ensure that we have an unique number foreach thread
+ by adding the address of the stack.
+ */
+ tmp= (ulong) (my_rnd(&sql_rand) * 0xffffffff);
my_rnd_init(&rand, tmp + (ulong) &rand, tmp + (ulong) ::global_query_id);
substitute_null_with_insert_id = FALSE;
thr_lock_info_init(&lock_info); /* safety: will be reset after start */
+ m_token_array= NULL;
+ if (max_digest_length > 0)
+ {
+ m_token_array= (unsigned char*) my_malloc(max_digest_length,
+ MYF(MY_WME|MY_THREAD_SPECIFIC));
+ }
+
m_internal_handler= NULL;
- m_binlog_invoker= FALSE;
+ m_binlog_invoker= INVOKER_NONE;
memset(&invoker_user, 0, sizeof(invoker_user));
memset(&invoker_host, 0, sizeof(invoker_host));
prepare_derived_at_open= FALSE;
create_tmp_table_for_derived= FALSE;
save_prep_leaf_list= FALSE;
+ /* Restore THR_THD */
+ set_current_thd(old_THR_THD);
}
@@ -983,9 +983,9 @@ void THD::push_internal_handler(Internal_error_handler *handler)
bool THD::handle_condition(uint sql_errno,
const char* sqlstate,
- MYSQL_ERROR::enum_warning_level level,
+ Sql_condition::enum_warning_level level,
const char* msg,
- MYSQL_ERROR ** cond_hdl)
+ Sql_condition ** cond_hdl)
{
if (!m_internal_handler)
{
@@ -1022,7 +1022,7 @@ void THD::raise_error(uint sql_errno)
const char* msg= ER(sql_errno);
(void) raise_condition(sql_errno,
NULL,
- MYSQL_ERROR::WARN_LEVEL_ERROR,
+ Sql_condition::WARN_LEVEL_ERROR,
msg);
}
@@ -1038,7 +1038,7 @@ void THD::raise_error_printf(uint sql_errno, ...)
va_end(args);
(void) raise_condition(sql_errno,
NULL,
- MYSQL_ERROR::WARN_LEVEL_ERROR,
+ Sql_condition::WARN_LEVEL_ERROR,
ebuff);
DBUG_VOID_RETURN;
}
@@ -1048,7 +1048,7 @@ void THD::raise_warning(uint sql_errno)
const char* msg= ER(sql_errno);
(void) raise_condition(sql_errno,
NULL,
- MYSQL_ERROR::WARN_LEVEL_WARN,
+ Sql_condition::WARN_LEVEL_WARN,
msg);
}
@@ -1064,7 +1064,7 @@ void THD::raise_warning_printf(uint sql_errno, ...)
va_end(args);
(void) raise_condition(sql_errno,
NULL,
- MYSQL_ERROR::WARN_LEVEL_WARN,
+ Sql_condition::WARN_LEVEL_WARN,
ebuff);
DBUG_VOID_RETURN;
}
@@ -1078,7 +1078,7 @@ void THD::raise_note(uint sql_errno)
const char* msg= ER(sql_errno);
(void) raise_condition(sql_errno,
NULL,
- MYSQL_ERROR::WARN_LEVEL_NOTE,
+ Sql_condition::WARN_LEVEL_NOTE,
msg);
DBUG_VOID_RETURN;
}
@@ -1097,24 +1097,25 @@ void THD::raise_note_printf(uint sql_errno, ...)
va_end(args);
(void) raise_condition(sql_errno,
NULL,
- MYSQL_ERROR::WARN_LEVEL_NOTE,
+ Sql_condition::WARN_LEVEL_NOTE,
ebuff);
DBUG_VOID_RETURN;
}
-MYSQL_ERROR* THD::raise_condition(uint sql_errno,
+Sql_condition* THD::raise_condition(uint sql_errno,
const char* sqlstate,
- MYSQL_ERROR::enum_warning_level level,
+ Sql_condition::enum_warning_level level,
const char* msg)
{
- MYSQL_ERROR *cond= NULL;
+ Diagnostics_area *da= get_stmt_da();
+ Sql_condition *cond= NULL;
DBUG_ENTER("THD::raise_condition");
if (!(variables.option_bits & OPTION_SQL_NOTES) &&
- (level == MYSQL_ERROR::WARN_LEVEL_NOTE))
+ (level == Sql_condition::WARN_LEVEL_NOTE))
DBUG_RETURN(NULL);
- warning_info->opt_clear_warning_info(query_id);
+ da->opt_clear_warning_info(query_id);
/*
TODO: replace by DBUG_ASSERT(sql_errno != 0) once all bugs similar to
@@ -1128,24 +1129,24 @@ MYSQL_ERROR* THD::raise_condition(uint sql_errno,
if (sqlstate == NULL)
sqlstate= mysql_errno_to_sqlstate(sql_errno);
- if ((level == MYSQL_ERROR::WARN_LEVEL_WARN) &&
+ if ((level == Sql_condition::WARN_LEVEL_WARN) &&
really_abort_on_warning())
{
/*
FIXME:
push_warning and strict SQL_MODE case.
*/
- level= MYSQL_ERROR::WARN_LEVEL_ERROR;
+ level= Sql_condition::WARN_LEVEL_ERROR;
killed= KILL_BAD_DATA;
}
switch (level)
{
- case MYSQL_ERROR::WARN_LEVEL_NOTE:
- case MYSQL_ERROR::WARN_LEVEL_WARN:
+ case Sql_condition::WARN_LEVEL_NOTE:
+ case Sql_condition::WARN_LEVEL_WARN:
got_warning= 1;
break;
- case MYSQL_ERROR::WARN_LEVEL_ERROR:
+ case Sql_condition::WARN_LEVEL_ERROR:
break;
default:
DBUG_ASSERT(FALSE);
@@ -1154,16 +1155,16 @@ MYSQL_ERROR* THD::raise_condition(uint sql_errno,
if (handle_condition(sql_errno, sqlstate, level, msg, &cond))
DBUG_RETURN(cond);
- if (level == MYSQL_ERROR::WARN_LEVEL_ERROR)
+ if (level == Sql_condition::WARN_LEVEL_ERROR)
{
mysql_audit_general(this, MYSQL_AUDIT_GENERAL_ERROR, sql_errno, msg);
is_slave_error= 1; // needed to catch query errors during replication
- if (! stmt_da->is_error())
+ if (!da->is_error())
{
set_row_count_func(-1);
- stmt_da->set_error_status(this, sql_errno, msg, sqlstate);
+ da->set_error_status(sql_errno, msg, sqlstate, cond);
}
}
@@ -1177,7 +1178,7 @@ MYSQL_ERROR* THD::raise_condition(uint sql_errno,
if (!(is_fatal_error && (sql_errno == EE_OUTOFMEMORY ||
sql_errno == ER_OUTOFMEMORY)))
{
- cond= warning_info->push_warning(this, sql_errno, sqlstate, level, msg);
+ cond= da->push_warning(this, sql_errno, sqlstate, level, msg);
}
DBUG_RETURN(cond);
}
@@ -1211,8 +1212,8 @@ LEX_STRING *thd_make_lex_string(THD *thd, LEX_STRING *lex_str,
const char *str, unsigned int size,
int allocate_lex_string)
{
- return thd->make_lex_string(lex_str, str, size,
- (bool) allocate_lex_string);
+ return allocate_lex_string ? thd->make_lex_string(str, size)
+ : thd->make_lex_string(lex_str, str, size);
}
extern "C"
@@ -1227,6 +1228,26 @@ void thd_get_xid(const MYSQL_THD thd, MYSQL_XID *xid)
*xid = *(MYSQL_XID *) &thd->transaction.xid_state.xid;
}
+
+extern "C"
+my_time_t thd_TIME_to_gmt_sec(MYSQL_THD thd, const MYSQL_TIME *ltime,
+ unsigned int *errcode)
+{
+ Time_zone *tz= thd ? thd->variables.time_zone :
+ global_system_variables.time_zone;
+ return tz->TIME_to_gmt_sec(ltime, errcode);
+}
+
+
+extern "C"
+void thd_gmt_sec_to_TIME(MYSQL_THD thd, MYSQL_TIME *ltime, my_time_t t)
+{
+ Time_zone *tz= thd ? thd->variables.time_zone :
+ global_system_variables.time_zone;
+ tz->gmt_sec_to_TIME(ltime, t);
+}
+
+
#ifdef _WIN32
extern "C" THD *_current_thd_noinline(void)
{
@@ -1239,6 +1260,7 @@ extern "C" THD *_current_thd_noinline(void)
void THD::init(void)
{
+ DBUG_ENTER("thd::init");
mysql_mutex_lock(&LOCK_global_system_variables);
plugin_thdvar_init(this);
/*
@@ -1247,7 +1269,14 @@ void THD::init(void)
avoid temporary tables replication failure.
*/
variables.pseudo_thread_id= thread_id;
+
+ variables.default_master_connection.str= default_master_connection_buff;
+ ::strmake(variables.default_master_connection.str,
+ global_system_variables.default_master_connection.str,
+ variables.default_master_connection.length);
+
mysql_mutex_unlock(&LOCK_global_system_variables);
+
server_status= SERVER_STATUS_AUTOCOMMIT;
if (variables.sql_mode & MODE_NO_BACKSLASH_ESCAPES)
server_status|= SERVER_STATUS_NO_BACKSLASH_ESCAPES;
@@ -1261,11 +1290,14 @@ void THD::init(void)
TL_WRITE_LOW_PRIORITY :
TL_WRITE);
tx_isolation= (enum_tx_isolation) variables.tx_isolation;
+ tx_read_only= variables.tx_read_only;
update_charset();
reset_current_stmt_binlog_format_row();
- bzero((char *) &status_var, sizeof(status_var));
+ reset_binlog_local_stmt_filter();
+ set_status_var_init();
bzero((char *) &org_status_var, sizeof(org_status_var));
start_bytes_received= 0;
+ last_commit_gtid.seq_no= 0;
status_in_global= 0;
if (variables.sql_log_bin)
@@ -1281,6 +1313,8 @@ void THD::init(void)
/* Initialize the Debug Sync Facility. See debug_sync.cc. */
debug_sync_init_thread(this);
#endif /* defined(ENABLED_DEBUG_SYNC) */
+ apc_target.init(&LOCK_thd_data);
+ DBUG_VOID_RETURN;
}
@@ -1419,8 +1453,6 @@ void THD::cleanup(void)
if (global_read_lock.is_acquired())
global_read_lock.unlock_global_read_lock(this);
- /* All metadata locks must have been released by now. */
- DBUG_ASSERT(!mdl_context.has_locks());
if (user_connect)
{
decrease_user_connections(user_connect);
@@ -1437,14 +1469,11 @@ void THD::cleanup(void)
sp_cache_clear(&sp_proc_cache);
sp_cache_clear(&sp_func_cache);
- if (ull)
- {
- mysql_mutex_lock(&LOCK_user_locks);
- item_user_lock_release(ull);
- mysql_mutex_unlock(&LOCK_user_locks);
- ull= NULL;
- }
+ mysql_ull_cleanup(this);
+ /* All metadata locks must have been released by now. */
+ DBUG_ASSERT(!mdl_context.has_locks());
+ apc_target.destroy();
cleanup_done=1;
DBUG_VOID_RETURN;
}
@@ -1452,8 +1481,16 @@ void THD::cleanup(void)
THD::~THD()
{
+ THD *orig_thd= current_thd;
THD_CHECK_SENTRY(this);
DBUG_ENTER("~THD()");
+
+ /*
+ In error cases, thd may not be current thd. We have to fix this so
+ that memory allocation counting is done correctly
+ */
+ set_current_thd(this);
+
/* Ensure that no one is using THD */
mysql_mutex_lock(&LOCK_thd_data);
mysql_mutex_unlock(&LOCK_thd_data);
@@ -1461,10 +1498,8 @@ THD::~THD()
/* Close connection */
#ifndef EMBEDDED_LIBRARY
if (net.vio)
- {
vio_delete(net.vio);
- net_end(&net);
- }
+ net_end(&net);
#endif
stmt_map.reset(); /* close all prepared statements */
if (!cleanup_done)
@@ -1475,7 +1510,6 @@ THD::~THD()
mysql_audit_release(this);
plugin_thdvar_cleanup(this);
- DBUG_PRINT("info", ("freeing security context"));
main_security_ctx.destroy();
my_free(db);
db= NULL;
@@ -1487,6 +1521,11 @@ THD::~THD()
dbug_sentry= THD_SENTRY_GONE;
#endif
#ifndef EMBEDDED_LIBRARY
+ if (rgi_fake)
+ {
+ delete rgi_fake;
+ rgi_fake= NULL;
+ }
if (rli_fake)
{
delete rli_fake;
@@ -1494,13 +1533,23 @@ THD::~THD()
}
mysql_audit_free_thd(this);
- if (rli_slave)
- rli_slave->cleanup_after_session();
+ if (rgi_slave)
+ rgi_slave->cleanup_after_session();
my_free(semisync_info);
unregister_slave(this, true, true);
#endif
free_root(&main_mem_root, MYF(0));
+ my_free(m_token_array);
+ main_da.free_memory();
+ if (status_var.memory_used != 0)
+ {
+ DBUG_PRINT("error", ("memory_used: %lld", status_var.memory_used));
+ SAFEMALLOC_REPORT_MEMORY(my_thread_dbug_id());
+ DBUG_ASSERT(status_var.memory_used == 0); // Ensure everything is freed
+ }
+
+ set_current_thd(orig_thd == this ? 0 : orig_thd);
DBUG_VOID_RETURN;
}
@@ -1598,7 +1647,8 @@ void add_diff_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var,
void THD::awake(killed_state state_to_set)
{
DBUG_ENTER("THD::awake");
- DBUG_PRINT("enter", ("this: %p current_thd: %p", this, current_thd));
+ DBUG_PRINT("enter", ("this: %p current_thd: %p state: %d",
+ this, current_thd, (int) state_to_set));
THD_CHECK_SENTRY(this);
mysql_mutex_assert_owner(&LOCK_thd_data);
@@ -1727,6 +1777,63 @@ void THD::disconnect()
}
+bool THD::notify_shared_lock(MDL_context_owner *ctx_in_use,
+ bool needs_thr_lock_abort)
+{
+ THD *in_use= ctx_in_use->get_thd();
+ bool signalled= FALSE;
+ DBUG_ENTER("THD::notify_shared_lock");
+ DBUG_PRINT("enter",("needs_thr_lock_abort: %d", needs_thr_lock_abort));
+
+ if ((in_use->system_thread & SYSTEM_THREAD_DELAYED_INSERT) &&
+ !in_use->killed)
+ {
+ /* This code is similar to kill_delayed_threads() */
+ DBUG_PRINT("info", ("kill delayed thread"));
+ mysql_mutex_lock(&in_use->LOCK_thd_data);
+ if (in_use->killed < KILL_CONNECTION)
+ in_use->killed= KILL_CONNECTION;
+ if (in_use->mysys_var)
+ {
+ mysql_mutex_lock(&in_use->mysys_var->mutex);
+ if (in_use->mysys_var->current_cond)
+ mysql_cond_broadcast(in_use->mysys_var->current_cond);
+
+ /* Abort if about to wait in thr_upgrade_write_delay_lock */
+ in_use->mysys_var->abort= 1;
+ mysql_mutex_unlock(&in_use->mysys_var->mutex);
+ }
+ mysql_mutex_unlock(&in_use->LOCK_thd_data);
+ signalled= TRUE;
+ }
+
+ if (needs_thr_lock_abort)
+ {
+ mysql_mutex_lock(&in_use->LOCK_thd_data);
+ /* If not already dying */
+ if (in_use->killed != KILL_CONNECTION_HARD)
+ {
+ for (TABLE *thd_table= in_use->open_tables;
+ thd_table ;
+ thd_table= thd_table->next)
+ {
+ /*
+ Check for TABLE::needs_reopen() is needed since in some
+ places we call handler::close() for table instance (and set
+ TABLE::db_stat to 0) and do not remove such instances from
+ the THD::open_tables for some time, during which other
+ thread can see those instances (e.g. see partitioning code).
+ */
+ if (!thd_table->needs_reopen())
+ signalled|= mysql_lock_abort_for_thread(this, thd_table);
+ }
+ }
+ mysql_mutex_unlock(&in_use->LOCK_thd_data);
+ }
+ DBUG_RETURN(signalled);
+}
+
+
/*
Get error number for killed state
Note that the error message can't have any parameters.
@@ -1776,7 +1883,7 @@ bool THD::store_globals()
*/
DBUG_ASSERT(thread_stack);
- if (my_pthread_setspecific_ptr(THR_THD, this) ||
+ if (set_current_thd(this) ||
my_pthread_setspecific_ptr(THR_MALLOC, &mem_root))
return 1;
/*
@@ -1820,7 +1927,7 @@ void THD::reset_globals()
mysql_mutex_unlock(&LOCK_thd_data);
/* Undocking the thread specific data. */
- my_pthread_setspecific_ptr(THR_THD, NULL);
+ set_current_thd(0);
my_pthread_setspecific_ptr(THR_MALLOC, NULL);
}
@@ -1873,10 +1980,18 @@ void THD::cleanup_after_query()
which is intended to consume its event (there can be other
SET statements between them).
*/
- if ((rli_slave || rli_fake) && is_update_query(lex->sql_command))
+ if ((rgi_slave || rli_fake) && is_update_query(lex->sql_command))
auto_inc_intervals_forced.empty();
#endif
}
+ /*
+ Forget the binlog stmt filter for the next query.
+ There are some code paths that:
+ - do not call THD::decide_logging_format()
+ - do call THD::binlog_query(),
+ making this reset necessary.
+ */
+ reset_binlog_local_stmt_filter();
if (first_successful_insert_id_in_cur_stmt > 0)
{
/* set what LAST_INSERT_ID() will return */
@@ -1892,41 +2007,17 @@ void THD::cleanup_after_query()
where= THD::DEFAULT_WHERE;
/* reset table map for multi-table update */
table_map_for_update= 0;
- m_binlog_invoker= FALSE;
+ m_binlog_invoker= INVOKER_NONE;
#ifndef EMBEDDED_LIBRARY
- if (rli_slave)
- rli_slave->cleanup_after_query();
+ if (rgi_slave)
+ rgi_slave->cleanup_after_query();
#endif
DBUG_VOID_RETURN;
}
-/**
- Create a LEX_STRING in this connection.
-
- @param lex_str pointer to LEX_STRING object to be initialized
- @param str initializer to be copied into lex_str
- @param length length of str, in bytes
- @param allocate_lex_string if TRUE, allocate new LEX_STRING object,
- instead of using lex_str value
- @return NULL on failure, or pointer to the LEX_STRING object
-*/
-LEX_STRING *THD::make_lex_string(LEX_STRING *lex_str,
- const char* str, uint length,
- bool allocate_lex_string)
-{
- if (allocate_lex_string)
- if (!(lex_str= (LEX_STRING *)alloc_root(mem_root, sizeof(LEX_STRING))))
- return 0;
- if (!(lex_str->str= strmake_root(mem_root, str, length)))
- return 0;
- lex_str->length= length;
- return lex_str;
-}
-
-
/*
Convert a string to another character set
@@ -2114,6 +2205,21 @@ CHANGED_TABLE_LIST* THD::changed_table_dup(const char *key, long key_length)
int THD::send_explain_fields(select_result *result)
{
List<Item> field_list;
+ make_explain_field_list(field_list);
+ result->prepare(field_list, NULL);
+ return (result->send_result_set_metadata(field_list,
+ Protocol::SEND_NUM_ROWS |
+ Protocol::SEND_EOF));
+}
+
+
+/*
+ Populate the provided field_list with EXPLAIN output columns.
+ this->lex->describe has the EXPLAIN flags
+*/
+
+void THD::make_explain_field_list(List<Item> &field_list)
+{
Item *item;
CHARSET_INFO *cs= system_charset_info;
field_list.push_back(item= new Item_return_int("id",3, MYSQL_TYPE_LONGLONG));
@@ -2152,10 +2258,9 @@ int THD::send_explain_fields(select_result *result)
}
item->maybe_null= 1;
field_list.push_back(new Item_empty_string("Extra", 255, cs));
- return (result->send_result_set_metadata(field_list,
- Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF));
}
+
#ifdef SIGNAL_WITH_VIO_CLOSE
void THD::close_active_vio()
{
@@ -2279,12 +2384,6 @@ select_result::select_result()
thd=current_thd;
}
-void select_result::send_error(uint errcode,const char *err)
-{
- my_message(errcode, err, MYF(0));
-}
-
-
void select_result::cleanup()
{
/* do nothing */
@@ -2369,7 +2468,8 @@ int select_send::send_data(List<Item> &items)
Protocol *protocol= thd->protocol;
DBUG_ENTER("select_send::send_data");
- if (unit->offset_limit_cnt)
+ /* unit is not set when using 'delete ... returning' */
+ if (unit && unit->offset_limit_cnt)
{ // using limit offset,count
unit->offset_limit_cnt--;
DBUG_RETURN(FALSE);
@@ -2391,7 +2491,7 @@ int select_send::send_data(List<Item> &items)
DBUG_RETURN(TRUE);
}
- thd->sent_row_count++;
+ thd->inc_sent_row_count(1);
if (thd->vio_ok())
DBUG_RETURN(protocol->write());
@@ -2399,6 +2499,7 @@ int select_send::send_data(List<Item> &items)
DBUG_RETURN(0);
}
+
bool select_send::send_eof()
{
/*
@@ -2424,23 +2525,9 @@ bool select_send::send_eof()
Handling writing to file
************************************************************************/
-void select_to_file::send_error(uint errcode,const char *err)
-{
- my_message(errcode, err, MYF(0));
- if (file > 0)
- {
- (void) end_io_cache(&cache);
- mysql_file_close(file, MYF(0));
- /* Delete file on error */
- mysql_file_delete(key_select_to_file, path, MYF(0));
- file= -1;
- }
-}
-
-
bool select_to_file::send_eof()
{
- int error= test(end_io_cache(&cache));
+ int error= MY_TEST(end_io_cache(&cache));
if (mysql_file_close(file, MYF(MY_WME)) || thd->is_error())
error= true;
@@ -2483,7 +2570,7 @@ select_to_file::~select_to_file()
select_export::~select_export()
{
- thd->sent_row_count=row_count;
+ thd->set_sent_row_count(row_count);
}
@@ -2605,7 +2692,7 @@ select_export::prepare(List<Item> &list, SELECT_LEX_UNIT *u)
Non-ASCII separator arguments are not fully supported
*/
- push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+ push_warning(thd, Sql_condition::WARN_LEVEL_WARN,
WARN_NON_ASCII_SEPARATOR_NOT_IMPLEMENTED,
ER(WARN_NON_ASCII_SEPARATOR_NOT_IMPLEMENTED));
}
@@ -2621,8 +2708,8 @@ select_export::prepare(List<Item> &list, SELECT_LEX_UNIT *u)
escape_char= (int) (uchar) (*exchange->escaped)[0];
else
escape_char= -1;
- is_ambiguous_field_sep= test(strchr(ESCAPE_CHARS, field_sep_char));
- is_unsafe_field_sep= test(strchr(NUMERIC_CHARS, field_sep_char));
+ is_ambiguous_field_sep= MY_TEST(strchr(ESCAPE_CHARS, field_sep_char));
+ is_unsafe_field_sep= MY_TEST(strchr(NUMERIC_CHARS, field_sep_char));
line_sep_char= (exchange->line_term->length() ?
(int) (uchar) (*exchange->line_term)[0] : INT_MAX);
if (!field_term_length)
@@ -2636,7 +2723,7 @@ select_export::prepare(List<Item> &list, SELECT_LEX_UNIT *u)
(exchange->opt_enclosed && non_string_results &&
field_term_length && strchr(NUMERIC_CHARS, field_term_char)))
{
- push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+ push_warning(thd, Sql_condition::WARN_LEVEL_WARN,
ER_AMBIGUOUS_FIELD_TERM, ER(ER_AMBIGUOUS_FIELD_TERM));
is_ambiguous_field_term= TRUE;
}
@@ -2719,7 +2806,7 @@ int select_export::send_data(List<Item> &items)
convert_to_printable(printable_buff, sizeof(printable_buff),
error_pos, res->ptr() + res->length() - error_pos,
res->charset(), 6);
- push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+ push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_TRUNCATED_WRONG_VALUE_FOR_FIELD,
ER(ER_TRUNCATED_WRONG_VALUE_FOR_FIELD),
"string", printable_buff,
@@ -2730,7 +2817,7 @@ int select_export::send_data(List<Item> &items)
/*
result is longer than UINT_MAX32 and doesn't fit into String
*/
- push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+ push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
WARN_DATA_TRUNCATED, ER(WARN_DATA_TRUNCATED),
item->full_name(), static_cast<long>(row_count));
}
@@ -2765,7 +2852,7 @@ int select_export::send_data(List<Item> &items)
else
{
if (fixed_row_size)
- used_length=min(res->length(),item->max_length);
+ used_length=MY_MIN(res->length(),item->max_length);
else
used_length=res->length();
if ((result_type == STRING_RESULT || is_unsafe_field_sep) &&
@@ -3263,6 +3350,10 @@ void THD::end_statement()
}
+/*
+ Start using arena specified by @set. Current arena data will be saved to
+ *backup.
+*/
void THD::set_n_backup_active_arena(Query_arena *set, Query_arena *backup)
{
DBUG_ENTER("THD::set_n_backup_active_arena");
@@ -3277,6 +3368,12 @@ void THD::set_n_backup_active_arena(Query_arena *set, Query_arena *backup)
}
+/*
+ Stop using the temporary arena, and start again using the arena that is
+ specified in *backup.
+ The temporary arena is returned back into *set.
+*/
+
void THD::restore_active_arena(Query_arena *set, Query_arena *backup)
{
DBUG_ENTER("THD::restore_active_arena");
@@ -3496,7 +3593,7 @@ int select_dumpvar::send_data(List<Item> &items)
bool select_dumpvar::send_eof()
{
if (! row_count)
- push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+ push_warning(thd, Sql_condition::WARN_LEVEL_WARN,
ER_SP_FETCH_NO_DATA, ER(ER_SP_FETCH_NO_DATA));
/*
Don't send EOF if we're in error condition (which implies we've already
@@ -3635,6 +3732,12 @@ void thd_increment_bytes_sent(ulong length)
}
}
+my_bool thd_net_is_killed()
+{
+ THD *thd= current_thd;
+ return thd && thd->killed ? 1 : 0;
+}
+
void thd_increment_bytes_received(ulong length)
{
@@ -3650,7 +3753,8 @@ void thd_increment_net_big_packet_count(ulong length)
void THD::set_status_var_init()
{
- bzero((char*) &status_var, sizeof(status_var));
+ bzero((char*) &status_var, offsetof(STATUS_VAR,
+ last_cleared_system_status_var));
}
@@ -3658,7 +3762,7 @@ void Security_context::init()
{
host= user= ip= external_user= 0;
host_or_ip= "connecting host";
- priv_user[0]= priv_host[0]= proxy_user[0]= '\0';
+ priv_user[0]= priv_host[0]= proxy_user[0]= priv_role[0]= '\0';
master_access= 0;
#ifndef NO_EMBEDDED_ACCESS_CHECKS
db_access= NO_ACCESS;
@@ -3668,6 +3772,7 @@ void Security_context::init()
void Security_context::destroy()
{
+ DBUG_PRINT("info", ("freeing security context"));
// If not pointer to constant
if (host != my_localhost)
{
@@ -3855,12 +3960,7 @@ void THD::restore_backup_open_tables_state(Open_tables_backup *backup)
#undef thd_killed
extern "C" int thd_killed(const MYSQL_THD thd)
{
- if (!thd)
- thd= current_thd;
-
- if (!(thd->killed & KILL_HARD_BIT))
- return 0;
- return thd->killed != 0;
+ return thd_kill_level(thd) > THD_ABORT_SOFTLY;
}
#else
#error now thd_killed() function can go away
@@ -3872,8 +3972,17 @@ extern "C" int thd_killed(const MYSQL_THD thd)
*/
extern "C" enum thd_kill_levels thd_kill_level(const MYSQL_THD thd)
{
+ THD* current= current_thd;
+
if (!thd)
- thd= current_thd;
+ thd= current;
+
+ if (thd == current)
+ {
+ Apc_target *apc_target= (Apc_target*)&thd->apc_target;
+ if (apc_target->have_apc_requests())
+ apc_target->process_apc_requests();
+ }
if (likely(thd->killed == NOT_KILLED))
return THD_IS_NOT_KILLED;
@@ -3881,6 +3990,7 @@ extern "C" enum thd_kill_levels thd_kill_level(const MYSQL_THD thd)
return thd->killed & KILL_HARD_BIT ? THD_ABORT_ASAP : THD_ABORT_SOFTLY;
}
+
/**
Send an out-of-band progress report to the client
@@ -3888,6 +3998,10 @@ extern "C" enum thd_kill_levels thd_kill_level(const MYSQL_THD thd)
however not more often than global.progress_report_time.
If global.progress_report_time is 0, then don't send progress reports, but
check every second if the value has changed
+
+ We clear any errors that we get from sending the progress packet to
+ the client as we don't want to set an error without the caller knowing
+ about it.
*/
static void thd_send_progress(THD *thd)
@@ -3896,7 +4010,7 @@ static void thd_send_progress(THD *thd)
ulonglong report_time= my_interval_timer();
if (report_time > thd->progress.next_report_time)
{
- uint seconds_to_next= max(thd->variables.progress_report_time,
+ uint seconds_to_next= MY_MAX(thd->variables.progress_report_time,
global_system_variables.progress_report_time);
if (seconds_to_next == 0) // Turned off
seconds_to_next= 1; // Check again after 1 second
@@ -3904,8 +4018,12 @@ static void thd_send_progress(THD *thd)
thd->progress.next_report_time= (report_time +
seconds_to_next * 1000000000ULL);
if (global_system_variables.progress_report_time &&
- thd->variables.progress_report_time)
+ thd->variables.progress_report_time && !thd->is_error())
+ {
net_send_progress_packet(thd);
+ if (thd->is_error())
+ thd->clear_error();
+ }
}
}
@@ -4012,11 +4130,15 @@ extern "C" unsigned long thd_get_thread_id(const MYSQL_THD thd)
return((unsigned long)thd->thread_id);
}
-extern "C" enum_tx_isolation thd_get_trx_isolation(const MYSQL_THD thd)
+/**
+ Check if THD socket is still connected.
+ */
+extern "C" int thd_is_connected(MYSQL_THD thd)
{
- return thd->tx_isolation;
+ return thd->is_connected();
}
+
#ifdef INNODB_COMPATIBILITY_HOOKS
extern "C" const struct charset_info_st *thd_charset(MYSQL_THD thd)
{
@@ -4048,6 +4170,230 @@ extern "C" int thd_slave_thread(const MYSQL_THD thd)
return(thd->slave_thread);
}
+/* Returns true for a worker thread in parallel replication. */
+extern "C" int thd_rpl_is_parallel(const MYSQL_THD thd)
+{
+ return thd->rgi_slave && thd->rgi_slave->is_parallel_exec;
+}
+
+/*
+ This function can optionally be called to check if thd_report_wait_for()
+ needs to be called for waits done by a given transaction.
+
+ If this function returns false for a given thd, there is no need to do any
+ calls to thd_report_wait_for() on that thd.
+
+ This call is optional; it is safe to call thd_report_wait_for() in any case.
+ This call can be used to save some redundant calls to thd_report_wait_for()
+ if desired. (This is unlikely to matter much unless there are _lots_ of
+ waits to report, as the overhead of thd_report_wait_for() is small).
+*/
+extern "C" int
+thd_need_wait_for(const MYSQL_THD thd)
+{
+ rpl_group_info *rgi;
+
+ if (mysql_bin_log.is_open() && opt_binlog_commit_wait_count > 0)
+ return true;
+ if (!thd)
+ return false;
+ rgi= thd->rgi_slave;
+ if (!rgi)
+ return false;
+ return rgi->is_parallel_exec;
+}
+
+/*
+ Used by InnoDB/XtraDB to report that one transaction THD is about to go to
+ wait for a transactional lock held by another transactions OTHER_THD.
+
+ This is used for parallel replication, where transactions are required to
+ commit in the same order on the slave as they did on the master. If the
+ transactions on the slave encounters lock conflicts on the slave that did
+ not exist on the master, this can cause deadlocks.
+
+ Normally, such conflicts will not occur, because the same conflict would
+ have prevented the two transactions from committing in parallel on the
+ master, thus preventing them from running in parallel on the slave in the
+ first place. However, it is possible in case when the optimizer chooses a
+ different plan on the slave than on the master (eg. table scan instead of
+ index scan).
+
+ InnoDB/XtraDB reports lock waits using this call. If a lock wait causes a
+ deadlock with the pre-determined commit order, we kill the later transaction,
+ and later re-try it, to resolve the deadlock.
+
+ This call need only receive reports about waits for locks that will remain
+ until the holding transaction commits. InnoDB/XtraDB auto-increment locks
+ are released earlier, and so need not be reported. (Such false positives are
+ not harmful, but could lead to unnecessary kill and retry, so best avoided).
+*/
+extern "C" void
+thd_report_wait_for(MYSQL_THD thd, MYSQL_THD other_thd)
+{
+ rpl_group_info *rgi;
+ rpl_group_info *other_rgi;
+
+ if (!thd || !other_thd)
+ return;
+ binlog_report_wait_for(thd, other_thd);
+ rgi= thd->rgi_slave;
+ other_rgi= other_thd->rgi_slave;
+ if (!rgi || !other_rgi)
+ return;
+ if (!rgi->is_parallel_exec)
+ return;
+ if (rgi->rli != other_rgi->rli)
+ return;
+ if (!rgi->gtid_sub_id || !other_rgi->gtid_sub_id)
+ return;
+ if (rgi->current_gtid.domain_id != other_rgi->current_gtid.domain_id)
+ return;
+ if (rgi->gtid_sub_id > other_rgi->gtid_sub_id)
+ return;
+ /*
+ This transaction is about to wait for another transaction that is required
+ by replication binlog order to commit after. This would cause a deadlock.
+
+ So send a kill to the other transaction, with a temporary error; this will
+ cause replication to rollback (and later re-try) the other transaction,
+ releasing the lock for this transaction so replication can proceed.
+ */
+ other_rgi->killed_for_retry= true;
+ mysql_mutex_lock(&other_thd->LOCK_thd_data);
+ other_thd->awake(KILL_CONNECTION);
+ mysql_mutex_unlock(&other_thd->LOCK_thd_data);
+}
+
+/*
+ This function is called from InnoDB/XtraDB to check if the commit order of
+ two transactions has already been decided by the upper layer. This happens
+ in parallel replication, where the commit order is forced to be the same on
+ the slave as it was originally on the master.
+
+ If this function returns false, it means that such commit order will be
+ enforced. This allows the storage engine to optionally omit gap lock waits
+ or similar measures that would otherwise be needed to ensure that
+ transactions would be serialised in a way that would cause a commit order
+ that is correct for binlogging for statement-based replication.
+
+ Since transactions are only run in parallel on the slave if they ran without
+ lock conflicts on the master, normally no lock conflicts on the slave happen
+ during parallel replication. However, there are a couple of corner cases
+ where it can happen, like these secondary-index operations:
+
+ T1: INSERT INTO t1 VALUES (7, NULL);
+ T2: DELETE FROM t1 WHERE b <= 3;
+
+ T1: UPDATE t1 SET secondary=NULL WHERE primary=1
+ T2: DELETE t1 WHERE secondary <= 3
+
+ The DELETE takes a gap lock that can block the INSERT/UPDATE, but the row
+ locks set by INSERT/UPDATE do not block the DELETE. Thus, the execution
+ order of the transactions determine whether a lock conflict occurs or
+ not. Thus a lock conflict can occur on the slave where it did not on the
+ master.
+
+ If this function returns true, normal locking should be done as required by
+ the binlogging and transaction isolation level in effect. But if it returns
+ false, the correct order will be enforced anyway, and InnoDB/XtraDB can
+ avoid taking the gap lock, preventing the lock conflict.
+
+ Calling this function is just an optimisation to avoid unnecessary
+ deadlocks. If it was not used, a gap lock would be set that could eventually
+ cause a deadlock; the deadlock would be caught by thd_report_wait_for() and
+ the transaction T2 killed and rolled back (and later re-tried).
+*/
+extern "C" int
+thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd)
+{
+ rpl_group_info *rgi, *other_rgi;
+
+ DBUG_EXECUTE_IF("disable_thd_need_ordering_with", return 1;);
+ if (!thd || !other_thd)
+ return 1;
+ rgi= thd->rgi_slave;
+ other_rgi= other_thd->rgi_slave;
+ if (!rgi || !other_rgi)
+ return 1;
+ if (!rgi->is_parallel_exec)
+ return 1;
+ if (rgi->rli != other_rgi->rli)
+ return 1;
+ if (rgi->current_gtid.domain_id != other_rgi->current_gtid.domain_id)
+ return 1;
+ if (!rgi->commit_id || rgi->commit_id != other_rgi->commit_id)
+ return 1;
+ DBUG_EXECUTE_IF("thd_need_ordering_with_force", return 1;);
+ /*
+ Otherwise, these two threads are doing parallel replication within the same
+ replication domain. Their commit order is already fixed, so we do not need
+ gap locks or similar to otherwise enforce ordering (and in fact such locks
+ could lead to unnecessary deadlocks and transaction retry).
+ */
+ return 0;
+}
+
+
+/*
+ If the storage engine detects a deadlock, and needs to choose a victim
+ transaction to roll back, it can call this function to ask the upper
+ server layer for which of two possible transactions is prefered to be
+ aborted and rolled back.
+
+ In parallel replication, if two transactions are running in parallel and
+ one is fixed to commit before the other, then the one that commits later
+ will be prefered as the victim - chosing the early transaction as a victim
+ will not resolve the deadlock anyway, as the later transaction still needs
+ to wait for the earlier to commit.
+
+ Otherwise, a transaction that uses only transactional tables, and can thus
+ be safely rolled back, will be prefered as a deadlock victim over a
+ transaction that also modified non-transactional (eg. MyISAM) tables.
+
+ The return value is -1 if the first transaction is prefered as a deadlock
+ victim, 1 if the second transaction is prefered, or 0 for no preference (in
+ which case the storage engine can make the choice as it prefers).
+*/
+extern "C" int
+thd_deadlock_victim_preference(const MYSQL_THD thd1, const MYSQL_THD thd2)
+{
+ rpl_group_info *rgi1, *rgi2;
+ bool nontrans1, nontrans2;
+
+ if (!thd1 || !thd2)
+ return 0;
+
+ /*
+ If the transactions are participating in the same replication domain in
+ parallel replication, then request to select the one that will commit
+ later (in the fixed commit order from the master) as the deadlock victim.
+ */
+ rgi1= thd1->rgi_slave;
+ rgi2= thd2->rgi_slave;
+ if (rgi1 && rgi2 &&
+ rgi1->is_parallel_exec &&
+ rgi1->rli == rgi2->rli &&
+ rgi1->current_gtid.domain_id == rgi2->current_gtid.domain_id)
+ return rgi1->gtid_sub_id < rgi2->gtid_sub_id ? 1 : -1;
+
+ /*
+ If one transaction has modified non-transactional tables (so that it
+ cannot be safely rolled back), and the other has not, then prefer to
+ select the purely transactional one as the victim.
+ */
+ nontrans1= thd1->transaction.all.modified_non_trans_table;
+ nontrans2= thd2->transaction.all.modified_non_trans_table;
+ if (nontrans1 && !nontrans2)
+ return 1;
+ else if (!nontrans1 && nontrans2)
+ return -1;
+
+ /* No preferences, let the storage engine decide. */
+ return 0;
+}
+
+
extern "C" int thd_non_transactional_update(const MYSQL_THD thd)
{
return(thd->transaction.all.modified_non_trans_table);
@@ -4072,11 +4418,56 @@ extern "C" bool thd_binlog_filter_ok(const MYSQL_THD thd)
return binlog_filter->db_ok(thd->db);
}
+/*
+ This is similar to sqlcom_can_generate_row_events, with the expection
+ that we only return 1 if we are going to generate row events in a
+ transaction.
+ CREATE OR REPLACE is always safe to do as this will run in it's own
+ transaction.
+*/
+
extern "C" bool thd_sqlcom_can_generate_row_events(const MYSQL_THD thd)
{
- return sqlcom_can_generate_row_events(thd);
+ return (sqlcom_can_generate_row_events(thd) && thd->lex->sql_command !=
+ SQLCOM_CREATE_TABLE);
+}
+
+
+extern "C" enum durability_properties thd_get_durability_property(const MYSQL_THD thd)
+{
+ enum durability_properties ret= HA_REGULAR_DURABILITY;
+
+ if (thd != NULL)
+ ret= thd->durability_property;
+
+ return ret;
}
+/** Get the auto_increment_offset auto_increment_increment.
+Exposed by thd_autoinc_service.
+Needed by InnoDB.
+@param thd Thread object
+@param off auto_increment_offset
+@param inc auto_increment_increment */
+extern "C" void thd_get_autoinc(const MYSQL_THD thd, ulong* off, ulong* inc)
+{
+ *off = thd->variables.auto_increment_offset;
+ *inc = thd->variables.auto_increment_increment;
+}
+
+
+/**
+ Is strict sql_mode set.
+ Needed by InnoDB.
+ @param thd Thread object
+ @return True if sql_mode has strict mode (all or trans).
+ @retval true sql_mode has strict mode (all or trans).
+ @retval false sql_mode has not strict mode (all or trans).
+*/
+extern "C" bool thd_is_strict_mode(const MYSQL_THD thd)
+{
+ return thd->is_strict_mode();
+}
/*
@@ -4182,8 +4573,8 @@ void THD::reset_sub_statement_state(Sub_statement_state *backup,
backup->enable_slow_log= enable_slow_log;
backup->query_plan_flags= query_plan_flags;
backup->limit_found_rows= limit_found_rows;
- backup->examined_row_count= examined_row_count;
- backup->sent_row_count= sent_row_count;
+ backup->examined_row_count= m_examined_row_count;
+ backup->sent_row_count= m_sent_row_count;
backup->cuted_fields= cuted_fields;
backup->client_capabilities= client_capabilities;
backup->savepoints= transaction.savepoints;
@@ -4206,8 +4597,8 @@ void THD::reset_sub_statement_state(Sub_statement_state *backup,
/* Disable result sets */
client_capabilities &= ~CLIENT_MULTI_RESULTS;
in_sub_stmt|= new_state;
- examined_row_count= 0;
- sent_row_count= 0;
+ m_examined_row_count= 0;
+ m_sent_row_count= 0;
cuted_fields= 0;
transaction.savepoints= 0;
first_successful_insert_id_in_cur_stmt= 0;
@@ -4254,7 +4645,7 @@ void THD::restore_sub_statement_state(Sub_statement_state *backup)
first_successful_insert_id_in_cur_stmt=
backup->first_successful_insert_id_in_cur_stmt;
limit_found_rows= backup->limit_found_rows;
- sent_row_count= backup->sent_row_count;
+ set_sent_row_count(backup->sent_row_count);
client_capabilities= backup->client_capabilities;
/*
If we've left sub-statement mode, reset the fatal error flag.
@@ -4275,7 +4666,7 @@ void THD::restore_sub_statement_state(Sub_statement_state *backup)
The following is added to the old values as we are interested in the
total complexity of the query
*/
- examined_row_count+= backup->examined_row_count;
+ inc_examined_row_count(backup->examined_row_count);
cuted_fields+= backup->cuted_fields;
DBUG_VOID_RETURN;
}
@@ -4288,6 +4679,141 @@ void THD::set_statement(Statement *stmt)
mysql_mutex_unlock(&LOCK_thd_data);
}
+void THD::set_sent_row_count(ha_rows count)
+{
+ m_sent_row_count= count;
+ MYSQL_SET_STATEMENT_ROWS_SENT(m_statement_psi, m_sent_row_count);
+}
+
+void THD::set_examined_row_count(ha_rows count)
+{
+ m_examined_row_count= count;
+ MYSQL_SET_STATEMENT_ROWS_EXAMINED(m_statement_psi, m_examined_row_count);
+}
+
+void THD::inc_sent_row_count(ha_rows count)
+{
+ m_sent_row_count+= count;
+ MYSQL_SET_STATEMENT_ROWS_SENT(m_statement_psi, m_sent_row_count);
+}
+
+void THD::inc_examined_row_count(ha_rows count)
+{
+ m_examined_row_count+= count;
+ MYSQL_SET_STATEMENT_ROWS_EXAMINED(m_statement_psi, m_examined_row_count);
+}
+
+void THD::inc_status_created_tmp_disk_tables()
+{
+ status_var_increment(status_var.created_tmp_disk_tables_);
+#ifdef HAVE_PSI_STATEMENT_INTERFACE
+ PSI_STATEMENT_CALL(inc_statement_created_tmp_disk_tables)(m_statement_psi, 1);
+#endif
+}
+
+void THD::inc_status_created_tmp_tables()
+{
+ status_var_increment(status_var.created_tmp_tables_);
+#ifdef HAVE_PSI_STATEMENT_INTERFACE
+ PSI_STATEMENT_CALL(inc_statement_created_tmp_tables)(m_statement_psi, 1);
+#endif
+}
+
+void THD::inc_status_select_full_join()
+{
+ status_var_increment(status_var.select_full_join_count_);
+#ifdef HAVE_PSI_STATEMENT_INTERFACE
+ PSI_STATEMENT_CALL(inc_statement_select_full_join)(m_statement_psi, 1);
+#endif
+}
+
+void THD::inc_status_select_full_range_join()
+{
+ status_var_increment(status_var.select_full_range_join_count_);
+#ifdef HAVE_PSI_STATEMENT_INTERFACE
+ PSI_STATEMENT_CALL(inc_statement_select_full_range_join)(m_statement_psi, 1);
+#endif
+}
+
+void THD::inc_status_select_range()
+{
+ status_var_increment(status_var.select_range_count_);
+#ifdef HAVE_PSI_STATEMENT_INTERFACE
+ PSI_STATEMENT_CALL(inc_statement_select_range)(m_statement_psi, 1);
+#endif
+}
+
+void THD::inc_status_select_range_check()
+{
+ status_var_increment(status_var.select_range_check_count_);
+#ifdef HAVE_PSI_STATEMENT_INTERFACE
+ PSI_STATEMENT_CALL(inc_statement_select_range_check)(m_statement_psi, 1);
+#endif
+}
+
+void THD::inc_status_select_scan()
+{
+ status_var_increment(status_var.select_scan_count_);
+#ifdef HAVE_PSI_STATEMENT_INTERFACE
+ PSI_STATEMENT_CALL(inc_statement_select_scan)(m_statement_psi, 1);
+#endif
+}
+
+void THD::inc_status_sort_merge_passes()
+{
+ status_var_increment(status_var.filesort_merge_passes_);
+#ifdef HAVE_PSI_STATEMENT_INTERFACE
+ PSI_STATEMENT_CALL(inc_statement_sort_merge_passes)(m_statement_psi, 1);
+#endif
+}
+
+void THD::inc_status_sort_range()
+{
+ status_var_increment(status_var.filesort_range_count_);
+#ifdef HAVE_PSI_STATEMENT_INTERFACE
+ PSI_STATEMENT_CALL(inc_statement_sort_range)(m_statement_psi, 1);
+#endif
+}
+
+void THD::inc_status_sort_rows(ha_rows count)
+{
+ statistic_add(status_var.filesort_rows_, count, &LOCK_status);
+#ifdef HAVE_PSI_STATEMENT_INTERFACE
+ PSI_STATEMENT_CALL(inc_statement_sort_rows)(m_statement_psi, count);
+#endif
+}
+
+void THD::inc_status_sort_scan()
+{
+ status_var_increment(status_var.filesort_scan_count_);
+#ifdef HAVE_PSI_STATEMENT_INTERFACE
+ PSI_STATEMENT_CALL(inc_statement_sort_scan)(m_statement_psi, 1);
+#endif
+}
+
+void THD::set_status_no_index_used()
+{
+ server_status|= SERVER_QUERY_NO_INDEX_USED;
+#ifdef HAVE_PSI_STATEMENT_INTERFACE
+ PSI_STATEMENT_CALL(set_statement_no_index_used)(m_statement_psi);
+#endif
+}
+
+void THD::set_status_no_good_index_used()
+{
+ server_status|= SERVER_QUERY_NO_GOOD_INDEX_USED;
+#ifdef HAVE_PSI_STATEMENT_INTERFACE
+ PSI_STATEMENT_CALL(set_statement_no_good_index_used)(m_statement_psi);
+#endif
+}
+
+void THD::set_command(enum enum_server_command command)
+{
+ m_command= command;
+#ifdef HAVE_PSI_THREAD_INTERFACE
+ PSI_STATEMENT_CALL(set_thread_command)(m_command);
+#endif
+}
/** Assign a new value to thd->query. */
@@ -4296,6 +4822,10 @@ void THD::set_query(const CSET_STRING &string_arg)
mysql_mutex_lock(&LOCK_thd_data);
set_query_inner(string_arg);
mysql_mutex_unlock(&LOCK_thd_data);
+
+#ifdef HAVE_PSI_THREAD_INTERFACE
+ PSI_THREAD_CALL(set_thread_info)(query(), query_length());
+#endif
}
/** Assign a new value to thd->query and thd->query_id. */
@@ -4306,17 +4836,8 @@ void THD::set_query_and_id(char *query_arg, uint32 query_length_arg,
{
mysql_mutex_lock(&LOCK_thd_data);
set_query_inner(query_arg, query_length_arg, cs);
- query_id= new_query_id;
mysql_mutex_unlock(&LOCK_thd_data);
-}
-
-/** Assign a new value to thd->query_id. */
-
-void THD::set_query_id(query_id_t new_query_id)
-{
- mysql_mutex_lock(&LOCK_thd_data);
query_id= new_query_id;
- mysql_mutex_unlock(&LOCK_thd_data);
}
/** Assign a new value to thd->mysys_var. */
@@ -4350,13 +4871,15 @@ void THD::leave_locked_tables_mode()
/* Also ensure that we don't release metadata locks for open HANDLERs. */
if (handler_tables_hash.records)
mysql_ha_set_explicit_lock_duration(this);
+ if (ull_hash.records)
+ mysql_ull_set_explicit_lock_duration(this);
}
locked_tables_mode= LTM_NONE;
}
-void THD::get_definer(LEX_USER *definer)
+void THD::get_definer(LEX_USER *definer, bool role)
{
- binlog_invoker();
+ binlog_invoker(role);
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
if (slave_thread && has_invoker())
{
@@ -4368,7 +4891,7 @@ void THD::get_definer(LEX_USER *definer)
}
else
#endif
- get_default_definer(this, definer);
+ get_default_definer(this, definer, role);
}
@@ -4561,7 +5084,8 @@ has_write_table_with_auto_increment_and_select(TABLE_LIST *tables)
for(TABLE_LIST *table= tables; table; table= table->next_global)
{
if (!table->placeholder() &&
- (table->lock_type <= TL_READ_NO_INSERT))
+ table->lock_type <= TL_READ_NO_INSERT &&
+ table->prelocking_placeholder != TABLE_LIST::FK)
{
has_select= true;
break;
@@ -4667,16 +5191,13 @@ has_write_table_auto_increment_not_first_in_pk(TABLE_LIST *tables)
BINLOG_FORMAT = STATEMENT and at least one table uses a storage
engine limited to row-logging.
- 6. Error: Cannot execute row injection: binlogging impossible since
- BINLOG_FORMAT = STATEMENT.
-
- 7. Warning: Unsafe statement binlogged in statement format since
+ 6. Warning: Unsafe statement binlogged in statement format since
BINLOG_FORMAT = STATEMENT.
In addition, we can produce the following error (not depending on
the variables of the decision diagram):
- 8. Error: Cannot execute statement: binlogging impossible since more
+ 7. Error: Cannot execute statement: binlogging impossible since more
than one engine is involved and at least one engine is
self-logging.
@@ -4703,6 +5224,8 @@ int THD::decide_logging_format(TABLE_LIST *tables)
DBUG_PRINT("info", ("lex->get_stmt_unsafe_flags(): 0x%x",
lex->get_stmt_unsafe_flags()));
+ reset_binlog_local_stmt_filter();
+
/*
We should not decide logging format if the binlog is closed or
binlogging is off, or if the statement is filtered out from the
@@ -4745,6 +5268,28 @@ int THD::decide_logging_format(TABLE_LIST *tables)
A pointer to a previous table that was accessed.
*/
TABLE* prev_access_table= NULL;
+ /**
+ The number of tables used in the current statement,
+ that should be replicated.
+ */
+ uint replicated_tables_count= 0;
+ /**
+ The number of tables written to in the current statement,
+ that should not be replicated.
+ A table should not be replicated when it is considered
+ 'local' to a MySQL instance.
+ Currently, these tables are:
+ - mysql.slow_log
+ - mysql.general_log
+ - mysql.slave_relay_log_info
+ - mysql.slave_master_info
+ - mysql.slave_worker_info
+ - performance_schema.*
+ - TODO: information_schema.*
+ In practice, from this list, only performance_schema.* tables
+ are written to by user queries.
+ */
+ uint non_replicated_tables_count= 0;
#ifndef DBUG_OFF
{
@@ -4794,14 +5339,38 @@ int THD::decide_logging_format(TABLE_LIST *tables)
if (table->placeholder())
continue;
- if (table->table->s->table_category == TABLE_CATEGORY_PERFORMANCE ||
- table->table->s->table_category == TABLE_CATEGORY_LOG)
- lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_TABLE);
-
handler::Table_flags const flags= table->table->file->ha_table_flags();
DBUG_PRINT("info", ("table: %s; ha_table_flags: 0x%llx",
table->table_name, flags));
+
+ if (table->table->no_replicate)
+ {
+ /*
+ The statement uses a table that is not replicated.
+ The following properties about the table:
+ - persistent / transient
+ - transactional / non transactional
+ - temporary / permanent
+ - read or write
+ - multiple engines involved because of this table
+ are not relevant, as this table is completely ignored.
+ Because the statement uses a non replicated table,
+ using STATEMENT format in the binlog is impossible.
+ Either this statement will be discarded entirely,
+ or it will be logged (possibly partially) in ROW format.
+ */
+ lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_TABLE);
+
+ if (table->lock_type >= TL_WRITE_ALLOW_WRITE)
+ {
+ non_replicated_tables_count++;
+ continue;
+ }
+ }
+
+ replicated_tables_count++;
+
if (table->lock_type >= TL_WRITE_ALLOW_WRITE)
{
if (prev_write_table && prev_write_table->file->ht !=
@@ -4823,32 +5392,12 @@ int THD::decide_logging_format(TABLE_LIST *tables)
prev_write_table= table->table;
- /*
- INSERT...ON DUPLICATE KEY UPDATE on a table with more than one unique keys
- can be unsafe. Check for it if the flag is already not marked for the
- given statement.
- */
- if (!lex->is_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_TWO_KEYS) &&
- lex->sql_command == SQLCOM_INSERT &&
- /* Duplicate key update is not supported by INSERT DELAYED */
- command != COM_DELAYED_INSERT && lex->duplicates == DUP_UPDATE)
- {
- uint keys= table->table->s->keys, i= 0, unique_keys= 0;
- for (KEY* keyinfo= table->table->s->key_info;
- i < keys && unique_keys <= 1; i++, keyinfo++)
- {
- if (keyinfo->flags & HA_NOSAME)
- unique_keys++;
- }
- if (unique_keys > 1 )
- lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_TWO_KEYS);
- }
}
flags_access_some_set |= flags;
if (lex->sql_command != SQLCOM_CREATE_TABLE ||
(lex->sql_command == SQLCOM_CREATE_TABLE &&
- (lex->create_info.options & HA_LEX_CREATE_TMP_TABLE)))
+ lex->create_info.tmp_table()))
{
my_bool trans= table->table->file->has_transactions();
@@ -4954,10 +5503,10 @@ int THD::decide_logging_format(TABLE_LIST *tables)
if (lex->is_stmt_row_injection())
{
/*
- 6. Error: Cannot execute row injection since
- BINLOG_FORMAT = STATEMENT
+ We have to log the statement as row or give an error.
+ Better to accept what master gives us than stopping replication.
*/
- my_error((error= ER_BINLOG_ROW_INJECTION_AND_STMT_MODE), MYF(0));
+ set_current_stmt_binlog_format_row();
}
else if ((flags_write_all_set & HA_BINLOG_STMT_CAPABLE) == 0 &&
sqlcom_can_generate_row_events(this))
@@ -4982,7 +5531,7 @@ int THD::decide_logging_format(TABLE_LIST *tables)
DBUG_PRINT("info", ("binlog_unsafe_warning_flags: 0x%x",
binlog_unsafe_warning_flags));
}
- /* log in statement format! */
+ /* log in statement format (or row if row event)! */
}
/* No statement-only engines and binlog_format != STATEMENT.
I.e., nothing prevents us from row logging if needed. */
@@ -4997,6 +5546,30 @@ int THD::decide_logging_format(TABLE_LIST *tables)
}
}
+ if (non_replicated_tables_count > 0)
+ {
+ if ((replicated_tables_count == 0) || ! is_write)
+ {
+ DBUG_PRINT("info", ("decision: no logging, no replicated table affected"));
+ set_binlog_local_stmt_filter();
+ }
+ else
+ {
+ if (! is_current_stmt_binlog_format_row())
+ {
+ my_error((error= ER_BINLOG_STMT_MODE_AND_NO_REPL_TABLES), MYF(0));
+ }
+ else
+ {
+ clear_binlog_local_stmt_filter();
+ }
+ }
+ }
+ else
+ {
+ clear_binlog_local_stmt_filter();
+ }
+
if (error) {
DBUG_PRINT("info", ("decision: no logging since an error was generated"));
DBUG_RETURN(-1);
@@ -5035,7 +5608,7 @@ int THD::decide_logging_format(TABLE_LIST *tables)
Replace the last ',' with '.' for table_names
*/
table_names.replace(table_names.length()-1, 1, ".", 1);
- push_warning_printf(this, MYSQL_ERROR::WARN_LEVEL_WARN,
+ push_warning_printf(this, Sql_condition::WARN_LEVEL_WARN,
ER_UNKNOWN_ERROR,
"Row events are not logged for %s statements "
"that modify BLACKHOLE tables in row format. "
@@ -5104,7 +5677,11 @@ THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id,
DBUG_ASSERT(table->s->table_map_id != ~0UL);
/* Fetch the type code for the RowsEventT template parameter */
- int const type_code= RowsEventT::TYPE_CODE;
+ int const general_type_code= RowsEventT::TYPE_CODE;
+
+ /* Ensure that all events in a GTID group are in the same cache */
+ if (variables.option_bits & OPTION_GTID_BEGIN)
+ is_transactional= 1;
/*
There is no good place to set up the transactional data, so we
@@ -5131,7 +5708,7 @@ THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id,
if (!pending ||
pending->server_id != serv_id ||
pending->get_table_id() != table->s->table_map_id ||
- pending->get_type_code() != type_code ||
+ pending->get_general_type_code() != general_type_code ||
pending->get_data_size() + needed > opt_binlog_rows_event_max_size ||
pending->get_width() != colcnt ||
!bitmap_cmp(pending->get_cols(), cols))
@@ -5160,27 +5737,6 @@ THD::binlog_prepare_pending_rows_event(TABLE* table, uint32 serv_id,
DBUG_RETURN(pending); /* This is the current pending event */
}
-#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
-/*
- Instantiate the versions we need, we have -fno-implicit-template as
- compiling option.
-*/
-template Rows_log_event*
-THD::binlog_prepare_pending_rows_event(TABLE*, uint32, MY_BITMAP const*,
- size_t, size_t, bool,
- Write_rows_log_event*);
-
-template Rows_log_event*
-THD::binlog_prepare_pending_rows_event(TABLE*, uint32, MY_BITMAP const*,
- size_t colcnt, size_t, bool,
- Delete_rows_log_event *);
-
-template Rows_log_event*
-THD::binlog_prepare_pending_rows_event(TABLE*, uint32, MY_BITMAP const*,
- size_t colcnt, size_t, bool,
- Update_rows_log_event *);
-#endif
-
/* Declare in unnamed namespace. */
CPP_UNNAMED_NS_START
/**
@@ -5322,8 +5878,12 @@ int THD::binlog_write_row(TABLE* table, bool is_trans,
size_t const len= pack_row(table, cols, row_data, record);
+ /* Ensure that all events in a GTID group are in the same cache */
+ if (variables.option_bits & OPTION_GTID_BEGIN)
+ is_trans= 1;
+
Rows_log_event* const ev=
- binlog_prepare_pending_rows_event(table, server_id, cols, colcnt,
+ binlog_prepare_pending_rows_event(table, variables.server_id, cols, colcnt,
len, is_trans,
static_cast<Write_rows_log_event*>(0));
@@ -5355,6 +5915,10 @@ int THD::binlog_update_row(TABLE* table, bool is_trans,
size_t const after_size= pack_row(table, cols, after_row,
after_record);
+ /* Ensure that all events in a GTID group are in the same cache */
+ if (variables.option_bits & OPTION_GTID_BEGIN)
+ is_trans= 1;
+
/*
Don't print debug messages when running valgrind since they can
trigger false warnings.
@@ -5367,7 +5931,7 @@ int THD::binlog_update_row(TABLE* table, bool is_trans,
#endif
Rows_log_event* const ev=
- binlog_prepare_pending_rows_event(table, server_id, cols, colcnt,
+ binlog_prepare_pending_rows_event(table, variables.server_id, cols, colcnt,
before_size + after_size, is_trans,
static_cast<Update_rows_log_event*>(0));
@@ -5397,8 +5961,12 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans,
size_t const len= pack_row(table, cols, row_data, record);
+ /* Ensure that all events in a GTID group are in the same cache */
+ if (variables.option_bits & OPTION_GTID_BEGIN)
+ is_trans= 1;
+
Rows_log_event* const ev=
- binlog_prepare_pending_rows_event(table, server_id, cols, colcnt,
+ binlog_prepare_pending_rows_event(table, variables.server_id, cols, colcnt,
len, is_trans,
static_cast<Delete_rows_log_event*>(0));
@@ -5417,6 +5985,10 @@ int THD::binlog_remove_pending_rows_event(bool clear_maps,
if (!mysql_bin_log.is_open())
DBUG_RETURN(0);
+ /* Ensure that all events in a GTID group are in the same cache */
+ if (variables.option_bits & OPTION_GTID_BEGIN)
+ is_transactional= 1;
+
mysql_bin_log.remove_pending_rows_event(this, is_transactional);
if (clear_maps)
@@ -5436,6 +6008,10 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional)
if (!mysql_bin_log.is_open())
DBUG_RETURN(0);
+ /* Ensure that all events in a GTID group are in the same cache */
+ if (variables.option_bits & OPTION_GTID_BEGIN)
+ is_transactional= 1;
+
/*
Mark the event as the last event of a statement if the stmt_end
flag is set.
@@ -5480,23 +6056,35 @@ show_query_type(THD::enum_binlog_query_type qtype)
Constants required for the limit unsafe warnings suppression
*/
//seconds after which the limit unsafe warnings suppression will be activated
-#define LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT 50
+#define LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT 5*60
//number of limit unsafe warnings after which the suppression will be activated
-#define LIMIT_UNSAFE_WARNING_ACTIVATION_THRESHOLD_COUNT 50
+#define LIMIT_UNSAFE_WARNING_ACTIVATION_THRESHOLD_COUNT 10
-static ulonglong limit_unsafe_suppression_start_time= 0;
-static bool unsafe_warning_suppression_is_activated= false;
-static int limit_unsafe_warning_count= 0;
+static ulonglong unsafe_suppression_start_time= 0;
+static bool unsafe_warning_suppression_active[LEX::BINLOG_STMT_UNSAFE_COUNT];
+static ulong unsafe_warnings_count[LEX::BINLOG_STMT_UNSAFE_COUNT];
+static ulong total_unsafe_warnings_count;
/**
Auxiliary function to reset the limit unsafety warning suppression.
+ This is done without mutex protection, but this should be good
+ enough as it doesn't matter if we loose a couple of suppressed
+ messages or if this is called multiple times.
*/
-static void reset_binlog_unsafe_suppression()
+
+static void reset_binlog_unsafe_suppression(ulonglong now)
{
+ uint i;
DBUG_ENTER("reset_binlog_unsafe_suppression");
- unsafe_warning_suppression_is_activated= false;
- limit_unsafe_warning_count= 0;
- limit_unsafe_suppression_start_time= my_interval_timer()/10000000;
+
+ unsafe_suppression_start_time= now;
+ total_unsafe_warnings_count= 0;
+
+ for (i= 0 ; i < LEX::BINLOG_STMT_UNSAFE_COUNT ; i++)
+ {
+ unsafe_warnings_count[i]= 0;
+ unsafe_warning_suppression_active[i]= 0;
+ }
DBUG_VOID_RETURN;
}
@@ -5514,95 +6102,94 @@ static void print_unsafe_warning_to_log(int unsafe_type, char* buf,
}
/**
- Auxiliary function to check if the warning for limit unsafety should be
- thrown or suppressed. Details of the implementation can be found in the
- comments inline.
+ Auxiliary function to check if the warning for unsafe repliction statements
+ should be thrown or suppressed.
+
+ Logic is:
+ - If we get more than LIMIT_UNSAFE_WARNING_ACTIVATION_THRESHOLD_COUNT errors
+ of one type, that type of errors will be suppressed for
+ LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT.
+ - When the time limit has been reached, all suppression is reset.
+
+ This means that if one gets many different types of errors, some of them
+ may be reset less than LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT. However at
+ least one error is disable for this time.
+
SYNOPSIS:
@params
- buf - buffer to hold the warning message text
unsafe_type - The type of unsafety.
- query - The actual query statement.
- TODO: Remove this function and implement a general service for all warnings
- that would prevent flooding the error log.
+ RETURN:
+ 0 0k to log
+ 1 Message suppressed
*/
-static void do_unsafe_limit_checkout(char* buf, int unsafe_type, char* query)
+
+static bool protect_against_unsafe_warning_flood(int unsafe_type)
{
- ulonglong now= 0;
- DBUG_ENTER("do_unsafe_limit_checkout");
- DBUG_ASSERT(unsafe_type == LEX::BINLOG_STMT_UNSAFE_LIMIT);
- limit_unsafe_warning_count++;
+ ulong count;
+ ulonglong now= my_interval_timer()/1000000000ULL;
+ DBUG_ENTER("protect_against_unsafe_warning_flood");
+
+ count= ++unsafe_warnings_count[unsafe_type];
+ total_unsafe_warnings_count++;
+
/*
INITIALIZING:
If this is the first time this function is called with log warning
enabled, the monitoring the unsafe warnings should start.
*/
- if (limit_unsafe_suppression_start_time == 0)
+ if (unsafe_suppression_start_time == 0)
{
- limit_unsafe_suppression_start_time= my_interval_timer()/10000000;
- print_unsafe_warning_to_log(unsafe_type, buf, query);
+ reset_binlog_unsafe_suppression(now);
+ DBUG_RETURN(0);
}
- else
+
+ /*
+ The following is true if we got too many errors or if the error was
+ already suppressed
+ */
+ if (count >= LIMIT_UNSAFE_WARNING_ACTIVATION_THRESHOLD_COUNT)
{
- if (!unsafe_warning_suppression_is_activated)
- print_unsafe_warning_to_log(unsafe_type, buf, query);
+ ulonglong diff_time= (now - unsafe_suppression_start_time);
- if (limit_unsafe_warning_count >=
- LIMIT_UNSAFE_WARNING_ACTIVATION_THRESHOLD_COUNT)
+ if (!unsafe_warning_suppression_active[unsafe_type])
{
- now= my_interval_timer()/10000000;
- if (!unsafe_warning_suppression_is_activated)
+ /*
+ ACTIVATION:
+ We got LIMIT_UNSAFE_WARNING_ACTIVATION_THRESHOLD_COUNT warnings in
+ less than LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT we activate the
+ suppression.
+ */
+ if (diff_time <= LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT)
{
- /*
- ACTIVATION:
- We got LIMIT_UNSAFE_WARNING_ACTIVATION_THRESHOLD_COUNT warnings in
- less than LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT we activate the
- suppression.
- */
- if ((now-limit_unsafe_suppression_start_time) <=
- LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT)
- {
- unsafe_warning_suppression_is_activated= true;
- DBUG_PRINT("info",("A warning flood has been detected and the limit \
-unsafety warning suppression has been activated."));
- }
- else
- {
- /*
- there is no flooding till now, therefore we restart the monitoring
- */
- limit_unsafe_suppression_start_time= my_interval_timer()/10000000;
- limit_unsafe_warning_count= 0;
- }
+ unsafe_warning_suppression_active[unsafe_type]= 1;
+ sql_print_information("Suppressing warnings of type '%s' for up to %d seconds because of flooding",
+ ER(LEX::binlog_stmt_unsafe_errcode[unsafe_type]),
+ LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT);
}
else
{
/*
- Print the suppression note and the unsafe warning.
+ There is no flooding till now, therefore we restart the monitoring
*/
- sql_print_information("The following warning was suppressed %d times \
-during the last %d seconds in the error log",
- limit_unsafe_warning_count,
- (int)
- (now-limit_unsafe_suppression_start_time));
- print_unsafe_warning_to_log(unsafe_type, buf, query);
- /*
- DEACTIVATION: We got LIMIT_UNSAFE_WARNING_ACTIVATION_THRESHOLD_COUNT
- warnings in more than LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT, the
- suppression should be deactivated.
- */
- if ((now - limit_unsafe_suppression_start_time) >
- LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT)
- {
- reset_binlog_unsafe_suppression();
- DBUG_PRINT("info",("The limit unsafety warning supression has been \
-deactivated"));
- }
+ reset_binlog_unsafe_suppression(now);
+ }
+ }
+ else
+ {
+ /* This type of warnings was suppressed */
+ if (diff_time > LIMIT_UNSAFE_WARNING_ACTIVATION_TIMEOUT)
+ {
+ ulong save_count= total_unsafe_warnings_count;
+ /* Print a suppression note and remove the suppression */
+ reset_binlog_unsafe_suppression(now);
+ sql_print_information("Suppressed %lu unsafe warnings during "
+ "the last %d seconds",
+ save_count, (int) diff_time);
}
- limit_unsafe_warning_count= 0;
}
}
- DBUG_VOID_RETURN;
+ DBUG_RETURN(unsafe_warning_suppression_active[unsafe_type]);
}
/**
@@ -5614,6 +6201,7 @@ deactivated"));
void THD::issue_unsafe_warnings()
{
char buf[MYSQL_ERRMSG_SIZE * 2];
+ uint32 unsafe_type_flags;
DBUG_ENTER("issue_unsafe_warnings");
/*
Ensure that binlog_unsafe_warning_flags is big enough to hold all
@@ -5621,8 +6209,10 @@ void THD::issue_unsafe_warnings()
*/
DBUG_ASSERT(LEX::BINLOG_STMT_UNSAFE_COUNT <=
sizeof(binlog_unsafe_warning_flags) * CHAR_BIT);
-
- uint32 unsafe_type_flags= binlog_unsafe_warning_flags;
+
+ if (!(unsafe_type_flags= binlog_unsafe_warning_flags))
+ DBUG_VOID_RETURN; // Nothing to do
+
/*
For each unsafe_type, check if the statement is unsafe in this way
and issue a warning.
@@ -5633,17 +6223,13 @@ void THD::issue_unsafe_warnings()
{
if ((unsafe_type_flags & (1 << unsafe_type)) != 0)
{
- push_warning_printf(this, MYSQL_ERROR::WARN_LEVEL_NOTE,
+ push_warning_printf(this, Sql_condition::WARN_LEVEL_NOTE,
ER_BINLOG_UNSAFE_STATEMENT,
ER(ER_BINLOG_UNSAFE_STATEMENT),
ER(LEX::binlog_stmt_unsafe_errcode[unsafe_type]));
- if (global_system_variables.log_warnings)
- {
- if (unsafe_type == LEX::BINLOG_STMT_UNSAFE_LIMIT)
- do_unsafe_limit_checkout( buf, unsafe_type, query());
- else //cases other than LIMIT unsafety
- print_unsafe_warning_to_log(unsafe_type, buf, query());
- }
+ if (global_system_variables.log_warnings > 0 &&
+ !protect_against_unsafe_warning_flood(unsafe_type))
+ print_unsafe_warning_to_log(unsafe_type, buf, query());
}
}
DBUG_VOID_RETURN;
@@ -5683,6 +6269,23 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg,
show_query_type(qtype), (int) query_len, query_arg));
DBUG_ASSERT(query_arg && mysql_bin_log.is_open());
+ /* If this is withing a BEGIN ... COMMIT group, don't log it */
+ if (variables.option_bits & OPTION_GTID_BEGIN)
+ {
+ direct= 0;
+ is_trans= 1;
+ }
+ DBUG_PRINT("info", ("is_trans: %d direct: %d", is_trans, direct));
+
+ if (get_binlog_local_stmt_filter() == BINLOG_FILTER_SET)
+ {
+ /*
+ The current statement is to be ignored, and not written to
+ the binlog. Do not call issue_unsafe_warnings().
+ */
+ DBUG_RETURN(0);
+ }
+
/*
If we are not in prelocked mode, mysql_unlock_tables() will be
called after this binlog_query(), so we have to flush the pending
@@ -5794,6 +6397,349 @@ THD::signal_wakeup_ready()
}
+void THD::rgi_lock_temporary_tables()
+{
+ mysql_mutex_lock(&rgi_slave->rli->data_lock);
+ temporary_tables= rgi_slave->rli->save_temporary_tables;
+}
+
+void THD::rgi_unlock_temporary_tables(bool clear)
+{
+ rgi_slave->rli->save_temporary_tables= temporary_tables;
+ mysql_mutex_unlock(&rgi_slave->rli->data_lock);
+ if (clear)
+ {
+ /*
+ Temporary tables are shared with other by sql execution threads.
+ As a safety messure, clear the pointer to the common area.
+ */
+ temporary_tables= 0;
+ }
+}
+
+bool THD::rgi_have_temporary_tables()
+{
+ return rgi_slave->rli->save_temporary_tables != 0;
+}
+
+
+void
+wait_for_commit::reinit()
+{
+ subsequent_commits_list= NULL;
+ next_subsequent_commit= NULL;
+ waitee= NULL;
+ opaque_pointer= NULL;
+ wakeup_error= 0;
+ wakeup_subsequent_commits_running= false;
+ commit_started= false;
+#ifdef SAFE_MUTEX
+ /*
+ When using SAFE_MUTEX, the ordering between taking the LOCK_wait_commit
+ mutexes is checked. This causes a problem when we re-use a mutex, as then
+ the expected locking order may change.
+
+ So in this case, do a re-init of the mutex. In release builds, we want to
+ avoid the overhead of a re-init though.
+
+ To ensure that no one is locking the mutex, we take a lock of it first.
+ For full explanation, see wait_for_commit::~wait_for_commit()
+ */
+ mysql_mutex_lock(&LOCK_wait_commit);
+ mysql_mutex_unlock(&LOCK_wait_commit);
+
+ mysql_mutex_destroy(&LOCK_wait_commit);
+ mysql_mutex_init(key_LOCK_wait_commit, &LOCK_wait_commit, MY_MUTEX_INIT_FAST);
+#endif
+}
+
+
+wait_for_commit::wait_for_commit()
+{
+ mysql_mutex_init(key_LOCK_wait_commit, &LOCK_wait_commit, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_wait_commit, &COND_wait_commit, 0);
+ reinit();
+}
+
+
+wait_for_commit::~wait_for_commit()
+{
+ /*
+ Since we do a dirty read of the waiting_for_commit flag in
+ wait_for_prior_commit() and in unregister_wait_for_prior_commit(), we need
+ to take extra care before freeing the wait_for_commit object.
+
+ It is possible for the waitee to be pre-empted inside wakeup(), just after
+ it has cleared the waiting_for_commit flag and before it has released the
+ LOCK_wait_commit mutex. And then it is possible for the waiter to find the
+ flag cleared in wait_for_prior_commit() and go finish up things and
+ de-allocate the LOCK_wait_commit and COND_wait_commit objects before the
+ waitee has time to be re-scheduled and finish unlocking the mutex and
+ signalling the condition. This would lead to the waitee accessing no
+ longer valid memory.
+
+ To prevent this, we do an extra lock/unlock of the mutex here before
+ deallocation; this makes certain that any waitee has completed wakeup()
+ first.
+ */
+ mysql_mutex_lock(&LOCK_wait_commit);
+ mysql_mutex_unlock(&LOCK_wait_commit);
+
+ mysql_mutex_destroy(&LOCK_wait_commit);
+ mysql_cond_destroy(&COND_wait_commit);
+}
+
+
+void
+wait_for_commit::wakeup(int wakeup_error)
+{
+ /*
+ We signal each waiter on their own condition and mutex (rather than using
+ pthread_cond_broadcast() or something like that).
+
+ Otherwise we would need to somehow ensure that they were done
+ waking up before we could allow this THD to be destroyed, which would
+ be annoying and unnecessary.
+
+ Note that wakeup_subsequent_commits2() depends on this function being a
+ full memory barrier (it is, because it takes a mutex lock).
+
+ */
+ mysql_mutex_lock(&LOCK_wait_commit);
+ waitee= NULL;
+ this->wakeup_error= wakeup_error;
+ /*
+ Note that it is critical that the mysql_cond_signal() here is done while
+ still holding the mutex. As soon as we release the mutex, the waiter might
+ deallocate the condition object.
+ */
+ mysql_cond_signal(&COND_wait_commit);
+ mysql_mutex_unlock(&LOCK_wait_commit);
+}
+
+
+/*
+ Register that the next commit of this THD should wait to complete until
+ commit in another THD (the waitee) has completed.
+
+ The wait may occur explicitly, with the waiter sitting in
+ wait_for_prior_commit() until the waitee calls wakeup_subsequent_commits().
+
+ Alternatively, the TC (eg. binlog) may do the commits of both waitee and
+ waiter at once during group commit, resolving both of them in the right
+ order.
+
+ Only one waitee can be registered for a waiter; it must be removed by
+ wait_for_prior_commit() or unregister_wait_for_prior_commit() before a new
+ one is registered. But it is ok for several waiters to register a wait for
+ the same waitee. It is also permissible for one THD to be both a waiter and
+ a waitee at the same time.
+*/
+void
+wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
+{
+ DBUG_ASSERT(!this->waitee /* No prior registration allowed */);
+ wakeup_error= 0;
+ this->waitee= waitee;
+
+ mysql_mutex_lock(&waitee->LOCK_wait_commit);
+ /*
+ If waitee is in the middle of wakeup, then there is nothing to wait for,
+ so we need not register. This is necessary to avoid a race in unregister,
+ see comments on wakeup_subsequent_commits2() for details.
+ */
+ if (waitee->wakeup_subsequent_commits_running)
+ this->waitee= NULL;
+ else
+ {
+ /*
+ Put ourself at the head of the waitee's list of transactions that must
+ wait for it to commit first.
+ */
+ this->next_subsequent_commit= waitee->subsequent_commits_list;
+ waitee->subsequent_commits_list= this;
+ }
+ mysql_mutex_unlock(&waitee->LOCK_wait_commit);
+}
+
+
+/*
+ Wait for commit of another transaction to complete, as already registered
+ with register_wait_for_prior_commit(). If the commit already completed,
+ returns immediately.
+*/
+int
+wait_for_commit::wait_for_prior_commit2(THD *thd)
+{
+ PSI_stage_info old_stage;
+ wait_for_commit *loc_waitee;
+
+ mysql_mutex_lock(&LOCK_wait_commit);
+ DEBUG_SYNC(thd, "wait_for_prior_commit_waiting");
+ thd->ENTER_COND(&COND_wait_commit, &LOCK_wait_commit,
+ &stage_waiting_for_prior_transaction_to_commit,
+ &old_stage);
+ while ((loc_waitee= this->waitee) && !thd->check_killed())
+ mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
+ if (!loc_waitee)
+ {
+ if (wakeup_error)
+ my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
+ goto end;
+ }
+ /*
+ Wait was interrupted by kill. We need to unregister our wait and give the
+ error. But if a wakeup is already in progress, then we must ignore the
+ kill and not give error, otherwise we get inconsistency between waitee and
+ waiter as to whether we succeed or fail (eg. we may roll back but waitee
+ might attempt to commit both us and any subsequent commits waiting for us).
+ */
+ mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
+ if (loc_waitee->wakeup_subsequent_commits_running)
+ {
+ /* We are being woken up; ignore the kill and just wait. */
+ mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+ do
+ {
+ mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
+ } while (this->waitee);
+ if (wakeup_error)
+ my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
+ goto end;
+ }
+ remove_from_list(&loc_waitee->subsequent_commits_list);
+ mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+ this->waitee= NULL;
+
+ wakeup_error= thd->killed_errno();
+ if (!wakeup_error)
+ wakeup_error= ER_QUERY_INTERRUPTED;
+ my_message(wakeup_error, ER(wakeup_error), MYF(0));
+ thd->EXIT_COND(&old_stage);
+ /*
+ Must do the DEBUG_SYNC() _after_ exit_cond(), as DEBUG_SYNC is not safe to
+ use within enter_cond/exit_cond.
+ */
+ DEBUG_SYNC(thd, "wait_for_prior_commit_killed");
+ return wakeup_error;
+
+end:
+ thd->EXIT_COND(&old_stage);
+ return wakeup_error;
+}
+
+
+/*
+ Wakeup anyone waiting for us to have committed.
+
+ Note about locking:
+
+ We have a potential race or deadlock between wakeup_subsequent_commits() in
+ the waitee and unregister_wait_for_prior_commit() in the waiter.
+
+ Both waiter and waitee needs to take their own lock before it is safe to take
+ a lock on the other party - else the other party might disappear and invalid
+ memory data could be accessed. But if we take the two locks in different
+ order, we may end up in a deadlock.
+
+ The waiter needs to lock the waitee to delete itself from the list in
+ unregister_wait_for_prior_commit(). Thus wakeup_subsequent_commits() can not
+ hold its own lock while locking waiters, as this could lead to deadlock.
+
+ So we need to prevent unregister_wait_for_prior_commit() running while wakeup
+ is in progress - otherwise the unregister could complete before the wakeup,
+ leading to incorrect spurious wakeup or accessing invalid memory.
+
+ However, if we are in the middle of running wakeup_subsequent_commits(), then
+ there is no need for unregister_wait_for_prior_commit() in the first place -
+ the waiter can just do a normal wait_for_prior_commit(), as it will be
+ immediately woken up.
+
+ So the solution to the potential race/deadlock is to set a flag in the waitee
+ that wakeup_subsequent_commits() is in progress. When this flag is set,
+ unregister_wait_for_prior_commit() becomes just wait_for_prior_commit().
+
+ Then also register_wait_for_prior_commit() needs to check if
+ wakeup_subsequent_commits() is running, and skip the registration if
+ so. This is needed in case a new waiter manages to register itself and
+ immediately try to unregister while wakeup_subsequent_commits() is
+ running. Else the new waiter would also wait rather than unregister, but it
+ would not be woken up until next wakeup, which could be potentially much
+ later than necessary.
+*/
+
+void
+wait_for_commit::wakeup_subsequent_commits2(int wakeup_error)
+{
+ wait_for_commit *waiter;
+
+ mysql_mutex_lock(&LOCK_wait_commit);
+ wakeup_subsequent_commits_running= true;
+ waiter= subsequent_commits_list;
+ subsequent_commits_list= NULL;
+ mysql_mutex_unlock(&LOCK_wait_commit);
+
+ while (waiter)
+ {
+ /*
+ Important: we must grab the next pointer before waking up the waiter;
+ once the wakeup is done, the field could be invalidated at any time.
+ */
+ wait_for_commit *next= waiter->next_subsequent_commit;
+ waiter->wakeup(wakeup_error);
+ waiter= next;
+ }
+
+ /*
+ We need a full memory barrier between walking the list above, and clearing
+ the flag wakeup_subsequent_commits_running below. This barrier is needed
+ to ensure that no other thread will start to modify the list pointers
+ before we are done traversing the list.
+
+ But wait_for_commit::wakeup() does a full memory barrier already (it locks
+ a mutex), so no extra explicit barrier is needed here.
+ */
+ wakeup_subsequent_commits_running= false;
+ DBUG_EXECUTE_IF("inject_wakeup_subsequent_commits_sleep", my_sleep(21000););
+}
+
+
+/* Cancel a previously registered wait for another THD to commit before us. */
+void
+wait_for_commit::unregister_wait_for_prior_commit2()
+{
+ wait_for_commit *loc_waitee;
+
+ mysql_mutex_lock(&LOCK_wait_commit);
+ if ((loc_waitee= this->waitee))
+ {
+ mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
+ if (loc_waitee->wakeup_subsequent_commits_running)
+ {
+ /*
+ When a wakeup is running, we cannot safely remove ourselves from the
+ list without corrupting it. Instead we can just wait, as wakeup is
+ already in progress and will thus be immediate.
+
+ See comments on wakeup_subsequent_commits2() for more details.
+ */
+ mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+ while (this->waitee)
+ mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
+ }
+ else
+ {
+ /* Remove ourselves from the list in the waitee. */
+ remove_from_list(&loc_waitee->subsequent_commits_list);
+ mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+ this->waitee= NULL;
+ }
+ }
+ wakeup_error= 0;
+ mysql_mutex_unlock(&LOCK_wait_commit);
+}
+
+
bool Discrete_intervals_list::append(ulonglong start, ulonglong val,
ulonglong incr)
{