diff options
author | unknown <knielsen@knielsen-hq.org> | 2013-11-01 09:17:06 +0100 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2013-11-01 09:17:06 +0100 |
commit | cb86ce60b9bade5ae7712d8f3f74668208ee3fd2 (patch) | |
tree | daff81c02baa6c2581d6abe3d746b8f35ee44f32 /sql/sql_base.cc | |
parent | f4d5d849fd3b526d38ca6eb083fd0b290eb0eda7 (diff) | |
parent | 39df665a3332bd9bfb2529419f534a49cfac388c (diff) | |
download | mariadb-git-cb86ce60b9bade5ae7712d8f3f74668208ee3fd2.tar.gz |
Merge MDEV-4506: Parallel replication into 10.0-base.
Diffstat (limited to 'sql/sql_base.cc')
-rw-r--r-- | sql/sql_base.cc | 118 |
1 files changed, 85 insertions, 33 deletions
diff --git a/sql/sql_base.cc b/sql/sql_base.cc index c0999efd04f..34737f0dca0 100644 --- a/sql/sql_base.cc +++ b/sql/sql_base.cc @@ -57,6 +57,7 @@ #include "sql_table.h" // build_table_filename #include "datadict.h" // dd_frm_is_view() #include "sql_hset.h" // Hash_set +#include "rpl_rli.h" // rpl_group_info #ifdef __WIN__ #include <io.h> #endif @@ -1228,11 +1229,24 @@ bool close_cached_connection_tables(THD *thd, LEX_STRING *connection) static void mark_temp_tables_as_free_for_reuse(THD *thd) { + DBUG_ENTER("mark_temp_tables_as_free_for_reuse"); + + thd->lock_temporary_tables(); for (TABLE *table= thd->temporary_tables ; table ; table= table->next) { if ((table->query_id == thd->query_id) && ! table->open_by_handler) mark_tmp_table_for_reuse(table); } + thd->unlock_temporary_tables(); + if (thd->rgi_slave) + { + /* + Temporary tables are shared with other by sql execution threads. + As a safety messure, clear the pointer to the common area. + */ + thd->temporary_tables= 0; + } + DBUG_VOID_RETURN; } @@ -1246,6 +1260,7 @@ static void mark_temp_tables_as_free_for_reuse(THD *thd) void mark_tmp_table_for_reuse(TABLE *table) { + DBUG_ENTER("mark_tmp_table_for_reuse"); DBUG_ASSERT(table->s->tmp_table); table->query_id= 0; @@ -1276,6 +1291,7 @@ void mark_tmp_table_for_reuse(TABLE *table) LOCK TABLES is allowed (but ignored) for a temporary table. */ table->reginfo.lock_type= TL_WRITE; + DBUG_VOID_RETURN; } @@ -1626,6 +1642,10 @@ static inline uint tmpkeyval(THD *thd, TABLE *table) /* Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread creates one DROP TEMPORARY TABLE binlog event for each pseudo-thread + + Temporary tables created in a sql slave is closed by + Relay_log_info::close_temporary_tables() + */ bool close_temporary_tables(THD *thd) @@ -1640,6 +1660,7 @@ bool close_temporary_tables(THD *thd) if (!thd->temporary_tables) DBUG_RETURN(FALSE); + DBUG_ASSERT(!thd->rgi_slave); if (!mysql_bin_log.is_open()) { @@ -2094,16 +2115,42 @@ TABLE *find_temporary_table(THD *thd, const char *table_key, uint table_key_length) { + TABLE *result= 0; + if (!thd->have_temporary_tables()) + return NULL; + + thd->lock_temporary_tables(); for (TABLE *table= thd->temporary_tables; table; table= table->next) { if (table->s->table_cache_key.length == table_key_length && !memcmp(table->s->table_cache_key.str, table_key, table_key_length)) { - return table; + /* + We need to set the THD as it may be different in case of + parallel replication + */ + if (table->in_use != thd) + { + table->in_use= thd; +#ifdef REMOVE_AFTER_MERGE_WITH_10 + if (thd->rgi_slave) + { + /* + We may be stealing an opened temporary tables from one slave + thread to another, we need to let the performance schema know that, + for aggregates per thread to work properly. + */ + table->file->unbind_psi(); + table->file->rebind_psi(); + } +#endif + } + result= table; + break; } } - - return NULL; + thd->unlock_temporary_tables(); + return result; } @@ -2151,6 +2198,9 @@ int drop_temporary_table(THD *thd, TABLE_LIST *table_list, bool *is_trans) /* Table might be in use by some outer statement. */ if (table->query_id && table->query_id != thd->query_id) { + DBUG_PRINT("info", ("table->query_id: %lu thd->query_id: %lu", + (ulong) table->query_id, (ulong) thd->query_id)); + my_error(ER_CANT_REOPEN_TABLE, MYF(0), table->alias.c_ptr()); DBUG_RETURN(-1); } @@ -2179,6 +2229,7 @@ void close_temporary_table(THD *thd, TABLE *table, table->s->db.str, table->s->table_name.str, (long) table, table->alias.c_ptr())); + thd->lock_temporary_tables(); if (table->prev) { table->prev->next= table->next; @@ -2198,12 +2249,14 @@ void close_temporary_table(THD *thd, TABLE *table, if (thd->temporary_tables) table->next->prev= 0; } - if (thd->slave_thread) + if (thd->rgi_slave) { /* natural invariant of temporary_tables */ DBUG_ASSERT(slave_open_temp_tables || !thd->temporary_tables); - slave_open_temp_tables--; + thread_safe_decrement32(&slave_open_temp_tables, &thread_running_lock); + table->in_use= 0; // No statistics } + thd->unlock_temporary_tables(); close_temporary(table, free_share, delete_table); DBUG_VOID_RETURN; } @@ -2649,35 +2702,30 @@ bool open_table(THD *thd, TABLE_LIST *table_list, MEM_ROOT *mem_root, TODO: move this block into a separate function. */ if (table_list->open_type != OT_BASE_ONLY && - ! (flags & MYSQL_OPEN_SKIP_TEMPORARY)) + ! (flags & MYSQL_OPEN_SKIP_TEMPORARY) && thd->have_temporary_tables()) { - for (table= thd->temporary_tables; table ; table=table->next) - { - if (table->s->table_cache_key.length == key_length + - TMP_TABLE_KEY_EXTRA && - !memcmp(table->s->table_cache_key.str, key, - key_length + TMP_TABLE_KEY_EXTRA)) + if ((table= find_temporary_table(thd, key, + key_length + TMP_TABLE_KEY_EXTRA))) + { + /* + Check if we're trying to use the same temporary table twice in a query. + Right now we don't support this because a temporary table + is always represented by only one TABLE object in THD, and + it can not be cloned. Emit an error for an unsupported behaviour. + */ + if (table->query_id) { - /* - We're trying to use the same temporary table twice in a query. - Right now we don't support this because a temporary table - is always represented by only one TABLE object in THD, and - it can not be cloned. Emit an error for an unsupported behaviour. - */ - if (table->query_id) - { - DBUG_PRINT("error", - ("query_id: %lu server_id: %u pseudo_thread_id: %lu", - (ulong) table->query_id, (uint) thd->variables.server_id, - (ulong) thd->variables.pseudo_thread_id)); - my_error(ER_CANT_REOPEN_TABLE, MYF(0), table->alias.c_ptr()); - DBUG_RETURN(TRUE); - } - table->query_id= thd->query_id; - thd->thread_specific_used= TRUE; - DBUG_PRINT("info",("Using temporary table")); - goto reset; + DBUG_PRINT("error", + ("query_id: %lu server_id: %u pseudo_thread_id: %lu", + (ulong) table->query_id, (uint) thd->variables.server_id, + (ulong) thd->variables.pseudo_thread_id)); + my_error(ER_CANT_REOPEN_TABLE, MYF(0), table->alias.c_ptr()); + DBUG_RETURN(TRUE); } + table->query_id= thd->query_id; + thd->thread_specific_used= TRUE; + DBUG_PRINT("info",("Using temporary table")); + goto reset; } } @@ -5985,14 +6033,18 @@ TABLE *open_table_uncached(THD *thd, handlerton *hton, if (add_to_temporary_tables_list) { + thd->lock_temporary_tables(); /* growing temp list at the head */ tmp_table->next= thd->temporary_tables; if (tmp_table->next) tmp_table->next->prev= tmp_table; thd->temporary_tables= tmp_table; thd->temporary_tables->prev= 0; - if (thd->slave_thread) - slave_open_temp_tables++; + if (thd->rgi_slave) + { + thread_safe_increment32(&slave_open_temp_tables, &thread_running_lock); + } + thd->unlock_temporary_tables(); } tmp_table->pos_in_table_list= 0; DBUG_PRINT("tmptable", ("opened table: '%s'.'%s' 0x%lx", tmp_table->s->db.str, |