summaryrefslogtreecommitdiff
path: root/sql/sql_base.cc
diff options
context:
space:
mode:
authorMichael Widenius <monty@askmonty.org>2013-10-14 00:24:05 +0300
committerMichael Widenius <monty@askmonty.org>2013-10-14 00:24:05 +0300
commit2e100cc5a493b6a0f6f907e0483a734c7fee2087 (patch)
tree265f66e6bbd503d0ffe36a1a664bb39291e42755 /sql/sql_base.cc
parent3784432256a30e4d453dde10c875d8446519e7c1 (diff)
downloadmariadb-git-2e100cc5a493b6a0f6f907e0483a734c7fee2087.tar.gz
Fixes for parallel slave:
- Made slaves temporary table multi-thread slave safe by adding mutex around save_temporary_table usage. - rli->save_temporary_tables is the active list of all used temporary tables - This is copied to THD->temporary_tables when temporary tables are opened and updated when temporary tables are closed - Added THD->lock_temporary_tables() and THD->unlock_temporary_tables() to simplify this. - Relay_log_info->sql_thd renamed to Relay_log_info->sql_driver_thd to avoid wrong usage for merged code. - Added is_part_of_group() to mark functions that are part of the next function. This replaces setting IN_STMT when events are executed. - Added is_begin(), is_commit() and is_rollback() functions to Query_log_event to simplify code. - If slave_skip_counter is set run things in single threaded mode. This simplifies code for skipping events. - Updating state of relay log (IN_STMT and IN_TRANSACTION) is moved to one single function: update_state_of_relay_log() We can't use OPTION_BEGIN to check for the state anymore as the sql_driver and sql execution threads may be different. Clear IN_STMT and IN_TRANSACTION in init_relay_log_pos() and Relay_log_info::cleanup_context() to ensure the flags doesn't survive slave restarts is_in_group() is now independent of state of executed transaction. - Reset thd->transaction.all.modified_non_trans_table() if we did set it for single table row events. This was mainly for keeping the flag as documented. - Changed slave_open_temp_tables to uint32 to be able to use atomic operators on it. - Relay_log_info::sleep_lock -> rpl_group_info::sleep_lock - Relay_log_info::sleep_cond -> rpl_group_info::sleep_cond - Changed some functions to take rpl_group_info instead of Relay_log_info to make them multi-slave safe and to simplify usage - do_shall_skip() - continue_group() - sql_slave_killed() - next_event() - Simplifed arguments to io_salve_killed(), check_io_slave_killed() and sql_slave_killed(); No reason to supply THD as this is part of the given structure. - set_thd_in_use_temporary_tables() removed as in_use is set on usage - Added information to thd_proc_info() which thread is waiting for slave mutex to exit. - In open_table() reuse code from find_temporary_table() Other things: - More DBUG statements - Fixed the rpl_incident.test can be run with --debug - More comments - Disabled not used function rpl_connect_master() mysql-test/suite/perfschema/r/all_instances.result: Moved sleep_lock and sleep_cond to rpl_group_info mysql-test/suite/rpl/r/rpl_incident.result: Updated result mysql-test/suite/rpl/t/rpl_incident-master.opt: Not needed anymore mysql-test/suite/rpl/t/rpl_incident.test: Fixed that test can be run with --debug sql/handler.cc: More DBUG_PRINT sql/log.cc: More comments sql/log_event.cc: Added DBUG statements do_shall_skip(), continue_group() now takes rpl_group_info param Use is_begin(), is_commit() and is_rollback() functions instead of inspecting query string We don't have set slaves temporary tables 'in_use' as this is now done when tables are opened. Removed IN_STMT flag setting. This is now done in update_state_of_relay_log() Use IN_TRANSACTION flag to test state of relay log. In rows_event_stmt_cleanup() reset thd->transaction.all.modified_non_trans_table if we had set this before. sql/log_event.h: do_shall_skip(), continue_group() now takes rpl_group_info param Added is_part_of_group() to mark events that are part of the next event. This replaces setting IN_STMT when events are executed. Added is_begin(), is_commit() and is_rollback() functions to Query_log_event to simplify code. sql/log_event_old.cc: Removed IN_STMT flag setting. This is now done in update_state_of_relay_log() do_shall_skip(), continue_group() now takes rpl_group_info param sql/log_event_old.h: Added is_part_of_group() to mark events that are part of the next event. do_shall_skip(), continue_group() now takes rpl_group_info param sql/mysqld.cc: Changed slave_open_temp_tables to uint32 to be able to use atomic operators on it. Relay_log_info::sleep_lock -> Rpl_group_info::sleep_lock Relay_log_info::sleep_cond -> Rpl_group_info::sleep_cond sql/mysqld.h: Updated types and names sql/rpl_gtid.cc: More DBUG sql/rpl_parallel.cc: Updated TODO section Set thd for event that is execution Use new is_begin(), is_commit() and is_rollback() functions. More comments sql/rpl_rli.cc: sql_thd -> sql_driver_thd Relay_log_info::sleep_lock -> rpl_group_info::sleep_lock Relay_log_info::sleep_cond -> rpl_group_info::sleep_cond Clear IN_STMT and IN_TRANSACTION in init_relay_log_pos() and Relay_log_info::cleanup_context() to ensure the flags doesn't survive slave restarts. Reset table->in_use for temporary tables as the table may have been used by another THD. Use IN_TRANSACTION instead of OPTION_BEGIN to check state of relay log. Removed IN_STMT flag setting. This is now done in update_state_of_relay_log() sql/rpl_rli.h: Changed relay log state flags to bit masks instead of bit positions (most other code we have uses bit masks) Added IN_TRANSACTION to mark if we are in a BEGIN ... COMMIT section. save_temporary_tables is now thread safe Relay_log_info::sleep_lock -> rpl_group_info::sleep_lock Relay_log_info::sleep_cond -> rpl_group_info::sleep_cond Relay_log_info->sql_thd renamed to Relay_log_info->sql_driver_thd to avoid wrong usage for merged code is_in_group() is now independent of state of executed transaction. sql/slave.cc: Simplifed arguments to io_salve_killed(), sql_slave_killed() and check_io_slave_killed(); No reason to supply THD as this is part of the given structure. set_thd_in_use_temporary_tables() removed as in_use is set on usage in sql_base.cc sql_thd -> sql_driver_thd More DBUG Added update_state_of_relay_log() which will calculate the IN_STMT and IN_TRANSACTION state of the relay log after the current element is executed. If slave_skip_counter is set run things in single threaded mode. Simplifed arguments to io_salve_killed(), check_io_slave_killed() and sql_slave_killed(); No reason to supply THD as this is part of the given structure. Added information to thd_proc_info() which thread is waiting for slave mutex to exit. Disabled not used function rpl_connect_master() Updated argument to next_event() sql/sql_base.cc: Added mutex around usage of slave's temporary tables. The active list is always kept up to date in sql->rgi_slave->save_temporary_tables. Clear thd->temporary_tables after query (safety) More DBUG When using temporary table, set table->in_use to current thd as the THD may be different for slave threads. Some code is ifdef:ed with REMOVE_AFTER_MERGE_WITH_10 as the given code in 10.0 is not yet in this tree. In open_table() reuse code from find_temporary_table() sql/sql_binlog.cc: rli->sql_thd -> rli->sql_driver_thd Remove duplicate setting of rgi->rli sql/sql_class.cc: Added helper functions rgi_lock_temporary_tables() and rgi_unlock_temporary_tables() Would have been nicer to have these inline, but there was no easy way to do that sql/sql_class.h: Added functions to protect slaves temporary tables sql/sql_parse.cc: Added DBUG_PRINT sql/transaction.cc: Added comment
Diffstat (limited to 'sql/sql_base.cc')
-rw-r--r--sql/sql_base.cc118
1 files changed, 85 insertions, 33 deletions
diff --git a/sql/sql_base.cc b/sql/sql_base.cc
index 109a4ef41e9..80c0b98fd73 100644
--- a/sql/sql_base.cc
+++ b/sql/sql_base.cc
@@ -57,6 +57,7 @@
#include "sql_table.h" // build_table_filename
#include "datadict.h" // dd_frm_is_view()
#include "sql_hset.h" // Hash_set
+#include "rpl_rli.h" // rpl_group_info
#ifdef __WIN__
#include <io.h>
#endif
@@ -1230,11 +1231,24 @@ bool close_cached_connection_tables(THD *thd, LEX_STRING *connection)
static void mark_temp_tables_as_free_for_reuse(THD *thd)
{
+ DBUG_ENTER("mark_temp_tables_as_free_for_reuse");
+
+ thd->lock_temporary_tables();
for (TABLE *table= thd->temporary_tables ; table ; table= table->next)
{
if ((table->query_id == thd->query_id) && ! table->open_by_handler)
mark_tmp_table_for_reuse(table);
}
+ thd->unlock_temporary_tables();
+ if (thd->rgi_slave)
+ {
+ /*
+ Temporary tables are shared with other by sql execution threads.
+ As a safety messure, clear the pointer to the common area.
+ */
+ thd->temporary_tables= 0;
+ }
+ DBUG_VOID_RETURN;
}
@@ -1248,6 +1262,7 @@ static void mark_temp_tables_as_free_for_reuse(THD *thd)
void mark_tmp_table_for_reuse(TABLE *table)
{
+ DBUG_ENTER("mark_tmp_table_for_reuse");
DBUG_ASSERT(table->s->tmp_table);
table->query_id= 0;
@@ -1278,6 +1293,7 @@ void mark_tmp_table_for_reuse(TABLE *table)
LOCK TABLES is allowed (but ignored) for a temporary table.
*/
table->reginfo.lock_type= TL_WRITE;
+ DBUG_VOID_RETURN;
}
@@ -1628,6 +1644,10 @@ static inline uint tmpkeyval(THD *thd, TABLE *table)
/*
Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread
creates one DROP TEMPORARY TABLE binlog event for each pseudo-thread
+
+ Temporary tables created in a sql slave is closed by
+ Relay_log_info::close_temporary_tables()
+
*/
bool close_temporary_tables(THD *thd)
@@ -1642,6 +1662,7 @@ bool close_temporary_tables(THD *thd)
if (!thd->temporary_tables)
DBUG_RETURN(FALSE);
+ DBUG_ASSERT(!thd->rgi_slave);
if (!mysql_bin_log.is_open())
{
@@ -2096,16 +2117,42 @@ TABLE *find_temporary_table(THD *thd,
const char *table_key,
uint table_key_length)
{
+ TABLE *result= 0;
+ if (!thd->have_temporary_tables())
+ return NULL;
+
+ thd->lock_temporary_tables();
for (TABLE *table= thd->temporary_tables; table; table= table->next)
{
if (table->s->table_cache_key.length == table_key_length &&
!memcmp(table->s->table_cache_key.str, table_key, table_key_length))
{
- return table;
+ /*
+ We need to set the THD as it may be different in case of
+ parallel replication
+ */
+ if (table->in_use != thd)
+ {
+ table->in_use= thd;
+#ifdef REMOVE_AFTER_MERGE_WITH_10
+ if (thd->rgi_slave)
+ {
+ /*
+ We may be stealing an opened temporary tables from one slave
+ thread to another, we need to let the performance schema know that,
+ for aggregates per thread to work properly.
+ */
+ table->file->unbind_psi();
+ table->file->rebind_psi();
+ }
+#endif
+ }
+ result= table;
+ break;
}
}
-
- return NULL;
+ thd->unlock_temporary_tables();
+ return result;
}
@@ -2153,6 +2200,9 @@ int drop_temporary_table(THD *thd, TABLE_LIST *table_list, bool *is_trans)
/* Table might be in use by some outer statement. */
if (table->query_id && table->query_id != thd->query_id)
{
+ DBUG_PRINT("info", ("table->query_id: %lu thd->query_id: %lu",
+ (ulong) table->query_id, (ulong) thd->query_id));
+
my_error(ER_CANT_REOPEN_TABLE, MYF(0), table->alias.c_ptr());
DBUG_RETURN(-1);
}
@@ -2181,6 +2231,7 @@ void close_temporary_table(THD *thd, TABLE *table,
table->s->db.str, table->s->table_name.str,
(long) table, table->alias.c_ptr()));
+ thd->lock_temporary_tables();
if (table->prev)
{
table->prev->next= table->next;
@@ -2200,12 +2251,14 @@ void close_temporary_table(THD *thd, TABLE *table,
if (thd->temporary_tables)
table->next->prev= 0;
}
- if (thd->slave_thread)
+ if (thd->rgi_slave)
{
/* natural invariant of temporary_tables */
DBUG_ASSERT(slave_open_temp_tables || !thd->temporary_tables);
- slave_open_temp_tables--;
+ thread_safe_decrement32(&slave_open_temp_tables, &thread_running_lock);
+ table->in_use= 0; // No statistics
}
+ thd->unlock_temporary_tables();
close_temporary(table, free_share, delete_table);
DBUG_VOID_RETURN;
}
@@ -2651,35 +2704,30 @@ bool open_table(THD *thd, TABLE_LIST *table_list, MEM_ROOT *mem_root,
TODO: move this block into a separate function.
*/
if (table_list->open_type != OT_BASE_ONLY &&
- ! (flags & MYSQL_OPEN_SKIP_TEMPORARY))
+ ! (flags & MYSQL_OPEN_SKIP_TEMPORARY) && thd->have_temporary_tables())
{
- for (table= thd->temporary_tables; table ; table=table->next)
- {
- if (table->s->table_cache_key.length == key_length +
- TMP_TABLE_KEY_EXTRA &&
- !memcmp(table->s->table_cache_key.str, key,
- key_length + TMP_TABLE_KEY_EXTRA))
+ if ((table= find_temporary_table(thd, key,
+ key_length + TMP_TABLE_KEY_EXTRA)))
+ {
+ /*
+ Check if we're trying to use the same temporary table twice in a query.
+ Right now we don't support this because a temporary table
+ is always represented by only one TABLE object in THD, and
+ it can not be cloned. Emit an error for an unsupported behaviour.
+ */
+ if (table->query_id)
{
- /*
- We're trying to use the same temporary table twice in a query.
- Right now we don't support this because a temporary table
- is always represented by only one TABLE object in THD, and
- it can not be cloned. Emit an error for an unsupported behaviour.
- */
- if (table->query_id)
- {
- DBUG_PRINT("error",
- ("query_id: %lu server_id: %u pseudo_thread_id: %lu",
- (ulong) table->query_id, (uint) thd->variables.server_id,
- (ulong) thd->variables.pseudo_thread_id));
- my_error(ER_CANT_REOPEN_TABLE, MYF(0), table->alias.c_ptr());
- DBUG_RETURN(TRUE);
- }
- table->query_id= thd->query_id;
- thd->thread_specific_used= TRUE;
- DBUG_PRINT("info",("Using temporary table"));
- goto reset;
+ DBUG_PRINT("error",
+ ("query_id: %lu server_id: %u pseudo_thread_id: %lu",
+ (ulong) table->query_id, (uint) thd->variables.server_id,
+ (ulong) thd->variables.pseudo_thread_id));
+ my_error(ER_CANT_REOPEN_TABLE, MYF(0), table->alias.c_ptr());
+ DBUG_RETURN(TRUE);
}
+ table->query_id= thd->query_id;
+ thd->thread_specific_used= TRUE;
+ DBUG_PRINT("info",("Using temporary table"));
+ goto reset;
}
}
@@ -5987,14 +6035,18 @@ TABLE *open_table_uncached(THD *thd, handlerton *hton,
if (add_to_temporary_tables_list)
{
+ thd->lock_temporary_tables();
/* growing temp list at the head */
tmp_table->next= thd->temporary_tables;
if (tmp_table->next)
tmp_table->next->prev= tmp_table;
thd->temporary_tables= tmp_table;
thd->temporary_tables->prev= 0;
- if (thd->slave_thread)
- slave_open_temp_tables++;
+ if (thd->rgi_slave)
+ {
+ thread_safe_increment32(&slave_open_temp_tables, &thread_running_lock);
+ }
+ thd->unlock_temporary_tables();
}
tmp_table->pos_in_table_list= 0;
DBUG_PRINT("tmptable", ("opened table: '%s'.'%s' 0x%lx", tmp_table->s->db.str,