diff options
author | Michael Widenius <monty@askmonty.org> | 2013-10-14 00:24:05 +0300 |
---|---|---|
committer | Michael Widenius <monty@askmonty.org> | 2013-10-14 00:24:05 +0300 |
commit | 2e100cc5a493b6a0f6f907e0483a734c7fee2087 (patch) | |
tree | 265f66e6bbd503d0ffe36a1a664bb39291e42755 /sql/sql_base.cc | |
parent | 3784432256a30e4d453dde10c875d8446519e7c1 (diff) | |
download | mariadb-git-2e100cc5a493b6a0f6f907e0483a734c7fee2087.tar.gz |
Fixes for parallel slave:
- Made slaves temporary table multi-thread slave safe by adding mutex around save_temporary_table usage.
- rli->save_temporary_tables is the active list of all used temporary tables
- This is copied to THD->temporary_tables when temporary tables are opened and updated when temporary tables are closed
- Added THD->lock_temporary_tables() and THD->unlock_temporary_tables() to simplify this.
- Relay_log_info->sql_thd renamed to Relay_log_info->sql_driver_thd to avoid wrong usage for merged code.
- Added is_part_of_group() to mark functions that are part of the next function. This replaces setting IN_STMT when events are executed.
- Added is_begin(), is_commit() and is_rollback() functions to Query_log_event to simplify code.
- If slave_skip_counter is set run things in single threaded mode. This simplifies code for skipping events.
- Updating state of relay log (IN_STMT and IN_TRANSACTION) is moved to one single function: update_state_of_relay_log()
We can't use OPTION_BEGIN to check for the state anymore as the sql_driver and sql execution threads may be different.
Clear IN_STMT and IN_TRANSACTION in init_relay_log_pos() and Relay_log_info::cleanup_context() to ensure the flags doesn't survive slave restarts
is_in_group() is now independent of state of executed transaction.
- Reset thd->transaction.all.modified_non_trans_table() if we did set it for single table row events.
This was mainly for keeping the flag as documented.
- Changed slave_open_temp_tables to uint32 to be able to use atomic operators on it.
- Relay_log_info::sleep_lock -> rpl_group_info::sleep_lock
- Relay_log_info::sleep_cond -> rpl_group_info::sleep_cond
- Changed some functions to take rpl_group_info instead of Relay_log_info to make them multi-slave safe and to simplify usage
- do_shall_skip()
- continue_group()
- sql_slave_killed()
- next_event()
- Simplifed arguments to io_salve_killed(), check_io_slave_killed() and sql_slave_killed(); No reason to supply THD as this is part of the given structure.
- set_thd_in_use_temporary_tables() removed as in_use is set on usage
- Added information to thd_proc_info() which thread is waiting for slave mutex to exit.
- In open_table() reuse code from find_temporary_table()
Other things:
- More DBUG statements
- Fixed the rpl_incident.test can be run with --debug
- More comments
- Disabled not used function rpl_connect_master()
mysql-test/suite/perfschema/r/all_instances.result:
Moved sleep_lock and sleep_cond to rpl_group_info
mysql-test/suite/rpl/r/rpl_incident.result:
Updated result
mysql-test/suite/rpl/t/rpl_incident-master.opt:
Not needed anymore
mysql-test/suite/rpl/t/rpl_incident.test:
Fixed that test can be run with --debug
sql/handler.cc:
More DBUG_PRINT
sql/log.cc:
More comments
sql/log_event.cc:
Added DBUG statements
do_shall_skip(), continue_group() now takes rpl_group_info param
Use is_begin(), is_commit() and is_rollback() functions instead of inspecting query string
We don't have set slaves temporary tables 'in_use' as this is now done when tables are opened.
Removed IN_STMT flag setting. This is now done in update_state_of_relay_log()
Use IN_TRANSACTION flag to test state of relay log.
In rows_event_stmt_cleanup() reset thd->transaction.all.modified_non_trans_table if we had set this before.
sql/log_event.h:
do_shall_skip(), continue_group() now takes rpl_group_info param
Added is_part_of_group() to mark events that are part of the next event. This replaces setting IN_STMT when events are executed.
Added is_begin(), is_commit() and is_rollback() functions to Query_log_event to simplify code.
sql/log_event_old.cc:
Removed IN_STMT flag setting. This is now done in update_state_of_relay_log()
do_shall_skip(), continue_group() now takes rpl_group_info param
sql/log_event_old.h:
Added is_part_of_group() to mark events that are part of the next event.
do_shall_skip(), continue_group() now takes rpl_group_info param
sql/mysqld.cc:
Changed slave_open_temp_tables to uint32 to be able to use atomic operators on it.
Relay_log_info::sleep_lock -> Rpl_group_info::sleep_lock
Relay_log_info::sleep_cond -> Rpl_group_info::sleep_cond
sql/mysqld.h:
Updated types and names
sql/rpl_gtid.cc:
More DBUG
sql/rpl_parallel.cc:
Updated TODO section
Set thd for event that is execution
Use new is_begin(), is_commit() and is_rollback() functions.
More comments
sql/rpl_rli.cc:
sql_thd -> sql_driver_thd
Relay_log_info::sleep_lock -> rpl_group_info::sleep_lock
Relay_log_info::sleep_cond -> rpl_group_info::sleep_cond
Clear IN_STMT and IN_TRANSACTION in init_relay_log_pos() and Relay_log_info::cleanup_context() to ensure the flags doesn't survive slave restarts.
Reset table->in_use for temporary tables as the table may have been used by another THD.
Use IN_TRANSACTION instead of OPTION_BEGIN to check state of relay log.
Removed IN_STMT flag setting. This is now done in update_state_of_relay_log()
sql/rpl_rli.h:
Changed relay log state flags to bit masks instead of bit positions (most other code we have uses bit masks)
Added IN_TRANSACTION to mark if we are in a BEGIN ... COMMIT section.
save_temporary_tables is now thread safe
Relay_log_info::sleep_lock -> rpl_group_info::sleep_lock
Relay_log_info::sleep_cond -> rpl_group_info::sleep_cond
Relay_log_info->sql_thd renamed to Relay_log_info->sql_driver_thd to avoid wrong usage for merged code
is_in_group() is now independent of state of executed transaction.
sql/slave.cc:
Simplifed arguments to io_salve_killed(), sql_slave_killed() and check_io_slave_killed(); No reason to supply THD as this is part of the given structure.
set_thd_in_use_temporary_tables() removed as in_use is set on usage in sql_base.cc
sql_thd -> sql_driver_thd
More DBUG
Added update_state_of_relay_log() which will calculate the IN_STMT and IN_TRANSACTION state of the relay log after the current element is executed.
If slave_skip_counter is set run things in single threaded mode.
Simplifed arguments to io_salve_killed(), check_io_slave_killed() and sql_slave_killed(); No reason to supply THD as this is part of the given structure.
Added information to thd_proc_info() which thread is waiting for slave mutex to exit.
Disabled not used function rpl_connect_master()
Updated argument to next_event()
sql/sql_base.cc:
Added mutex around usage of slave's temporary tables. The active list is always kept up to date in sql->rgi_slave->save_temporary_tables.
Clear thd->temporary_tables after query (safety)
More DBUG
When using temporary table, set table->in_use to current thd as the THD may be different for slave threads.
Some code is ifdef:ed with REMOVE_AFTER_MERGE_WITH_10 as the given code in 10.0 is not yet in this tree.
In open_table() reuse code from find_temporary_table()
sql/sql_binlog.cc:
rli->sql_thd -> rli->sql_driver_thd
Remove duplicate setting of rgi->rli
sql/sql_class.cc:
Added helper functions rgi_lock_temporary_tables() and rgi_unlock_temporary_tables()
Would have been nicer to have these inline, but there was no easy way to do that
sql/sql_class.h:
Added functions to protect slaves temporary tables
sql/sql_parse.cc:
Added DBUG_PRINT
sql/transaction.cc:
Added comment
Diffstat (limited to 'sql/sql_base.cc')
-rw-r--r-- | sql/sql_base.cc | 118 |
1 files changed, 85 insertions, 33 deletions
diff --git a/sql/sql_base.cc b/sql/sql_base.cc index 109a4ef41e9..80c0b98fd73 100644 --- a/sql/sql_base.cc +++ b/sql/sql_base.cc @@ -57,6 +57,7 @@ #include "sql_table.h" // build_table_filename #include "datadict.h" // dd_frm_is_view() #include "sql_hset.h" // Hash_set +#include "rpl_rli.h" // rpl_group_info #ifdef __WIN__ #include <io.h> #endif @@ -1230,11 +1231,24 @@ bool close_cached_connection_tables(THD *thd, LEX_STRING *connection) static void mark_temp_tables_as_free_for_reuse(THD *thd) { + DBUG_ENTER("mark_temp_tables_as_free_for_reuse"); + + thd->lock_temporary_tables(); for (TABLE *table= thd->temporary_tables ; table ; table= table->next) { if ((table->query_id == thd->query_id) && ! table->open_by_handler) mark_tmp_table_for_reuse(table); } + thd->unlock_temporary_tables(); + if (thd->rgi_slave) + { + /* + Temporary tables are shared with other by sql execution threads. + As a safety messure, clear the pointer to the common area. + */ + thd->temporary_tables= 0; + } + DBUG_VOID_RETURN; } @@ -1248,6 +1262,7 @@ static void mark_temp_tables_as_free_for_reuse(THD *thd) void mark_tmp_table_for_reuse(TABLE *table) { + DBUG_ENTER("mark_tmp_table_for_reuse"); DBUG_ASSERT(table->s->tmp_table); table->query_id= 0; @@ -1278,6 +1293,7 @@ void mark_tmp_table_for_reuse(TABLE *table) LOCK TABLES is allowed (but ignored) for a temporary table. */ table->reginfo.lock_type= TL_WRITE; + DBUG_VOID_RETURN; } @@ -1628,6 +1644,10 @@ static inline uint tmpkeyval(THD *thd, TABLE *table) /* Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread creates one DROP TEMPORARY TABLE binlog event for each pseudo-thread + + Temporary tables created in a sql slave is closed by + Relay_log_info::close_temporary_tables() + */ bool close_temporary_tables(THD *thd) @@ -1642,6 +1662,7 @@ bool close_temporary_tables(THD *thd) if (!thd->temporary_tables) DBUG_RETURN(FALSE); + DBUG_ASSERT(!thd->rgi_slave); if (!mysql_bin_log.is_open()) { @@ -2096,16 +2117,42 @@ TABLE *find_temporary_table(THD *thd, const char *table_key, uint table_key_length) { + TABLE *result= 0; + if (!thd->have_temporary_tables()) + return NULL; + + thd->lock_temporary_tables(); for (TABLE *table= thd->temporary_tables; table; table= table->next) { if (table->s->table_cache_key.length == table_key_length && !memcmp(table->s->table_cache_key.str, table_key, table_key_length)) { - return table; + /* + We need to set the THD as it may be different in case of + parallel replication + */ + if (table->in_use != thd) + { + table->in_use= thd; +#ifdef REMOVE_AFTER_MERGE_WITH_10 + if (thd->rgi_slave) + { + /* + We may be stealing an opened temporary tables from one slave + thread to another, we need to let the performance schema know that, + for aggregates per thread to work properly. + */ + table->file->unbind_psi(); + table->file->rebind_psi(); + } +#endif + } + result= table; + break; } } - - return NULL; + thd->unlock_temporary_tables(); + return result; } @@ -2153,6 +2200,9 @@ int drop_temporary_table(THD *thd, TABLE_LIST *table_list, bool *is_trans) /* Table might be in use by some outer statement. */ if (table->query_id && table->query_id != thd->query_id) { + DBUG_PRINT("info", ("table->query_id: %lu thd->query_id: %lu", + (ulong) table->query_id, (ulong) thd->query_id)); + my_error(ER_CANT_REOPEN_TABLE, MYF(0), table->alias.c_ptr()); DBUG_RETURN(-1); } @@ -2181,6 +2231,7 @@ void close_temporary_table(THD *thd, TABLE *table, table->s->db.str, table->s->table_name.str, (long) table, table->alias.c_ptr())); + thd->lock_temporary_tables(); if (table->prev) { table->prev->next= table->next; @@ -2200,12 +2251,14 @@ void close_temporary_table(THD *thd, TABLE *table, if (thd->temporary_tables) table->next->prev= 0; } - if (thd->slave_thread) + if (thd->rgi_slave) { /* natural invariant of temporary_tables */ DBUG_ASSERT(slave_open_temp_tables || !thd->temporary_tables); - slave_open_temp_tables--; + thread_safe_decrement32(&slave_open_temp_tables, &thread_running_lock); + table->in_use= 0; // No statistics } + thd->unlock_temporary_tables(); close_temporary(table, free_share, delete_table); DBUG_VOID_RETURN; } @@ -2651,35 +2704,30 @@ bool open_table(THD *thd, TABLE_LIST *table_list, MEM_ROOT *mem_root, TODO: move this block into a separate function. */ if (table_list->open_type != OT_BASE_ONLY && - ! (flags & MYSQL_OPEN_SKIP_TEMPORARY)) + ! (flags & MYSQL_OPEN_SKIP_TEMPORARY) && thd->have_temporary_tables()) { - for (table= thd->temporary_tables; table ; table=table->next) - { - if (table->s->table_cache_key.length == key_length + - TMP_TABLE_KEY_EXTRA && - !memcmp(table->s->table_cache_key.str, key, - key_length + TMP_TABLE_KEY_EXTRA)) + if ((table= find_temporary_table(thd, key, + key_length + TMP_TABLE_KEY_EXTRA))) + { + /* + Check if we're trying to use the same temporary table twice in a query. + Right now we don't support this because a temporary table + is always represented by only one TABLE object in THD, and + it can not be cloned. Emit an error for an unsupported behaviour. + */ + if (table->query_id) { - /* - We're trying to use the same temporary table twice in a query. - Right now we don't support this because a temporary table - is always represented by only one TABLE object in THD, and - it can not be cloned. Emit an error for an unsupported behaviour. - */ - if (table->query_id) - { - DBUG_PRINT("error", - ("query_id: %lu server_id: %u pseudo_thread_id: %lu", - (ulong) table->query_id, (uint) thd->variables.server_id, - (ulong) thd->variables.pseudo_thread_id)); - my_error(ER_CANT_REOPEN_TABLE, MYF(0), table->alias.c_ptr()); - DBUG_RETURN(TRUE); - } - table->query_id= thd->query_id; - thd->thread_specific_used= TRUE; - DBUG_PRINT("info",("Using temporary table")); - goto reset; + DBUG_PRINT("error", + ("query_id: %lu server_id: %u pseudo_thread_id: %lu", + (ulong) table->query_id, (uint) thd->variables.server_id, + (ulong) thd->variables.pseudo_thread_id)); + my_error(ER_CANT_REOPEN_TABLE, MYF(0), table->alias.c_ptr()); + DBUG_RETURN(TRUE); } + table->query_id= thd->query_id; + thd->thread_specific_used= TRUE; + DBUG_PRINT("info",("Using temporary table")); + goto reset; } } @@ -5987,14 +6035,18 @@ TABLE *open_table_uncached(THD *thd, handlerton *hton, if (add_to_temporary_tables_list) { + thd->lock_temporary_tables(); /* growing temp list at the head */ tmp_table->next= thd->temporary_tables; if (tmp_table->next) tmp_table->next->prev= tmp_table; thd->temporary_tables= tmp_table; thd->temporary_tables->prev= 0; - if (thd->slave_thread) - slave_open_temp_tables++; + if (thd->rgi_slave) + { + thread_safe_increment32(&slave_open_temp_tables, &thread_running_lock); + } + thd->unlock_temporary_tables(); } tmp_table->pos_in_table_list= 0; DBUG_PRINT("tmptable", ("opened table: '%s'.'%s' 0x%lx", tmp_table->s->db.str, |