summaryrefslogtreecommitdiff
path: root/sql/sql_class.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_class.cc')
-rw-r--r--sql/sql_class.cc772
1 files changed, 555 insertions, 217 deletions
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 28bf77c94e8..883e9c688ff 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -35,7 +35,7 @@
#include "sql_base.h" // close_thread_tables
#include "sql_time.h" // date_time_format_copy
#include "tztime.h" // MYSQL_TIME <-> my_time_t
-#include "sql_acl.h" // NO_ACCESS,
+#include "sql_acl.h" // NO_ACL,
// acl_getroot_no_password
#include "sql_base.h"
#include "sql_handler.h" // mysql_ha_cleanup
@@ -71,6 +71,7 @@
static inline bool wsrep_is_bf_aborted(THD* thd) { return false; }
#endif /* WITH_WSREP */
#include "opt_trace.h"
+#include <mysql/psi/mysql_transaction.h>
#ifdef HAVE_SYS_SYSCALL_H
#include <sys/syscall.h>
@@ -128,6 +129,39 @@ bool Key_part_spec::operator==(const Key_part_spec& other) const
&other.field_name);
}
+
+bool Key_part_spec::check_key_for_blob(const handler *file) const
+{
+ if (!(file->ha_table_flags() & HA_CAN_INDEX_BLOBS))
+ {
+ my_error(ER_BLOB_USED_AS_KEY, MYF(0), field_name.str, file->table_type());
+ return true;
+ }
+ return false;
+}
+
+
+bool Key_part_spec::check_key_length_for_blob() const
+{
+ if (!length)
+ {
+ my_error(ER_BLOB_KEY_WITHOUT_LENGTH, MYF(0), field_name.str);
+ return true;
+ }
+ return false;
+}
+
+
+bool Key_part_spec::init_multiple_key_for_blob(const handler *file)
+{
+ if (check_key_for_blob(file))
+ return true;
+ if (!length)
+ length= file->max_key_length() + 1;
+ return false;
+}
+
+
/**
Construct an (almost) deep copy of this key. Only those
elements that are known to never change are not copied.
@@ -141,7 +175,8 @@ Key::Key(const Key &rhs, MEM_ROOT *mem_root)
columns(rhs.columns, mem_root),
name(rhs.name),
option_list(rhs.option_list),
- generated(rhs.generated), invisible(false)
+ generated(rhs.generated), invisible(false),
+ without_overlaps(rhs.without_overlaps), period(rhs.period)
{
list_copy_and_replace_each_value(columns, mem_root);
}
@@ -155,6 +190,7 @@ Key::Key(const Key &rhs, MEM_ROOT *mem_root)
Foreign_key::Foreign_key(const Foreign_key &rhs, MEM_ROOT *mem_root)
:Key(rhs,mem_root),
+ constraint_name(rhs.constraint_name),
ref_db(rhs.ref_db),
ref_table(rhs.ref_table),
ref_columns(rhs.ref_columns,mem_root),
@@ -300,6 +336,12 @@ THD *thd_get_current_thd()
}
+extern "C" unsigned long long thd_query_id(const MYSQL_THD thd)
+{
+ return((unsigned long long)thd->query_id);
+}
+
+
/**
Get thread attributes for connection threads
@@ -408,12 +450,6 @@ void thd_exit_cond(MYSQL_THD thd, const PSI_stage_info *stage,
}
extern "C"
-void **thd_ha_data(const THD *thd, const struct handlerton *hton)
-{
- return (void **) &thd->ha_data[hton->slot].ha_ptr;
-}
-
-extern "C"
void thd_storage_lock_wait(THD *thd, long long value)
{
thd->utime_after_lock+= value;
@@ -425,7 +461,7 @@ void thd_storage_lock_wait(THD *thd, long long value)
extern "C"
void *thd_get_ha_data(const THD *thd, const struct handlerton *hton)
{
- return *thd_ha_data(thd, hton);
+ return thd->ha_data[hton->slot].ha_ptr;
}
@@ -438,7 +474,9 @@ void thd_set_ha_data(THD *thd, const struct handlerton *hton,
const void *ha_data)
{
plugin_ref *lock= &thd->ha_data[hton->slot].lock;
- DBUG_ASSERT(thd == current_thd);
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ thd->ha_data[hton->slot].ha_ptr= const_cast<void*>(ha_data);
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
if (ha_data && !*lock)
*lock= ha_lock_engine(NULL, (handlerton*) hton);
else if (!ha_data && *lock)
@@ -446,9 +484,6 @@ void thd_set_ha_data(THD *thd, const struct handlerton *hton,
plugin_unlock(NULL, *lock);
*lock= NULL;
}
- mysql_mutex_lock(&thd->LOCK_thd_data);
- *thd_ha_data(thd, hton)= (void*) ha_data;
- mysql_mutex_unlock(&thd->LOCK_thd_data);
}
@@ -595,19 +630,20 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
:Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION,
/* statement id */ 0),
rli_fake(0), rgi_fake(0), rgi_slave(NULL),
- protocol_text(this), protocol_binary(this),
- m_current_stage_key(0),
+ protocol_text(this), protocol_binary(this), initial_status_var(0),
+ m_current_stage_key(0), m_psi(0),
in_sub_stmt(0), log_all_errors(0),
binlog_unsafe_warning_flags(0),
current_stmt_binlog_format(BINLOG_FORMAT_MIXED),
- binlog_table_maps(0),
bulk_param(0),
table_map_for_update(0),
m_examined_row_count(0),
accessed_rows_and_keys(0),
m_digest(NULL),
m_statement_psi(NULL),
+ m_transaction_psi(NULL),
m_idle_psi(NULL),
+ col_access(NO_ACL),
thread_id(id),
thread_dbug_id(id),
os_thread_id(0),
@@ -661,9 +697,10 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
wsrep_apply_format(0),
wsrep_rbr_buf(NULL),
wsrep_sync_wait_gtid(WSREP_GTID_UNDEFINED),
+ wsrep_last_written_gtid_seqno(0),
+ wsrep_current_gtid_seqno(0),
wsrep_affected_rows(0),
wsrep_has_ignored_error(false),
- wsrep_replicate_GTID(false),
wsrep_ignore_table(false),
wsrep_aborter(0),
@@ -706,8 +743,9 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
the destructor works OK in case of an error. The main_mem_root
will be re-initialized in init_for_queries().
*/
- init_sql_alloc(&main_mem_root, "THD::main_mem_root",
- ALLOC_ROOT_MIN_BLOCK_SIZE, 0, MYF(MY_THREAD_SPECIFIC));
+ init_sql_alloc(key_memory_thd_main_mem_root,
+ &main_mem_root, ALLOC_ROOT_MIN_BLOCK_SIZE, 0,
+ MYF(MY_THREAD_SPECIFIC));
/*
Allocation of user variables for binary logging is always done with main
@@ -719,9 +757,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
thread_stack= 0;
scheduler= thread_scheduler; // Will be fixed later
event_scheduler.data= 0;
- event_scheduler.m_psi= 0;
skip_wait_timeout= false;
- extra_port= 0;
catalog= (char*)"std"; // the only catalog we have for now
main_security_ctx.init();
security_ctx= &main_security_ctx;
@@ -731,7 +767,6 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
count_cuted_fields= CHECK_FIELD_IGNORE;
killed= NOT_KILLED;
killed_err= 0;
- col_access=0;
is_slave_error= thread_specific_used= FALSE;
my_hash_clear(&handler_tables_hash);
my_hash_clear(&ull_hash);
@@ -761,6 +796,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
bzero((void*) ha_data, sizeof(ha_data));
mysys_var=0;
binlog_evt_union.do_union= FALSE;
+ binlog_table_maps= FALSE;
enable_slow_log= 0;
durability_property= HA_REGULAR_DURABILITY;
@@ -775,12 +811,14 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
system_thread= NON_SYSTEM_THREAD;
cleanup_done= free_connection_done= abort_on_warning= got_warning= 0;
peer_port= 0; // For SHOW PROCESSLIST
- transaction.m_pending_rows_event= 0;
- transaction.on= 1;
- wt_thd_lazy_init(&transaction.wt, &variables.wt_deadlock_search_depth_short,
- &variables.wt_timeout_short,
- &variables.wt_deadlock_search_depth_long,
- &variables.wt_timeout_long);
+ transaction= &default_transaction;
+ transaction->m_pending_rows_event= 0;
+ transaction->on= 1;
+ wt_thd_lazy_init(&transaction->wt,
+ &variables.wt_deadlock_search_depth_short,
+ &variables.wt_timeout_short,
+ &variables.wt_deadlock_search_depth_long,
+ &variables.wt_timeout_long);
#ifdef SIGNAL_WITH_VIO_CLOSE
active_vio = 0;
#endif
@@ -805,16 +843,18 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
reset_open_tables_state(this);
init();
+ debug_sync_init_thread(this);
#if defined(ENABLED_PROFILING)
profiling.set_thd(this);
#endif
user_connect=(USER_CONN *)0;
- my_hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0,
- (my_hash_get_key) get_var_key,
+ my_hash_init(key_memory_user_var_entry, &user_vars, system_charset_info,
+ USER_VARS_HASH_SIZE, 0, 0, (my_hash_get_key) get_var_key,
(my_hash_free_key) free_user_var, HASH_THREAD_SPECIFIC);
- my_hash_init(&sequences, system_charset_info, SEQUENCES_HASH_SIZE, 0, 0,
- (my_hash_get_key) get_sequence_last_key,
- (my_hash_free_key) free_sequence_last, HASH_THREAD_SPECIFIC);
+ my_hash_init(PSI_INSTRUMENT_ME, &sequences, system_charset_info,
+ SEQUENCES_HASH_SIZE, 0, 0, (my_hash_get_key)
+ get_sequence_last_key, (my_hash_free_key) free_sequence_last,
+ HASH_THREAD_SPECIFIC);
sp_proc_cache= NULL;
sp_func_cache= NULL;
@@ -823,7 +863,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
/* For user vars replication*/
if (opt_bin_log)
- my_init_dynamic_array(&user_var_events,
+ my_init_dynamic_array(key_memory_user_var_entry, &user_var_events,
sizeof(BINLOG_USER_VAR_EVENT *), 16, 16, MYF(0));
else
bzero((char*) &user_var_events, sizeof(user_var_events));
@@ -851,7 +891,8 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
m_token_array= NULL;
if (max_digest_length > 0)
{
- m_token_array= (unsigned char*) my_malloc(max_digest_length,
+ m_token_array= (unsigned char*) my_malloc(PSI_INSTRUMENT_ME,
+ max_digest_length,
MYF(MY_WME|MY_THREAD_SPECIFIC));
}
@@ -1151,19 +1192,9 @@ void *thd_memdup(MYSQL_THD thd, const void* str, size_t size)
extern "C"
void thd_get_xid(const MYSQL_THD thd, MYSQL_XID *xid)
{
-#ifdef WITH_WSREP
- if (!thd->wsrep_xid.is_null())
- {
- *xid = *(MYSQL_XID *) &thd->wsrep_xid;
- return;
- }
-#endif /* WITH_WSREP */
- *xid= thd->transaction.xid_state.is_explicit_XA() ?
- *(MYSQL_XID *) thd->transaction.xid_state.get_xid() :
- *(MYSQL_XID *) &thd->transaction.implicit_xid;
+ *xid = *(MYSQL_XID *) thd->get_xid();
}
-
extern "C"
my_time_t thd_TIME_to_gmt_sec(MYSQL_THD thd, const MYSQL_TIME *ltime,
unsigned int *errcode)
@@ -1184,11 +1215,6 @@ void thd_gmt_sec_to_TIME(MYSQL_THD thd, MYSQL_TIME *ltime, my_time_t t)
#ifdef _WIN32
-extern "C" THD *_current_thd_noinline(void)
-{
- return my_pthread_getspecific_ptr(THD*,THR_THD);
-}
-
extern "C" my_thread_id next_thread_id_noinline()
{
#undef next_thread_id
@@ -1235,10 +1261,10 @@ void THD::init()
if (variables.sql_mode & MODE_ANSI_QUOTES)
server_status|= SERVER_STATUS_ANSI_QUOTES;
- transaction.all.modified_non_trans_table=
- transaction.stmt.modified_non_trans_table= FALSE;
- transaction.all.m_unsafe_rollback_flags=
- transaction.stmt.m_unsafe_rollback_flags= 0;
+ transaction->all.modified_non_trans_table=
+ transaction->stmt.modified_non_trans_table= FALSE;
+ transaction->all.m_unsafe_rollback_flags=
+ transaction->stmt.m_unsafe_rollback_flags= 0;
open_options=ha_open_options;
update_lock_default= (variables.low_priority_updates ?
@@ -1279,7 +1305,6 @@ void THD::init()
wsrep_rbr_buf = NULL;
wsrep_affected_rows = 0;
m_wsrep_next_trx_id = WSREP_UNDEFINED_TRX_ID;
- wsrep_replicate_GTID = false;
wsrep_aborter = 0;
wsrep_desynced_backup_stage= false;
#endif /* WITH_WSREP */
@@ -1289,22 +1314,16 @@ void THD::init()
else
variables.option_bits&= ~OPTION_BIN_LOG;
- variables.sql_log_bin_off= 0;
-
select_commands= update_commands= other_commands= 0;
/* Set to handle counting of aborted connections */
userstat_running= opt_userstat_running;
last_global_update_time= current_connect_time= time(NULL);
-#if defined(ENABLED_DEBUG_SYNC)
- /* Initialize the Debug Sync Facility. See debug_sync.cc. */
- debug_sync_init_thread(this);
-#endif /* defined(ENABLED_DEBUG_SYNC) */
-
#ifndef EMBEDDED_LIBRARY
session_tracker.enable(this);
#endif //EMBEDDED_LIBRARY
apc_target.init(&LOCK_thd_kill);
+ gap_tracker_data.init();
DBUG_VOID_RETURN;
}
@@ -1374,21 +1393,18 @@ void THD::update_all_stats()
void THD::init_for_queries()
{
- set_time();
- /*
- We don't need to call ha_enable_transaction() as we can't have
- any active transactions that has to be committed
- */
- DBUG_ASSERT(transaction.is_empty());
- transaction.on= TRUE;
+ DBUG_ASSERT(transaction->on);
+ DBUG_ASSERT(m_transaction_psi == NULL);
+ /* Set time for --init-file queries */
+ set_time();
reset_root_defaults(mem_root, variables.query_alloc_block_size,
variables.query_prealloc_size);
- reset_root_defaults(&transaction.mem_root,
+ reset_root_defaults(&transaction->mem_root,
variables.trans_alloc_block_size,
variables.trans_prealloc_size);
- DBUG_ASSERT(!transaction.xid_state.is_explicit_XA());
- DBUG_ASSERT(transaction.implicit_xid.is_null());
+ DBUG_ASSERT(!transaction->xid_state.is_explicit_XA());
+ DBUG_ASSERT(transaction->implicit_xid.is_null());
}
@@ -1423,12 +1439,13 @@ void THD::change_user(void)
init();
stmt_map.reset();
- my_hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0,
- (my_hash_get_key) get_var_key,
- (my_hash_free_key) free_user_var, 0);
- my_hash_init(&sequences, system_charset_info, SEQUENCES_HASH_SIZE, 0, 0,
- (my_hash_get_key) get_sequence_last_key,
- (my_hash_free_key) free_sequence_last, HASH_THREAD_SPECIFIC);
+ my_hash_init(key_memory_user_var_entry, &user_vars, system_charset_info,
+ USER_VARS_HASH_SIZE, 0, 0, (my_hash_get_key) get_var_key,
+ (my_hash_free_key) free_user_var, HASH_THREAD_SPECIFIC);
+ my_hash_init(key_memory_user_var_entry, &sequences, system_charset_info,
+ SEQUENCES_HASH_SIZE, 0, 0, (my_hash_get_key)
+ get_sequence_last_key, (my_hash_free_key) free_sequence_last,
+ HASH_THREAD_SPECIFIC);
sp_cache_clear(&sp_proc_cache);
sp_cache_clear(&sp_func_cache);
sp_cache_clear(&sp_package_spec_cache);
@@ -1465,7 +1482,8 @@ bool THD::set_db(const LEX_CSTRING *new_db)
const char *tmp= NULL;
if (new_db->str)
{
- if (!(tmp= my_strndup(new_db->str, new_db->length, MYF(MY_WME | ME_FATAL))))
+ if (!(tmp= my_strndup(key_memory_THD_db, new_db->str, new_db->length,
+ MYF(MY_WME | ME_FATAL))))
result= 1;
}
@@ -1526,12 +1544,14 @@ void THD::cleanup(void)
delete_dynamic(&user_var_events);
close_temporary_tables();
- if (transaction.xid_state.is_explicit_XA())
+ if (transaction->xid_state.is_explicit_XA())
trans_xa_detach(this);
else
trans_rollback(this);
DBUG_ASSERT(open_tables == NULL);
+ DBUG_ASSERT(m_transaction_psi == NULL);
+
/*
If the thread was in the middle of an ongoing transaction (rolled
back a few lines above) or under LOCK TABLES (unlocked the tables
@@ -1552,12 +1572,7 @@ void THD::cleanup(void)
decrease_user_connections(user_connect);
user_connect= 0; // Safety
}
- wt_thd_destroy(&transaction.wt);
-
-#if defined(ENABLED_DEBUG_SYNC)
- /* End the Debug Sync Facility. See debug_sync.cc. */
- debug_sync_end_thread(this);
-#endif /* defined(ENABLED_DEBUG_SYNC) */
+ wt_thd_destroy(&transaction->wt);
my_hash_free(&user_vars);
my_hash_free(&sequences);
@@ -1612,6 +1627,7 @@ void THD::free_connection()
#if defined(ENABLED_PROFILING)
profiling.restart(); // Reset profiling
#endif
+ debug_sync_reset_thread(this);
}
/*
@@ -1636,7 +1652,7 @@ void THD::reset_for_reuse()
abort_on_warning= 0;
free_connection_done= 0;
m_command= COM_CONNECT;
- transaction.on= 1;
+ transaction->on= 1;
#if defined(ENABLED_PROFILING)
profiling.reset();
#endif
@@ -1656,6 +1672,8 @@ THD::~THD()
DBUG_ENTER("~THD()");
/* Make sure threads are not available via server_threads. */
assert_not_linked();
+ if (m_psi)
+ PSI_CALL_set_thread_THD(m_psi, 0);
/*
In error cases, thd may not be current thd. We have to fix this so
@@ -1684,7 +1702,7 @@ THD::~THD()
#endif
mdl_context.destroy();
- free_root(&transaction.mem_root,MYF(0));
+ transaction->free();
mysql_cond_destroy(&COND_wakeup_ready);
mysql_mutex_destroy(&LOCK_wakeup_ready);
mysql_mutex_destroy(&LOCK_thd_data);
@@ -1716,12 +1734,16 @@ THD::~THD()
lf_hash_put_pins(tdc_hash_pins);
if (xid_hash_pins)
lf_hash_put_pins(xid_hash_pins);
+ debug_sync_end_thread(this);
/* Ensure everything is freed */
status_var.local_memory_used-= sizeof(THD);
/* trick to make happy memory accounting system */
#ifndef EMBEDDED_LIBRARY
session_tracker.sysvars.deinit();
+#ifdef USER_VAR_TRACKING
+ session_tracker.user_variables.deinit();
+#endif // USER_VAR_TRACKING
#endif //EMBEDDED_LIBRARY
if (status_var.local_memory_used != 0)
@@ -2136,7 +2158,7 @@ void THD::reset_killed()
the structure for the net buffer
*/
-bool THD::store_globals()
+void THD::store_globals()
{
/*
Assert that thread_stack is initialized: it's necessary to be able
@@ -2144,8 +2166,7 @@ bool THD::store_globals()
*/
DBUG_ASSERT(thread_stack);
- if (set_current_thd(this))
- return 1;
+ set_current_thd(this);
/*
mysys_var is concurrently readable by a killer thread.
It is protected by LOCK_thd_kill, it is not needed to lock while the
@@ -2187,8 +2208,6 @@ bool THD::store_globals()
created in another thread
*/
thr_lock_info_init(&lock_info, mysys_var);
-
- return 0;
}
/**
@@ -2480,7 +2499,8 @@ bool THD::to_ident_sys_alloc(Lex_ident_sys_st *to, const Lex_ident_cli_st *ident
Item_basic_constant *
-THD::make_string_literal(const char *str, size_t length, uint repertoire)
+THD::make_string_literal(const char *str, size_t length,
+ my_repertoire_t repertoire)
{
if (!length && (variables.sql_mode & MODE_EMPTY_STRING_IS_NULL))
return new (mem_root) Item_null(this, 0, variables.collation_connection);
@@ -2521,8 +2541,7 @@ THD::make_string_literal_charset(const Lex_string_with_metadata_st &str,
{
if (!str.length && (variables.sql_mode & MODE_EMPTY_STRING_IS_NULL))
return new (mem_root) Item_null(this, 0, cs);
- return new (mem_root) Item_string_with_introducer(this,
- str.str, (uint)str.length, cs);
+ return new (mem_root) Item_string_with_introducer(this, str, cs);
}
@@ -2580,7 +2599,8 @@ void THD::add_changed_table(TABLE *table)
{
DBUG_ENTER("THD::add_changed_table(table)");
- DBUG_ASSERT(in_multi_stmt_transaction_mode() && table->file->has_transactions());
+ DBUG_ASSERT(in_multi_stmt_transaction_mode() &&
+ table->file->has_transactions());
add_changed_table(table->s->table_cache_key.str,
(long) table->s->table_cache_key.length);
DBUG_VOID_RETURN;
@@ -2590,8 +2610,8 @@ void THD::add_changed_table(TABLE *table)
void THD::add_changed_table(const char *key, size_t key_length)
{
DBUG_ENTER("THD::add_changed_table(key)");
- CHANGED_TABLE_LIST **prev_changed = &transaction.changed_tables;
- CHANGED_TABLE_LIST *curr = transaction.changed_tables;
+ CHANGED_TABLE_LIST **prev_changed = &transaction->changed_tables;
+ CHANGED_TABLE_LIST *curr = transaction->changed_tables;
for (; curr; prev_changed = &(curr->next), curr = curr->next)
{
@@ -3043,15 +3063,6 @@ int select_send::send_data(List<Item> &items)
Protocol *protocol= thd->protocol;
DBUG_ENTER("select_send::send_data");
- /* unit is not set when using 'delete ... returning' */
- if (unit && unit->offset_limit_cnt)
- { // using limit offset,count
- unit->offset_limit_cnt--;
- DBUG_RETURN(FALSE);
- }
- if (thd->killed == ABORT_QUERY)
- DBUG_RETURN(FALSE);
-
protocol->prepare_for_resend();
if (protocol->send_result_set_row(&items))
{
@@ -3314,13 +3325,6 @@ int select_export::send_data(List<Item> &items)
String tmp(buff,sizeof(buff),&my_charset_bin),*res;
tmp.length(0);
- if (unit->offset_limit_cnt)
- { // using limit offset,count
- unit->offset_limit_cnt--;
- DBUG_RETURN(0);
- }
- if (thd->killed == ABORT_QUERY)
- DBUG_RETURN(0);
row_count++;
Item *item;
uint used_length=0,items_left=items.elements;
@@ -3438,7 +3442,7 @@ int select_export::send_data(List<Item> &items)
pos++)
{
#ifdef USE_MB
- if (use_mb(res_charset))
+ if (res_charset->use_mb())
{
int l;
if ((l=my_ismbchar(res_charset, pos, end)))
@@ -3574,14 +3578,6 @@ int select_dump::send_data(List<Item> &items)
Item *item;
DBUG_ENTER("select_dump::send_data");
- if (unit->offset_limit_cnt)
- { // using limit offset,count
- unit->offset_limit_cnt--;
- DBUG_RETURN(0);
- }
- if (thd->killed == ABORT_QUERY)
- DBUG_RETURN(0);
-
if (row_count++ > 1)
{
my_message(ER_TOO_MANY_ROWS, ER_THD(thd, ER_TOO_MANY_ROWS), MYF(0));
@@ -3617,13 +3613,6 @@ int select_singlerow_subselect::send_data(List<Item> &items)
MYF(current_thd->lex->ignore ? ME_WARNING : 0));
DBUG_RETURN(1);
}
- if (unit->offset_limit_cnt)
- { // Using limit offset,count
- unit->offset_limit_cnt--;
- DBUG_RETURN(0);
- }
- if (thd->killed == ABORT_QUERY)
- DBUG_RETURN(0);
List_iterator_fast<Item> li(items);
Item *val_item;
for (uint i= 0; (val_item= li++); i++)
@@ -3674,7 +3663,7 @@ int select_max_min_finder_subselect::send_data(List<Item> &items)
break;
case ROW_RESULT:
case TIME_RESULT:
- // This case should never be choosen
+ // This case should never be chosen
DBUG_ASSERT(0);
op= 0;
}
@@ -3758,13 +3747,6 @@ int select_exists_subselect::send_data(List<Item> &items)
{
DBUG_ENTER("select_exists_subselect::send_data");
Item_exists_subselect *it= (Item_exists_subselect *)item;
- if (unit->offset_limit_cnt)
- { // Using limit offset,count
- unit->offset_limit_cnt--;
- DBUG_RETURN(0);
- }
- if (thd->killed == ABORT_QUERY)
- DBUG_RETURN(0);
it->value= 1;
it->assigned(1);
DBUG_RETURN(0);
@@ -3993,12 +3975,12 @@ Statement_map::Statement_map() :
START_STMT_HASH_SIZE = 16,
START_NAME_HASH_SIZE = 16
};
- my_hash_init(&st_hash, &my_charset_bin, START_STMT_HASH_SIZE, 0, 0,
- get_statement_id_as_hash_key,
+ my_hash_init(key_memory_prepared_statement_map, &st_hash, &my_charset_bin,
+ START_STMT_HASH_SIZE, 0, 0, get_statement_id_as_hash_key,
delete_statement_as_hash_key, MYF(0));
- my_hash_init(&names_hash, system_charset_info, START_NAME_HASH_SIZE, 0, 0,
+ my_hash_init(key_memory_prepared_statement_map, &names_hash, system_charset_info, START_NAME_HASH_SIZE, 0, 0,
(my_hash_get_key) get_stmt_name_hash_key,
- NULL,MYF(0));
+ NULL, MYF(0));
}
@@ -4166,12 +4148,7 @@ int select_dumpvar::send_data(List<Item> &items)
{
DBUG_ENTER("select_dumpvar::send_data");
- if (unit->offset_limit_cnt)
- { // using limit offset,count
- unit->offset_limit_cnt--;
- DBUG_RETURN(0);
- }
- if (row_count++)
+ if (row_count++)
{
my_message(ER_TOO_MANY_ROWS, ER_THD(thd, ER_TOO_MANY_ROWS), MYF(0));
DBUG_RETURN(1);
@@ -4361,10 +4338,10 @@ void Security_context::init()
host= user= ip= external_user= 0;
host_or_ip= "connecting host";
priv_user[0]= priv_host[0]= proxy_user[0]= priv_role[0]= '\0';
- master_access= 0;
+ master_access= NO_ACL;
password_expired= false;
#ifndef NO_EMBEDDED_ACCESS_CHECKS
- db_access= NO_ACCESS;
+ db_access= NO_ACL;
#endif
}
@@ -4399,7 +4376,7 @@ void Security_context::skip_grants()
{
/* privileges for the user are unknown everything is allowed */
host_or_ip= (char *)"";
- master_access= ~NO_ACCESS;
+ master_access= ALL_KNOWN_ACL;
*priv_user= *priv_host= '\0';
password_expired= false;
}
@@ -4407,15 +4384,16 @@ void Security_context::skip_grants()
bool Security_context::set_user(char *user_arg)
{
- my_free((char*) user);
- user= my_strdup(user_arg, MYF(0));
+ my_free(const_cast<char*>(user));
+ user= my_strdup(key_memory_MPVIO_EXT_auth_info, user_arg, MYF(0));
return user == 0;
}
-bool Security_context::check_access(ulong want_access, bool match_any)
+bool Security_context::check_access(const privilege_t want_access,
+ bool match_any)
{
DBUG_ENTER("Security_context::check_access");
- DBUG_RETURN((match_any ? (master_access & want_access)
+ DBUG_RETURN((match_any ? (master_access & want_access) != NO_ACL
: ((master_access & want_access) == want_access)));
}
@@ -4682,7 +4660,13 @@ extern "C" void thd_progress_report(MYSQL_THD thd,
return;
if (thd->progress.max_counter != max_progress) // Simple optimization
{
- mysql_mutex_lock(&thd->LOCK_thd_data);
+ /*
+ Better to not wait in the unlikely event that LOCK_thd_data is locked
+ as Galera can potentially have this locked for a long time.
+ Progress counters will fix themselves after the next call.
+ */
+ if (mysql_mutex_trylock(&thd->LOCK_thd_data))
+ return;
thd->progress.counter= progress;
thd->progress.max_counter= max_progress;
mysql_mutex_unlock(&thd->LOCK_thd_data);
@@ -4783,6 +4767,32 @@ extern "C" void thd_create_random_password(MYSQL_THD thd,
}
+extern "C" const char *thd_priv_host(MYSQL_THD thd, size_t *length)
+{
+ const Security_context *sctx= thd->security_ctx;
+ if (!sctx)
+ {
+ *length= 0;
+ return NULL;
+ }
+ *length= strlen(sctx->priv_host);
+ return sctx->priv_host;
+}
+
+
+extern "C" const char *thd_priv_user(MYSQL_THD thd, size_t *length)
+{
+ const Security_context *sctx= thd->security_ctx;
+ if (!sctx)
+ {
+ *length= 0;
+ return NULL;
+ }
+ *length= strlen(sctx->priv_user);
+ return sctx->priv_user;
+}
+
+
#ifdef INNODB_COMPATIBILITY_HOOKS
/** open a table and add it to thd->open_tables
@@ -4802,7 +4812,8 @@ TABLE *open_purge_table(THD *thd, const char *db, size_t dblen,
DBUG_ASSERT(thd->open_tables == NULL);
DBUG_ASSERT(thd->locked_tables_mode < LTM_PRELOCKED);
- Open_table_context ot_ctx(thd, MYSQL_OPEN_IGNORE_FLUSH);
+ /* Purge already hold the MDL for the table */
+ Open_table_context ot_ctx(thd, MYSQL_OPEN_HAS_MDL_LOCK);
TABLE_LIST *tl= (TABLE_LIST*)thd->alloc(sizeof(TABLE_LIST));
LEX_CSTRING db_name= {db, dblen };
LEX_CSTRING table_name= { tb, tblen };
@@ -4821,6 +4832,12 @@ TABLE *open_purge_table(THD *thd, const char *db, size_t dblen,
DBUG_RETURN(error ? NULL : tl->table);
}
+TABLE *get_purge_table(THD *thd)
+{
+ /* see above, at most one table can be opened */
+ DBUG_ASSERT(thd->open_tables == NULL || thd->open_tables->next == NULL);
+ return thd->open_tables;
+}
/** Find an open table in the list of prelocked tabled
@@ -4864,6 +4881,121 @@ void destroy_thd(MYSQL_THD thd)
delete thd;
}
+/**
+ Create a THD that only has auxilliary functions
+ It will never be added to the global connection list
+ server_threads. It does not represent any client connection.
+
+ It should never be counted, because it will stall the
+ shutdown. It is solely for engine's internal use,
+ like for example, evaluation of virtual function in innodb
+ purge.
+*/
+extern "C" pthread_key(struct st_my_thread_var *, THR_KEY_mysys);
+MYSQL_THD create_background_thd()
+{
+ auto save_thd = current_thd;
+ set_current_thd(nullptr);
+
+ auto save_mysysvar= pthread_getspecific(THR_KEY_mysys);
+
+ /*
+ Allocate new mysys_var specifically new THD,
+ so that e.g safemalloc, DBUG etc are happy.
+ */
+ pthread_setspecific(THR_KEY_mysys, 0);
+ my_thread_init();
+ auto thd_mysysvar= pthread_getspecific(THR_KEY_mysys);
+ auto thd= new THD(0);
+ pthread_setspecific(THR_KEY_mysys, save_mysysvar);
+ thd->set_psi(nullptr);
+ set_current_thd(save_thd);
+
+ /*
+ Workaround the adverse effect of incrementing thread_count
+ in THD constructor. We do not want these THDs to be counted,
+ or waited for on shutdown.
+ */
+ THD_count::count--;
+
+ thd->mysys_var= (st_my_thread_var *) thd_mysysvar;
+ thd->set_command(COM_DAEMON);
+ thd->system_thread= SYSTEM_THREAD_GENERIC;
+ thd->security_ctx->host_or_ip= "";
+ thd->real_id= 0;
+ thd->thread_id= 0;
+ thd->query_id= 0;
+ return thd;
+}
+
+
+/*
+ Attach a background THD.
+
+ Changes current value THR_KEY_mysys TLS variable,
+ and returns the original value.
+*/
+void *thd_attach_thd(MYSQL_THD thd)
+{
+ DBUG_ASSERT(!current_thd);
+ DBUG_ASSERT(thd && thd->mysys_var);
+
+ auto save_mysysvar= pthread_getspecific(THR_KEY_mysys);
+ pthread_setspecific(THR_KEY_mysys, thd->mysys_var);
+ thd->thread_stack= (char *) &thd;
+ thd->store_globals();
+ return save_mysysvar;
+}
+
+/*
+ Restore THR_KEY_mysys TLS variable,
+ which was changed thd_attach_thd().
+*/
+void thd_detach_thd(void *mysysvar)
+{
+ /* Restore mysys_var that is changed when THD was attached.*/
+ pthread_setspecific(THR_KEY_mysys, mysysvar);
+ /* Restore the THD (we assume it was NULL during attach).*/
+ set_current_thd(0);
+}
+
+/*
+ Destroy a THD that was previously created by
+ create_background_thd()
+*/
+void destroy_background_thd(MYSQL_THD thd)
+{
+ DBUG_ASSERT(!current_thd);
+ auto thd_mysys_var= thd->mysys_var;
+ auto save_mysys_var= thd_attach_thd(thd);
+ DBUG_ASSERT(thd_mysys_var != save_mysys_var);
+ /*
+ Workaround the adverse effect decrementing thread_count on THD()
+ destructor.
+ As we decremented it in create_background_thd(), in order for it
+ not to go negative, we have to increment it before destructor.
+ */
+ THD_count::count++;
+ delete thd;
+
+ thd_detach_thd(save_mysys_var);
+ /*
+ Delete THD-specific my_thread_var, that was
+ allocated in create_background_thd().
+ Also preserve current PSI context, since my_thread_end()
+ would kill it, if we're not careful.
+ */
+#ifdef HAVE_PSI_THREAD_INTERFACE
+ auto save_psi_thread= PSI_CALL_get_thread();
+#endif
+ PSI_CALL_set_thread(0);
+ pthread_setspecific(THR_KEY_mysys, thd_mysys_var);
+ my_thread_end();
+ pthread_setspecific(THR_KEY_mysys, save_mysys_var);
+ PSI_CALL_set_thread(save_psi_thread);
+}
+
+
void reset_thd(MYSQL_THD thd)
{
close_thread_tables(thd);
@@ -4939,6 +5071,55 @@ extern "C" size_t thd_query_safe(MYSQL_THD thd, char *buf, size_t buflen)
}
+extern "C" const char *thd_user_name(MYSQL_THD thd)
+{
+ if (!thd->security_ctx)
+ return 0;
+
+ return thd->security_ctx->user;
+}
+
+
+extern "C" const char *thd_client_host(MYSQL_THD thd)
+{
+ if (!thd->security_ctx)
+ return 0;
+
+ return thd->security_ctx->host;
+}
+
+
+extern "C" const char *thd_client_ip(MYSQL_THD thd)
+{
+ if (!thd->security_ctx)
+ return 0;
+
+ return thd->security_ctx->ip;
+}
+
+
+extern "C" LEX_CSTRING *thd_current_db(MYSQL_THD thd)
+{
+ return &thd->db;
+}
+
+
+extern "C" int thd_current_status(MYSQL_THD thd)
+{
+ Diagnostics_area *da= thd->get_stmt_da();
+ if (!da)
+ return 0;
+
+ return da->is_error() ? da->sql_errno() : 0;
+}
+
+
+extern "C" enum enum_server_command thd_current_command(MYSQL_THD thd)
+{
+ return thd->get_command();
+}
+
+
extern "C" int thd_slave_thread(const MYSQL_THD thd)
{
return(thd->slave_thread);
@@ -5026,7 +5207,7 @@ thd_rpl_deadlock_check(MYSQL_THD thd, MYSQL_THD other_thd)
if (!thd)
return 0;
DEBUG_SYNC(thd, "thd_report_wait_for");
- thd->transaction.stmt.mark_trans_did_wait();
+ thd->transaction->stmt.mark_trans_did_wait();
if (!other_thd)
return 0;
binlog_report_wait_for(thd, other_thd);
@@ -5141,7 +5322,7 @@ thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd)
extern "C" int thd_non_transactional_update(const MYSQL_THD thd)
{
- return(thd->transaction.all.modified_non_trans_table);
+ return(thd->transaction->all.modified_non_trans_table);
}
extern "C" int thd_binlog_format(const MYSQL_THD thd)
@@ -5287,6 +5468,18 @@ extern "C" void thd_wait_end(MYSQL_THD thd)
#endif // INNODB_COMPATIBILITY_HOOKS */
+
+/**
+ MDL_context accessor
+ @param thd the current session
+ @return pointer to thd->mdl_context
+*/
+extern "C" void *thd_mdl_context(MYSQL_THD thd)
+{
+ return &thd->mdl_context;
+}
+
+
/****************************************************************************
Handling of statement states in functions and triggers.
@@ -5338,7 +5531,7 @@ void THD::reset_sub_statement_state(Sub_statement_state *backup,
backup->limit_found_rows= limit_found_rows;
backup->cuted_fields= cuted_fields;
backup->client_capabilities= client_capabilities;
- backup->savepoints= transaction.savepoints;
+ backup->savepoints= transaction->savepoints;
backup->first_successful_insert_id_in_prev_stmt=
first_successful_insert_id_in_prev_stmt;
backup->first_successful_insert_id_in_cur_stmt=
@@ -5360,7 +5553,7 @@ void THD::reset_sub_statement_state(Sub_statement_state *backup,
client_capabilities &= ~CLIENT_MULTI_RESULTS;
in_sub_stmt|= new_state;
cuted_fields= 0;
- transaction.savepoints= 0;
+ transaction->savepoints= 0;
first_successful_insert_id_in_cur_stmt= 0;
reset_slow_query_state();
}
@@ -5386,16 +5579,16 @@ void THD::restore_sub_statement_state(Sub_statement_state *backup)
level. It is enough to release first savepoint set on this level since
all later savepoints will be released automatically.
*/
- if (transaction.savepoints)
+ if (transaction->savepoints)
{
SAVEPOINT *sv;
- for (sv= transaction.savepoints; sv->prev; sv= sv->prev)
+ for (sv= transaction->savepoints; sv->prev; sv= sv->prev)
{}
/* ha_release_savepoint() never returns error. */
(void)ha_release_savepoint(this, sv);
}
count_cuted_fields= backup->count_cuted_fields;
- transaction.savepoints= backup->savepoints;
+ transaction->savepoints= backup->savepoints;
variables.option_bits= backup->option_bits;
in_sub_stmt= backup->in_sub_stmt;
enable_slow_log= backup->enable_slow_log;
@@ -5717,6 +5910,96 @@ void THD::mark_transaction_to_rollback(bool all)
/**
+ Commit the whole transaction (both statment and all)
+
+ This is used mainly to commit an independent transaction,
+ like reading system tables.
+
+ @return 0 0k
+ @return <>0 error code. my_error() has been called()
+*/
+
+int THD::commit_whole_transaction_and_close_tables()
+{
+ int error, error2;
+ DBUG_ENTER("THD::commit_whole_transaction_and_close_tables");
+
+ /*
+ This can only happened if we failed to open any table in the
+ new transaction
+ */
+ DBUG_ASSERT(open_tables);
+
+ if (!open_tables) // Safety for production usage
+ DBUG_RETURN(0);
+
+ /*
+ Ensure table was locked (opened with open_and_lock_tables()). If not
+ the THD can't be part of any transactions and doesn't have to call
+ this function.
+ */
+ DBUG_ASSERT(lock);
+
+ error= ha_commit_trans(this, FALSE);
+ /* This will call external_lock to unlock all tables */
+ if ((error2= mysql_unlock_tables(this, lock)))
+ {
+ my_error(ER_ERROR_DURING_COMMIT, MYF(0), error2);
+ error= error2;
+ }
+ lock= 0;
+ if ((error2= ha_commit_trans(this, TRUE)))
+ error= error2;
+ close_thread_tables(this);
+ DBUG_RETURN(error);
+}
+
+/**
+ Start a new independent transaction
+*/
+
+start_new_trans::start_new_trans(THD *thd)
+{
+ org_thd= thd;
+ mdl_savepoint= thd->mdl_context.mdl_savepoint();
+ memcpy(old_ha_data, thd->ha_data, sizeof(old_ha_data));
+ thd->reset_n_backup_open_tables_state(&open_tables_state_backup);
+ for (auto &data : thd->ha_data)
+ data.reset();
+ old_transaction= thd->transaction;
+ thd->transaction= &new_transaction;
+ new_transaction.on= 1;
+ in_sub_stmt= thd->in_sub_stmt;
+ thd->in_sub_stmt= 0;
+ server_status= thd->server_status;
+ m_transaction_psi= thd->m_transaction_psi;
+ thd->m_transaction_psi= 0;
+ wsrep_on= thd->variables.wsrep_on;
+ thd->variables.wsrep_on= 0;
+ thd->server_status&= ~(SERVER_STATUS_IN_TRANS |
+ SERVER_STATUS_IN_TRANS_READONLY);
+ thd->server_status|= SERVER_STATUS_AUTOCOMMIT;
+}
+
+
+void start_new_trans::restore_old_transaction()
+{
+ org_thd->transaction= old_transaction;
+ org_thd->restore_backup_open_tables_state(&open_tables_state_backup);
+ ha_close_connection(org_thd);
+ memcpy(org_thd->ha_data, old_ha_data, sizeof(old_ha_data));
+ org_thd->mdl_context.rollback_to_savepoint(mdl_savepoint);
+ org_thd->in_sub_stmt= in_sub_stmt;
+ org_thd->server_status= server_status;
+ if (org_thd->m_transaction_psi)
+ MYSQL_COMMIT_TRANSACTION(org_thd->m_transaction_psi);
+ org_thd->m_transaction_psi= m_transaction_psi;
+ org_thd->variables.wsrep_on= wsrep_on;
+ org_thd= 0;
+}
+
+
+/**
Decide on logging format to use for the statement and issue errors
or warnings as needed. The decision depends on the following
parameters:
@@ -5815,8 +6098,9 @@ int THD::decide_logging_format(TABLE_LIST *tables)
{
DBUG_ENTER("THD::decide_logging_format");
DBUG_PRINT("info", ("Query: %.*s", (uint) query_length(), query()));
- DBUG_PRINT("info", ("variables.binlog_format: %lu",
- variables.binlog_format));
+ DBUG_PRINT("info", ("binlog_format: %lu", (ulong) variables.binlog_format));
+ DBUG_PRINT("info", ("current_stmt_binlog_format: %lu",
+ (ulong) current_stmt_binlog_format));
DBUG_PRINT("info", ("lex->get_stmt_unsafe_flags(): 0x%x",
lex->get_stmt_unsafe_flags()));
@@ -5841,18 +6125,11 @@ int THD::decide_logging_format(TABLE_LIST *tables)
DBUG_RETURN(-1);
}
}
-
- if ((WSREP_EMULATE_BINLOG_NNULL(this) ||
- (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG))) &&
- !(wsrep_binlog_format() == BINLOG_FORMAT_STMT &&
- !binlog_filter->db_ok(db.str)))
-#else
- if (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG) &&
- !(wsrep_binlog_format() == BINLOG_FORMAT_STMT &&
- !binlog_filter->db_ok(db.str)))
#endif /* WITH_WSREP */
- {
+ if (WSREP_EMULATE_BINLOG_NNULL(this) ||
+ binlog_table_should_be_logged(&db))
+ {
if (is_bulk_op())
{
if (wsrep_binlog_format() == BINLOG_FORMAT_STMT)
@@ -5895,6 +6172,7 @@ int THD::decide_logging_format(TABLE_LIST *tables)
bool has_auto_increment_write_tables_not_first= FALSE;
bool found_first_not_own_table= FALSE;
bool has_write_tables_with_unsafe_statements= FALSE;
+ bool blackhole_table_found= 0;
/*
A pointer to a previous table that was changed.
@@ -6020,6 +6298,10 @@ int THD::decide_logging_format(TABLE_LIST *tables)
if (prev_write_table && prev_write_table->file->ht !=
table->file->ht)
multi_write_engine= TRUE;
+
+ if (table->file->ht->db_type == DB_TYPE_BLACKHOLE_DB)
+ blackhole_table_found= 1;
+
if (share->non_determinstic_insert &&
!(sql_command_flags[lex->sql_command] & CF_SCHEMA_CHANGE))
has_write_tables_with_unsafe_statements= true;
@@ -6265,7 +6547,8 @@ int THD::decide_logging_format(TABLE_LIST *tables)
is_current_stmt_binlog_format_row() ?
"ROW" : "STATEMENT"));
- if (variables.binlog_format == BINLOG_FORMAT_ROW &&
+ if (blackhole_table_found &&
+ variables.binlog_format == BINLOG_FORMAT_ROW &&
(sql_command_flags[lex->sql_command] &
(CF_UPDATES_DATA | CF_DELETES_DATA)))
{
@@ -6281,8 +6564,8 @@ int THD::decide_logging_format(TABLE_LIST *tables)
if (table->table->file->ht->db_type == DB_TYPE_BLACKHOLE_DB &&
table->lock_type >= TL_WRITE_ALLOW_WRITE)
{
- table_names.append(&table->table_name);
- table_names.append(",");
+ table_names.append(&table->table_name);
+ table_names.append(",");
}
}
if (!table_names.is_empty())
@@ -6302,9 +6585,12 @@ int THD::decide_logging_format(TABLE_LIST *tables)
table_names.c_ptr());
}
}
+
+ if (is_write && is_current_stmt_binlog_format_row())
+ binlog_prepare_for_row_logging();
}
-#ifndef DBUG_OFF
else
+ {
DBUG_PRINT("info", ("decision: no logging since "
"mysql_bin_log.is_open() = %d "
"and (options & OPTION_BIN_LOG) = 0x%llx "
@@ -6314,22 +6600,23 @@ int THD::decide_logging_format(TABLE_LIST *tables)
(variables.option_bits & OPTION_BIN_LOG),
(uint) wsrep_binlog_format(),
binlog_filter->db_ok(db.str)));
-#endif
-
+ if (WSREP_NNULL(this) && is_current_stmt_binlog_format_row())
+ binlog_prepare_for_row_logging();
+ }
DBUG_RETURN(0);
}
int THD::decide_logging_format_low(TABLE *table)
{
+ DBUG_ENTER("decide_logging_format_low");
/*
- INSERT...ON DUPLICATE KEY UPDATE on a table with more than one unique keys
- can be unsafe.
- */
- if(wsrep_binlog_format() <= BINLOG_FORMAT_STMT &&
- !is_current_stmt_binlog_format_row() &&
- !lex->is_stmt_unsafe() &&
- lex->sql_command == SQLCOM_INSERT &&
- lex->duplicates == DUP_UPDATE)
+ INSERT...ON DUPLICATE KEY UPDATE on a table with more than one unique keys
+ can be unsafe.
+ */
+ if (wsrep_binlog_format() <= BINLOG_FORMAT_STMT &&
+ !is_current_stmt_binlog_format_row() &&
+ !lex->is_stmt_unsafe() &&
+ lex->duplicates == DUP_UPDATE)
{
uint unique_keys= 0;
uint keys= table->s->keys, i= 0;
@@ -6356,19 +6643,29 @@ exit:;
lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_TWO_KEYS);
binlog_unsafe_warning_flags|= lex->get_stmt_unsafe_flags();
set_current_stmt_binlog_format_row_if_mixed();
- return 1;
+ if (is_current_stmt_binlog_format_row())
+ binlog_prepare_for_row_logging();
+ DBUG_RETURN(1);
}
}
- return 0;
+ DBUG_RETURN(0);
}
-/*
- Implementation of interface to write rows to the binary log through the
- thread. The thread is responsible for writing the rows it has
- inserted/updated/deleted.
+#ifndef MYSQL_CLIENT
+/**
+ Check if we should log a table DDL to the binlog
+
+ @retval true yes
+ @retval false no
*/
-#ifndef MYSQL_CLIENT
+bool THD::binlog_table_should_be_logged(const LEX_CSTRING *db)
+{
+ return (mysql_bin_log.is_open() &&
+ (variables.option_bits & OPTION_BIN_LOG) &&
+ (wsrep_binlog_format() != BINLOG_FORMAT_STMT ||
+ binlog_filter->db_ok(db->str)));
+}
/*
Template member function for ensuring that there is an rows log
@@ -6376,7 +6673,7 @@ exit:;
PRE CONDITION:
- Events of type 'RowEventT' have the type code 'type_code'.
-
+
POST CONDITION:
If a non-NULL pointer is returned, the pending event for thread 'thd' will
be an event of type 'RowEventT' (which have the type code 'type_code')
@@ -6569,7 +6866,8 @@ CPP_UNNAMED_NS_START
}
else
{
- m_memory= (uchar *) my_malloc(total_length, MYF(MY_WME));
+ m_memory= (uchar *) my_malloc(key_memory_Row_data_memory_memory,
+ total_length, MYF(MY_WME));
m_release_memory_on_destruction= TRUE;
}
}
@@ -6788,12 +7086,13 @@ void THD::binlog_prepare_row_images(TABLE *table)
/**
if there is a primary key in the table (ie, user declared PK or a
- non-null unique index) and we dont want to ship the entire image,
+ non-null unique index) and we don't want to ship the entire image,
and the handler involved supports this.
*/
if (table->s->primary_key < MAX_KEY &&
(thd->variables.binlog_row_image < BINLOG_ROW_IMAGE_FULL) &&
- !ha_check_storage_engine_flag(table->s->db_type(), HTON_NO_BINLOG_ROW_OPT))
+ !ha_check_storage_engine_flag(table->s->db_type(),
+ HTON_NO_BINLOG_ROW_OPT))
{
/**
Just to be sure that tmp_set is currently not in use as
@@ -6838,7 +7137,7 @@ void THD::binlog_prepare_row_images(TABLE *table)
-int THD::binlog_remove_pending_rows_event(bool clear_maps,
+int THD::binlog_remove_pending_rows_event(bool reset_stmt,
bool is_transactional)
{
DBUG_ENTER("THD::binlog_remove_pending_rows_event");
@@ -6852,12 +7151,12 @@ int THD::binlog_remove_pending_rows_event(bool clear_maps,
mysql_bin_log.remove_pending_rows_event(this, is_transactional);
- if (clear_maps)
- binlog_table_maps= 0;
-
+ if (reset_stmt)
+ reset_binlog_for_next_statement();
DBUG_RETURN(0);
}
+
int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional)
{
DBUG_ENTER("THD::binlog_flush_pending_rows_event");
@@ -6883,9 +7182,8 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional)
if (stmt_end)
{
pending->set_flags(Rows_log_event::STMT_END_F);
- binlog_table_maps= 0;
+ reset_binlog_for_next_statement();
}
-
error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0,
is_transactional);
}
@@ -7245,20 +7543,23 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg,
log event is written to the binary log, we pretend that no
table maps were written.
*/
- if(binlog_should_compress(query_len))
+ if (binlog_should_compress(query_len))
{
- Query_compressed_log_event qinfo(this, query_arg, query_len, is_trans, direct,
- suppress_use, errcode);
+ Query_compressed_log_event qinfo(this, query_arg, query_len, is_trans,
+ direct, suppress_use, errcode);
error= mysql_bin_log.write(&qinfo);
}
else
{
Query_log_event qinfo(this, query_arg, query_len, is_trans, direct,
- suppress_use, errcode);
+ suppress_use, errcode);
error= mysql_bin_log.write(&qinfo);
}
+ /*
+ row logged binlog may not have been reset in the case of locked tables
+ */
+ reset_binlog_for_next_statement();
- binlog_table_maps= 0;
DBUG_RETURN(error >= 0 ? error : 1);
}
@@ -7269,6 +7570,38 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg,
DBUG_RETURN(0);
}
+
+/**
+ Binlog current query as a statement, ignoring the binlog filter setting.
+
+ The filter is in decide_logging_format() to mark queries to not be stored
+ in the binary log, for example by a shared distributed engine like S3.
+ This function resets the filter to ensure the the query is logged if
+ the binlog is active.
+
+ Note that 'direct' is set to false, which means that the query will
+ not be directly written to the binary log but instead to the cache.
+
+ @retval false ok
+ @retval true error
+*/
+
+
+bool THD::binlog_current_query_unfiltered()
+{
+ if (!mysql_bin_log.is_open())
+ return 0;
+
+ reset_binlog_local_stmt_filter();
+ clear_binlog_local_stmt_filter();
+ return binlog_query(THD::STMT_QUERY_TYPE, query(), query_length(),
+ /* is_trans */ FALSE,
+ /* direct */ FALSE,
+ /* suppress_use */ FALSE,
+ /* Error */ 0) > 0;
+}
+
+
void
THD::wait_for_wakeup_ready()
{
@@ -7294,11 +7627,10 @@ void THD::set_last_commit_gtid(rpl_gtid &gtid)
#endif
m_last_commit_gtid= gtid;
#ifndef EMBEDDED_LIBRARY
- if (changed_gtid && session_tracker.sysvars.is_enabled())
+ if (changed_gtid)
{
DBUG_ASSERT(current_thd == this);
- session_tracker.sysvars.
- mark_as_changed(this, (LEX_CSTRING*)Sys_last_gtid_ptr);
+ session_tracker.sysvars.mark_as_changed(this, Sys_last_gtid_ptr);
}
#endif
}
@@ -7369,6 +7701,7 @@ wait_for_commit::~wait_for_commit()
mysql_cond_destroy(&COND_wait_commit);
}
+
void
wait_for_commit::wakeup(int wakeup_error)
{
@@ -7776,3 +8109,8 @@ bool THD::timestamp_to_TIME(MYSQL_TIME *ltime, my_time_t ts,
}
return 0;
}
+
+THD_list_iterator *THD_list_iterator::iterator()
+{
+ return &server_threads;
+}