summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorMichael Widenius <monty@askmonty.org>2013-10-14 00:24:05 +0300
committerMichael Widenius <monty@askmonty.org>2013-10-14 00:24:05 +0300
commit2e100cc5a493b6a0f6f907e0483a734c7fee2087 (patch)
tree265f66e6bbd503d0ffe36a1a664bb39291e42755 /sql
parent3784432256a30e4d453dde10c875d8446519e7c1 (diff)
downloadmariadb-git-2e100cc5a493b6a0f6f907e0483a734c7fee2087.tar.gz
Fixes for parallel slave:
- Made slaves temporary table multi-thread slave safe by adding mutex around save_temporary_table usage. - rli->save_temporary_tables is the active list of all used temporary tables - This is copied to THD->temporary_tables when temporary tables are opened and updated when temporary tables are closed - Added THD->lock_temporary_tables() and THD->unlock_temporary_tables() to simplify this. - Relay_log_info->sql_thd renamed to Relay_log_info->sql_driver_thd to avoid wrong usage for merged code. - Added is_part_of_group() to mark functions that are part of the next function. This replaces setting IN_STMT when events are executed. - Added is_begin(), is_commit() and is_rollback() functions to Query_log_event to simplify code. - If slave_skip_counter is set run things in single threaded mode. This simplifies code for skipping events. - Updating state of relay log (IN_STMT and IN_TRANSACTION) is moved to one single function: update_state_of_relay_log() We can't use OPTION_BEGIN to check for the state anymore as the sql_driver and sql execution threads may be different. Clear IN_STMT and IN_TRANSACTION in init_relay_log_pos() and Relay_log_info::cleanup_context() to ensure the flags doesn't survive slave restarts is_in_group() is now independent of state of executed transaction. - Reset thd->transaction.all.modified_non_trans_table() if we did set it for single table row events. This was mainly for keeping the flag as documented. - Changed slave_open_temp_tables to uint32 to be able to use atomic operators on it. - Relay_log_info::sleep_lock -> rpl_group_info::sleep_lock - Relay_log_info::sleep_cond -> rpl_group_info::sleep_cond - Changed some functions to take rpl_group_info instead of Relay_log_info to make them multi-slave safe and to simplify usage - do_shall_skip() - continue_group() - sql_slave_killed() - next_event() - Simplifed arguments to io_salve_killed(), check_io_slave_killed() and sql_slave_killed(); No reason to supply THD as this is part of the given structure. - set_thd_in_use_temporary_tables() removed as in_use is set on usage - Added information to thd_proc_info() which thread is waiting for slave mutex to exit. - In open_table() reuse code from find_temporary_table() Other things: - More DBUG statements - Fixed the rpl_incident.test can be run with --debug - More comments - Disabled not used function rpl_connect_master() mysql-test/suite/perfschema/r/all_instances.result: Moved sleep_lock and sleep_cond to rpl_group_info mysql-test/suite/rpl/r/rpl_incident.result: Updated result mysql-test/suite/rpl/t/rpl_incident-master.opt: Not needed anymore mysql-test/suite/rpl/t/rpl_incident.test: Fixed that test can be run with --debug sql/handler.cc: More DBUG_PRINT sql/log.cc: More comments sql/log_event.cc: Added DBUG statements do_shall_skip(), continue_group() now takes rpl_group_info param Use is_begin(), is_commit() and is_rollback() functions instead of inspecting query string We don't have set slaves temporary tables 'in_use' as this is now done when tables are opened. Removed IN_STMT flag setting. This is now done in update_state_of_relay_log() Use IN_TRANSACTION flag to test state of relay log. In rows_event_stmt_cleanup() reset thd->transaction.all.modified_non_trans_table if we had set this before. sql/log_event.h: do_shall_skip(), continue_group() now takes rpl_group_info param Added is_part_of_group() to mark events that are part of the next event. This replaces setting IN_STMT when events are executed. Added is_begin(), is_commit() and is_rollback() functions to Query_log_event to simplify code. sql/log_event_old.cc: Removed IN_STMT flag setting. This is now done in update_state_of_relay_log() do_shall_skip(), continue_group() now takes rpl_group_info param sql/log_event_old.h: Added is_part_of_group() to mark events that are part of the next event. do_shall_skip(), continue_group() now takes rpl_group_info param sql/mysqld.cc: Changed slave_open_temp_tables to uint32 to be able to use atomic operators on it. Relay_log_info::sleep_lock -> Rpl_group_info::sleep_lock Relay_log_info::sleep_cond -> Rpl_group_info::sleep_cond sql/mysqld.h: Updated types and names sql/rpl_gtid.cc: More DBUG sql/rpl_parallel.cc: Updated TODO section Set thd for event that is execution Use new is_begin(), is_commit() and is_rollback() functions. More comments sql/rpl_rli.cc: sql_thd -> sql_driver_thd Relay_log_info::sleep_lock -> rpl_group_info::sleep_lock Relay_log_info::sleep_cond -> rpl_group_info::sleep_cond Clear IN_STMT and IN_TRANSACTION in init_relay_log_pos() and Relay_log_info::cleanup_context() to ensure the flags doesn't survive slave restarts. Reset table->in_use for temporary tables as the table may have been used by another THD. Use IN_TRANSACTION instead of OPTION_BEGIN to check state of relay log. Removed IN_STMT flag setting. This is now done in update_state_of_relay_log() sql/rpl_rli.h: Changed relay log state flags to bit masks instead of bit positions (most other code we have uses bit masks) Added IN_TRANSACTION to mark if we are in a BEGIN ... COMMIT section. save_temporary_tables is now thread safe Relay_log_info::sleep_lock -> rpl_group_info::sleep_lock Relay_log_info::sleep_cond -> rpl_group_info::sleep_cond Relay_log_info->sql_thd renamed to Relay_log_info->sql_driver_thd to avoid wrong usage for merged code is_in_group() is now independent of state of executed transaction. sql/slave.cc: Simplifed arguments to io_salve_killed(), sql_slave_killed() and check_io_slave_killed(); No reason to supply THD as this is part of the given structure. set_thd_in_use_temporary_tables() removed as in_use is set on usage in sql_base.cc sql_thd -> sql_driver_thd More DBUG Added update_state_of_relay_log() which will calculate the IN_STMT and IN_TRANSACTION state of the relay log after the current element is executed. If slave_skip_counter is set run things in single threaded mode. Simplifed arguments to io_salve_killed(), check_io_slave_killed() and sql_slave_killed(); No reason to supply THD as this is part of the given structure. Added information to thd_proc_info() which thread is waiting for slave mutex to exit. Disabled not used function rpl_connect_master() Updated argument to next_event() sql/sql_base.cc: Added mutex around usage of slave's temporary tables. The active list is always kept up to date in sql->rgi_slave->save_temporary_tables. Clear thd->temporary_tables after query (safety) More DBUG When using temporary table, set table->in_use to current thd as the THD may be different for slave threads. Some code is ifdef:ed with REMOVE_AFTER_MERGE_WITH_10 as the given code in 10.0 is not yet in this tree. In open_table() reuse code from find_temporary_table() sql/sql_binlog.cc: rli->sql_thd -> rli->sql_driver_thd Remove duplicate setting of rgi->rli sql/sql_class.cc: Added helper functions rgi_lock_temporary_tables() and rgi_unlock_temporary_tables() Would have been nicer to have these inline, but there was no easy way to do that sql/sql_class.h: Added functions to protect slaves temporary tables sql/sql_parse.cc: Added DBUG_PRINT sql/transaction.cc: Added comment
Diffstat (limited to 'sql')
-rw-r--r--sql/handler.cc4
-rw-r--r--sql/log.cc12
-rw-r--r--sql/log_event.cc154
-rw-r--r--sql/log_event.h55
-rw-r--r--sql/log_event_old.cc28
-rw-r--r--sql/log_event_old.h3
-rw-r--r--sql/mysqld.cc13
-rw-r--r--sql/mysqld.h6
-rw-r--r--sql/rpl_gtid.cc15
-rw-r--r--sql/rpl_parallel.cc21
-rw-r--r--sql/rpl_rli.cc49
-rw-r--r--sql/rpl_rli.h95
-rw-r--r--sql/slave.cc328
-rw-r--r--sql/sql_base.cc118
-rw-r--r--sql/sql_binlog.cc5
-rw-r--r--sql/sql_class.cc18
-rw-r--r--sql/sql_class.h21
-rw-r--r--sql/sql_parse.cc1
-rw-r--r--sql/transaction.cc5
19 files changed, 595 insertions, 356 deletions
diff --git a/sql/handler.cc b/sql/handler.cc
index 25b2ee13187..c42204b27d1 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -1247,6 +1247,8 @@ int ha_commit_trans(THD *thd, bool all)
bool need_prepare_ordered, need_commit_ordered;
my_xid xid;
DBUG_ENTER("ha_commit_trans");
+ DBUG_PRINT("info",("thd: %p option_bits: %lu all: %d",
+ thd, (ulong) thd->variables.option_bits, all));
/* Just a random warning to test warnings pushed during autocommit. */
DBUG_EXECUTE_IF("warn_during_ha_commit_trans",
@@ -1306,6 +1308,8 @@ int ha_commit_trans(THD *thd, bool all)
/* rw_trans is TRUE when we in a transaction changing data */
bool rw_trans= is_real_trans && (rw_ha_count > 0);
MDL_request mdl_request;
+ DBUG_PRINT("info", ("is_real_trans: %d rw_trans: %d rw_ha_count: %d",
+ is_real_trans, rw_trans, rw_ha_count));
if (rw_trans)
{
diff --git a/sql/log.cc b/sql/log.cc
index 590c062351c..dd6eeb3678c 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -6554,9 +6554,6 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
the commit and wake them up. This way, all transactions in the queue get
committed in a single disk operation.
- The return value of this function is TRUE if queued as the first entry in
- the queue (meaning this is the leader), FALSE otherwise.
-
The main work in this function is when the commit in one transaction has
been marked to wait for the commit of another transaction to happen
first. This is used to support in-order parallel replication, where
@@ -6570,6 +6567,10 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
transactions already prepared to commit but just waiting for the first one
to commit. If so, we add those to the queue as well, transitively for all
waiters.
+
+ @retval TRUE If queued as the first entry in the queue (meaning this
+ is the leader)
+ @retval FALSE Otherwise
*/
bool
@@ -6657,7 +6658,11 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
The end result is a breath-first traversal of the tree of waiters,
re-using the next_subsequent_commit pointers in place of extra stack
space in a recursive traversal.
+
+ The temporary list created in next_subsequent_commit is not
+ used by the caller or any other function.
*/
+
list= wfc;
cur= list;
last= list;
@@ -7239,6 +7244,7 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
Note that this function may release and re-acquire LOCK_log and
LOCK_prepare_ordered if it needs to wait.
*/
+
void
MYSQL_BIN_LOG::wait_for_sufficient_commits()
{
diff --git a/sql/log_event.cc b/sql/log_event.cc
index cfbdd6aa626..59fc856c3f2 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -940,6 +940,8 @@ Log_event::Log_event(const char* buf,
int Log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
+ DBUG_ENTER("Log_event::do_update_pos");
+
/*
rli is null when (as far as I (Guilhem) know) the caller is
Load_log_event::do_apply_event *and* that one is called from
@@ -973,13 +975,14 @@ int Log_event::do_update_pos(rpl_group_info *rgi)
if (debug_not_change_ts_if_art_event == 0)
debug_not_change_ts_if_art_event= 2; );
}
- return 0; // Cannot fail currently
+ DBUG_RETURN(0); // Cannot fail currently
}
Log_event::enum_skip_reason
-Log_event::do_shall_skip(Relay_log_info *rli)
+Log_event::do_shall_skip(rpl_group_info *rgi)
{
+ Relay_log_info *rli= rgi->rli;
DBUG_PRINT("info", ("ev->server_id: %lu, ::server_id: %lu,"
" rli->replicate_same_server_id: %d,"
" rli->slave_skip_counter: %lu",
@@ -2525,11 +2528,11 @@ void Log_event::print_timestamp(IO_CACHE* file, time_t* ts)
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
inline Log_event::enum_skip_reason
-Log_event::continue_group(Relay_log_info *rli)
+Log_event::continue_group(rpl_group_info *rgi)
{
- if (rli->slave_skip_counter == 1)
+ if (rgi->rli->slave_skip_counter == 1)
return Log_event::EVENT_SKIP_IGNORE;
- return Log_event::do_shall_skip(rli);
+ return Log_event::do_shall_skip(rgi);
}
#endif
@@ -4263,11 +4266,13 @@ int Query_log_event::do_update_pos(rpl_group_info *rgi)
Log_event::enum_skip_reason
-Query_log_event::do_shall_skip(Relay_log_info *rli)
+Query_log_event::do_shall_skip(rpl_group_info *rgi)
{
+ Relay_log_info *rli= rgi->rli;
DBUG_ENTER("Query_log_event::do_shall_skip");
DBUG_PRINT("debug", ("query: %s; q_len: %d", query, q_len));
DBUG_ASSERT(query && q_len > 0);
+ DBUG_ASSERT(thd == rgi->thd);
/*
An event skipped due to @@skip_replication must not be counted towards the
@@ -4279,19 +4284,19 @@ Query_log_event::do_shall_skip(Relay_log_info *rli)
if (rli->slave_skip_counter > 0)
{
- if (strcmp("BEGIN", query) == 0)
+ if (is_begin())
{
thd->variables.option_bits|= OPTION_BEGIN;
- DBUG_RETURN(Log_event::continue_group(rli));
+ DBUG_RETURN(Log_event::continue_group(rgi));
}
- if (strcmp("COMMIT", query) == 0 || strcmp("ROLLBACK", query) == 0)
+ if (is_commit() || is_rollback())
{
thd->variables.option_bits&= ~OPTION_BEGIN;
DBUG_RETURN(Log_event::EVENT_SKIP_COUNT);
}
}
- DBUG_RETURN(Log_event::do_shall_skip(rli));
+ DBUG_RETURN(Log_event::do_shall_skip(rgi));
}
@@ -4465,7 +4470,7 @@ int Start_log_event_v3::do_apply_event(rpl_group_info *rgi)
{
DBUG_ENTER("Start_log_event_v3::do_apply_event");
int error= 0;
- Relay_log_info const *rli= rgi->rli;
+ Relay_log_info *rli= rgi->rli;
switch (binlog_version)
{
@@ -4479,24 +4484,14 @@ int Start_log_event_v3::do_apply_event(rpl_group_info *rgi)
*/
if (created)
{
- error= close_temporary_tables(thd);
+ rli->close_temporary_tables();
+
/*
The following is only false if we get here with a BINLOG statement
*/
if (rli->mi)
cleanup_load_tmpdir(&rli->mi->cmp_connection_name);
}
- else
- {
- /*
- Set all temporary tables thread references to the current thread
- as they may point to the "old" SQL slave thread in case of its
- restart.
- */
- TABLE *table;
- for (table= thd->temporary_tables; table; table= table->next)
- table->in_use= thd;
- }
break;
/*
@@ -4511,7 +4506,7 @@ int Start_log_event_v3::do_apply_event(rpl_group_info *rgi)
Can distinguish, based on the value of 'created': this event was
generated at master startup.
*/
- error= close_temporary_tables(thd);
+ rli->close_temporary_tables();
}
/*
Otherwise, can't distinguish a Start_log_event generated at
@@ -4895,7 +4890,7 @@ int Format_description_log_event::do_update_pos(rpl_group_info *rgi)
}
Log_event::enum_skip_reason
-Format_description_log_event::do_shall_skip(Relay_log_info *rli)
+Format_description_log_event::do_shall_skip(rpl_group_info *rgi)
{
return Log_event::EVENT_SKIP_NOT;
}
@@ -5970,8 +5965,8 @@ int Rotate_log_event::do_update_pos(rpl_group_info *rgi)
flush_relay_log_info(rli);
/*
- Reset thd->variables.option_bits and sql_mode etc, because this could be the signal of
- a master's downgrade from 5.0 to 4.0.
+ Reset thd->variables.option_bits and sql_mode etc, because this could
+ be the signal of a master's downgrade from 5.0 to 4.0.
However, no need to reset description_event_for_exec: indeed, if the next
master is 5.0 (even 5.0.1) we will soon get a Format_desc; if the next
master is 4.0 then the events are in the slave's format (conversion).
@@ -5991,9 +5986,9 @@ int Rotate_log_event::do_update_pos(rpl_group_info *rgi)
Log_event::enum_skip_reason
-Rotate_log_event::do_shall_skip(Relay_log_info *rli)
+Rotate_log_event::do_shall_skip(rpl_group_info *rgi)
{
- enum_skip_reason reason= Log_event::do_shall_skip(rli);
+ enum_skip_reason reason= Log_event::do_shall_skip(rgi);
switch (reason) {
case Log_event::EVENT_SKIP_NOT:
@@ -6302,8 +6297,9 @@ Gtid_log_event::do_update_pos(rpl_group_info *rgi)
Log_event::enum_skip_reason
-Gtid_log_event::do_shall_skip(Relay_log_info *rli)
+Gtid_log_event::do_shall_skip(rpl_group_info *rgi)
{
+ Relay_log_info *rli= rgi->rli;
/*
An event skipped due to @@skip_replication must not be counted towards the
number of events to be skipped due to @@sql_slave_skip_counter.
@@ -6315,10 +6311,13 @@ Gtid_log_event::do_shall_skip(Relay_log_info *rli)
if (rli->slave_skip_counter > 0)
{
if (!(flags2 & FL_STANDALONE))
+ {
thd->variables.option_bits|= OPTION_BEGIN;
- return Log_event::continue_group(rli);
+ DBUG_ASSERT(rgi->rli->get_flag(Relay_log_info::IN_TRANSACTION));
+ }
+ return Log_event::continue_group(rgi);
}
- return Log_event::do_shall_skip(rli);
+ return Log_event::do_shall_skip(rgi);
}
@@ -6707,13 +6706,6 @@ void Intvar_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
int Intvar_log_event::do_apply_event(rpl_group_info *rgi)
{
- Relay_log_info *rli= rgi->rli;
- /*
- We are now in a statement until the associated query log event has
- been processed.
- */
- rli->set_flag(Relay_log_info::IN_STMT);
-
if (rgi->deferred_events_collecting)
return rgi->deferred_events->add(this);
@@ -6738,7 +6730,7 @@ int Intvar_log_event::do_update_pos(rpl_group_info *rgi)
Log_event::enum_skip_reason
-Intvar_log_event::do_shall_skip(Relay_log_info *rli)
+Intvar_log_event::do_shall_skip(rpl_group_info *rgi)
{
/*
It is a common error to set the slave skip counter to 1 instead of
@@ -6748,7 +6740,7 @@ Intvar_log_event::do_shall_skip(Relay_log_info *rli)
that we do not change the value of the slave skip counter since it
will be decreased by the following insert event.
*/
- return continue_group(rli);
+ return continue_group(rgi);
}
#endif
@@ -6818,13 +6810,6 @@ void Rand_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
int Rand_log_event::do_apply_event(rpl_group_info *rgi)
{
- Relay_log_info const *rli= rgi->rli;
- /*
- We are now in a statement until the associated query log event has
- been processed.
- */
- const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
-
if (rgi->deferred_events_collecting)
return rgi->deferred_events->add(this);
@@ -6842,7 +6827,7 @@ int Rand_log_event::do_update_pos(rpl_group_info *rgi)
Log_event::enum_skip_reason
-Rand_log_event::do_shall_skip(Relay_log_info *rli)
+Rand_log_event::do_shall_skip(rpl_group_info *rgi)
{
/*
It is a common error to set the slave skip counter to 1 instead of
@@ -6852,7 +6837,7 @@ Rand_log_event::do_shall_skip(Relay_log_info *rli)
that we do not change the value of the slave skip counter since it
will be decreased by the following insert event.
*/
- return continue_group(rli);
+ return continue_group(rgi);
}
/**
@@ -6998,14 +6983,16 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
}
Log_event::enum_skip_reason
-Xid_log_event::do_shall_skip(Relay_log_info *rli)
+Xid_log_event::do_shall_skip(rpl_group_info *rgi)
{
DBUG_ENTER("Xid_log_event::do_shall_skip");
- if (rli->slave_skip_counter > 0) {
+ if (rgi->rli->slave_skip_counter > 0)
+ {
+ DBUG_ASSERT(!rgi->rli->get_flag(Relay_log_info::IN_TRANSACTION));
thd->variables.option_bits&= ~OPTION_BEGIN;
DBUG_RETURN(Log_event::EVENT_SKIP_COUNT);
}
- DBUG_RETURN(Log_event::do_shall_skip(rli));
+ DBUG_RETURN(Log_event::do_shall_skip(rgi));
}
#endif /* !MYSQL_CLIENT */
@@ -7418,7 +7405,6 @@ int User_var_log_event::do_apply_event(rpl_group_info *rgi)
{
Item *it= 0;
CHARSET_INFO *charset;
- Relay_log_info const *rli= rgi->rli;
DBUG_ENTER("User_var_log_event::do_apply_event");
if (rgi->deferred_events_collecting)
@@ -7435,12 +7421,6 @@ int User_var_log_event::do_apply_event(rpl_group_info *rgi)
double real_val;
longlong int_val;
- /*
- We are now in a statement until the associated query log event has
- been processed.
- */
- const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
-
if (is_null)
{
it= new Item_null();
@@ -7511,7 +7491,7 @@ int User_var_log_event::do_update_pos(rpl_group_info *rgi)
}
Log_event::enum_skip_reason
-User_var_log_event::do_shall_skip(Relay_log_info *rli)
+User_var_log_event::do_shall_skip(rpl_group_info *rgi)
{
/*
It is a common error to set the slave skip counter to 1 instead
@@ -7521,7 +7501,7 @@ User_var_log_event::do_shall_skip(Relay_log_info *rli)
that we do not change the value of the slave skip counter since it
will be decreased by the following insert event.
*/
- return continue_group(rli);
+ return continue_group(rgi);
}
#endif /* !MYSQL_CLIENT */
@@ -7724,9 +7704,11 @@ void Stop_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
Start_log_event_v3::do_apply_event(), not here. Because if we come
here, the master was sane.
*/
+
int Stop_log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
+ DBUG_ENTER("Stop_log_event::do_update_pos");
/*
We do not want to update master_log pos because we get a rotate event
before stop, so by now group_master_log_name is set to the next log.
@@ -7734,7 +7716,7 @@ int Stop_log_event::do_update_pos(rpl_group_info *rgi)
could give false triggers in MASTER_POS_WAIT() that we have reached
the target position when in fact we have not.
*/
- if (thd->variables.option_bits & OPTION_BEGIN)
+ if (rli->get_flag(Relay_log_info::IN_TRANSACTION))
rli->inc_event_relay_log_pos();
else
{
@@ -7742,7 +7724,7 @@ int Stop_log_event::do_update_pos(rpl_group_info *rgi)
rli->inc_group_relay_log_pos(0);
flush_relay_log_info(rli);
}
- return 0;
+ DBUG_RETURN(0);
}
#endif /* !MYSQL_CLIENT */
@@ -8514,13 +8496,13 @@ int Begin_load_query_log_event::get_create_or_append() const
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
Log_event::enum_skip_reason
-Begin_load_query_log_event::do_shall_skip(Relay_log_info *rli)
+Begin_load_query_log_event::do_shall_skip(rpl_group_info *rgi)
{
/*
If the slave skip counter is 1, then we should not start executing
on the next event.
*/
- return continue_group(rli);
+ return continue_group(rgi);
}
#endif
@@ -9272,17 +9254,6 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
*/
thd->set_time(when, when_sec_part);
- /*
- Now we are in a statement and will stay in a statement until we
- see a STMT_END_F.
-
- We set this flag here, before actually applying any rows, in
- case the SQL thread is stopped and we need to detect that we're
- inside a statement and halting abruptly might cause problems
- when restarting.
- */
- const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
-
if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
set_flags(COMPLETE_ROWS_F);
@@ -9442,17 +9413,17 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
}
Log_event::enum_skip_reason
-Rows_log_event::do_shall_skip(Relay_log_info *rli)
+Rows_log_event::do_shall_skip(rpl_group_info *rgi)
{
/*
If the slave skip counter is 1 and this event does not end a
statement, then we should not start executing on the next event.
Otherwise, we defer the decision to the normal skipping logic.
*/
- if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
+ if (rgi->rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
return Log_event::EVENT_SKIP_IGNORE;
else
- return Log_event::do_shall_skip(rli);
+ return Log_event::do_shall_skip(rgi);
}
/**
@@ -9469,6 +9440,8 @@ Rows_log_event::do_shall_skip(Relay_log_info *rli)
static int rows_event_stmt_cleanup(rpl_group_info *rgi, THD * thd)
{
int error;
+ DBUG_ENTER("rows_event_stmt_cleanup");
+
{
/*
This is the end of a statement or transaction, so close (and
@@ -9520,9 +9493,16 @@ static int rows_event_stmt_cleanup(rpl_group_info *rgi, THD * thd)
*/
thd->reset_current_stmt_binlog_format_row();
+ /*
+ Reset modified_non_trans_table that we have set in
+ rows_log_event::do_apply_event()
+ */
+ if (!thd->in_multi_stmt_transaction_mode())
+ thd->transaction.all.modified_non_trans_table= 0;
+
rgi->cleanup_context(thd, 0);
}
- return error;
+ DBUG_RETURN(error);
}
/**
@@ -9795,9 +9775,9 @@ int Annotate_rows_log_event::do_update_pos(rpl_group_info *rgi)
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
Log_event::enum_skip_reason
-Annotate_rows_log_event::do_shall_skip(Relay_log_info *rli)
+Annotate_rows_log_event::do_shall_skip(rpl_group_info *rgi)
{
- return continue_group(rli);
+ return continue_group(rgi);
}
#endif
@@ -10265,7 +10245,7 @@ check_table_map(rpl_group_info *rgi, RPL_TABLE_LIST *table_list)
enum_tbl_map_status res= OK_TO_PROCESS;
Relay_log_info *rli= rgi->rli;
- if (rli->sql_thd->slave_thread /* filtering is for slave only */ &&
+ if (rgi->thd->slave_thread /* filtering is for slave only */ &&
(!rli->mi->rpl_filter->db_ok(table_list->db) ||
(rli->mi->rpl_filter->is_on() && !rli->mi->rpl_filter->tables_ok("", table_list))))
res= FILTERED_OUT;
@@ -10316,7 +10296,7 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi)
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
/* call from mysql_client_binlog_statement() will not set rli->mi */
- filter= rli->sql_thd->slave_thread ? rli->mi->rpl_filter : global_rpl_filter;
+ filter= rgi->thd->slave_thread ? rli->mi->rpl_filter : global_rpl_filter;
strmov(db_mem, filter->get_rewrite_db(m_dbnam, &dummy_len));
strmov(tname_mem, m_tblnam);
@@ -10404,13 +10384,13 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi)
}
Log_event::enum_skip_reason
-Table_map_log_event::do_shall_skip(Relay_log_info *rli)
+Table_map_log_event::do_shall_skip(rpl_group_info *rgi)
{
/*
If the slave skip counter is 1, then we should not start executing
on the next event.
*/
- return continue_group(rli);
+ return continue_group(rgi);
}
int Table_map_log_event::do_update_pos(rpl_group_info *rgi)
diff --git a/sql/log_event.h b/sql/log_event.h
index d689ebcd582..6fbd69453b4 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -1342,9 +1342,9 @@ public:
@see do_shall_skip
*/
- enum_skip_reason shall_skip(Relay_log_info *rli)
+ enum_skip_reason shall_skip(rpl_group_info *rgi)
{
- return do_shall_skip(rli);
+ return do_shall_skip(rgi);
}
@@ -1352,6 +1352,7 @@ public:
Check if an event is non-final part of a stand-alone event group,
such as Intvar_log_event (such events should be processed as part
of the following event group, not individually).
+ See also is_part_of_group()
*/
static bool is_part_of_group(enum Log_event_type ev_type)
{
@@ -1375,6 +1376,11 @@ public:
return false;
}
}
+ /*
+ Same as above, but works on the object. In addition this is true for all
+ rows event except the last one.
+ */
+ virtual bool is_part_of_group() { return 0; }
static bool is_group_event(enum Log_event_type ev_type)
{
@@ -1408,14 +1414,14 @@ protected:
A typical usage is:
@code
- enum_skip_reason do_shall_skip(Relay_log_info *rli) {
- return continue_group(rli);
+ enum_skip_reason do_shall_skip(rpl_group_info *rgi) {
+ return continue_group(rgi);
}
@endcode
@return Skip reason
*/
- enum_skip_reason continue_group(Relay_log_info *rli);
+ enum_skip_reason continue_group(rpl_group_info *rgi);
/**
Primitive to apply an event to the database.
@@ -1493,7 +1499,7 @@ protected:
The event shall be skipped because the slave skip counter was
non-zero. The caller shall decrease the counter by one.
*/
- virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
+ virtual enum_skip_reason do_shall_skip(rpl_group_info *rgi);
#endif
};
@@ -1985,7 +1991,7 @@ public:
public: /* !!! Public in this patch to allow old usage */
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
- virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
+ virtual enum_skip_reason do_shall_skip(rpl_group_info *rgi);
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
@@ -2017,6 +2023,9 @@ public: /* !!! Public in this patch to allow old usage */
!strncasecmp(query, "SAVEPOINT", 9) ||
!strncasecmp(query, "ROLLBACK", 8);
}
+ bool is_begin() { return !strcmp(query, "BEGIN"); }
+ bool is_commit() { return !strcmp(query, "COMMIT"); }
+ bool is_rollback() { return !strcmp(query, "ROLLBACK"); }
};
@@ -2501,7 +2510,7 @@ public:
protected:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(rpl_group_info *rgi);
- virtual enum_skip_reason do_shall_skip(Relay_log_info*)
+ virtual enum_skip_reason do_shall_skip(rpl_group_info*)
{
/*
Events from ourself should be skipped, but they should not
@@ -2598,7 +2607,7 @@ protected:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
- virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
+ virtual enum_skip_reason do_shall_skip(rpl_group_info *rgi);
#endif
};
@@ -2672,12 +2681,13 @@ Intvar_log_event(THD* thd_arg,uchar type_arg, ulonglong val_arg,
bool write(IO_CACHE* file);
#endif
bool is_valid() const { return 1; }
+ bool is_part_of_group() { return 1; }
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
- virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
+ virtual enum_skip_reason do_shall_skip(rpl_group_info *rgi);
#endif
};
@@ -2751,12 +2761,13 @@ class Rand_log_event: public Log_event
bool write(IO_CACHE* file);
#endif
bool is_valid() const { return 1; }
+ bool is_part_of_group() { return 1; }
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
- virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
+ virtual enum_skip_reason do_shall_skip(rpl_group_info *rgi);
#endif
};
@@ -2804,7 +2815,7 @@ class Xid_log_event: public Log_event
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(rpl_group_info *rgi);
- enum_skip_reason do_shall_skip(Relay_log_info *rli);
+ enum_skip_reason do_shall_skip(rpl_group_info *rgi);
#endif
};
@@ -2867,12 +2878,13 @@ public:
void set_deferred() { deferred= true; }
#endif
bool is_valid() const { return name != 0; }
+ bool is_part_of_group() { return 1; }
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
- virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
+ virtual enum_skip_reason do_shall_skip(rpl_group_info *rgi);
#endif
};
@@ -2906,7 +2918,7 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_update_pos(rpl_group_info *rgi);
- virtual enum_skip_reason do_shall_skip(Relay_log_info *rli)
+ virtual enum_skip_reason do_shall_skip(rpl_group_info *rgi)
{
/*
Events from ourself should be skipped, but they should not
@@ -3008,7 +3020,7 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_update_pos(rpl_group_info *rgi);
- virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
+ virtual enum_skip_reason do_shall_skip(rpl_group_info *rgi);
#endif
};
@@ -3121,7 +3133,7 @@ public:
void pack_info(THD *thd, Protocol *protocol);
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
- virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
+ virtual enum_skip_reason do_shall_skip(rpl_group_info *rgi);
#endif
#else
void print(FILE *file, PRINT_EVENT_INFO *print_event_info);
@@ -3497,7 +3509,7 @@ public:
Log_event_type get_type_code() { return BEGIN_LOAD_QUERY_EVENT; }
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
- virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
+ virtual enum_skip_reason do_shall_skip(rpl_group_info *rgi);
#endif
};
@@ -3619,6 +3631,7 @@ public:
virtual int get_data_size();
virtual Log_event_type get_type_code();
virtual bool is_valid() const;
+ virtual bool is_part_of_group() { return 1; }
#ifndef MYSQL_CLIENT
virtual bool write_data_header(IO_CACHE*);
@@ -3637,7 +3650,7 @@ public:
private:
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
- virtual enum_skip_reason do_shall_skip(Relay_log_info*);
+ virtual enum_skip_reason do_shall_skip(rpl_group_info*);
#endif
private:
@@ -4030,6 +4043,7 @@ public:
virtual Log_event_type get_type_code() { return TABLE_MAP_EVENT; }
virtual bool is_valid() const { return m_memory != NULL; /* we check malloc */ }
+ virtual bool is_part_of_group() { return 1; }
virtual int get_data_size() { return (uint) m_data_size; }
#ifdef MYSQL_SERVER
@@ -4052,7 +4066,7 @@ private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
- virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
+ virtual enum_skip_reason do_shall_skip(rpl_group_info *rgi);
#endif
#ifdef MYSQL_SERVER
@@ -4195,6 +4209,7 @@ public:
{
return m_rows_buf && m_cols.bitmap;
}
+ bool is_part_of_group() { return get_flags(STMT_END_F) != 0; }
uint m_row_count; /* The number of rows added to the event */
@@ -4280,7 +4295,7 @@ private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
- virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
+ virtual enum_skip_reason do_shall_skip(rpl_group_info *rgi);
/*
Primitive to prepare for a sequence of row executions.
diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc
index 58f299dabe7..b4f28abcf2b 100644
--- a/sql/log_event_old.cc
+++ b/sql/log_event_old.cc
@@ -205,17 +205,6 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi)
/* A small test to verify that objects have consistent types */
DBUG_ASSERT(sizeof(ev_thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
- /*
- Now we are in a statement and will stay in a statement until we
- see a STMT_END_F.
-
- We set this flag here, before actually applying any rows, in
- case the SQL thread is stopped and we need to detect that we're
- inside a statement and halting abruptly might cause problems
- when restarting.
- */
- const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
-
error= do_before_row_operations(table);
while (error == 0 && row_start < ev->m_rows_end)
{
@@ -1613,17 +1602,6 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
/* A small test to verify that objects have consistent types */
DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
- /*
- Now we are in a statement and will stay in a statement until we
- see a STMT_END_F.
-
- We set this flag here, before actually applying any rows, in
- case the SQL thread is stopped and we need to detect that we're
- inside a statement and halting abruptly might cause problems
- when restarting.
- */
- const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
-
if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
set_flags(COMPLETE_ROWS_F);
@@ -1820,17 +1798,17 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
Log_event::enum_skip_reason
-Old_rows_log_event::do_shall_skip(Relay_log_info *rli)
+Old_rows_log_event::do_shall_skip(rpl_group_info *rgi)
{
/*
If the slave skip counter is 1 and this event does not end a
statement, then we should not start executing on the next event.
Otherwise, we defer the decision to the normal skipping logic.
*/
- if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
+ if (rgi->rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
return Log_event::EVENT_SKIP_IGNORE;
else
- return Log_event::do_shall_skip(rli);
+ return Log_event::do_shall_skip(rgi);
}
int
diff --git a/sql/log_event_old.h b/sql/log_event_old.h
index 01b80439fa1..e5ed25f57ac 100644
--- a/sql/log_event_old.h
+++ b/sql/log_event_old.h
@@ -145,6 +145,7 @@ public:
{
return m_rows_buf && m_cols.bitmap;
}
+ bool is_part_of_group() { return 1; }
uint m_row_count; /* The number of rows added to the event */
@@ -216,7 +217,7 @@ private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
- virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
+ virtual enum_skip_reason do_shall_skip(rpl_group_info *rgi);
/*
Primitive to prepare for a sequence of row executions.
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index a7fa78838a9..9f1d9e48b1c 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -467,11 +467,12 @@ uint lower_case_table_names;
ulong tc_heuristic_recover= 0;
int32 thread_count;
int32 thread_running;
+int32 slave_open_temp_tables;
ulong thread_created;
ulong back_log, connect_timeout, concurrency, server_id;
ulong table_cache_size, table_def_size;
ulong what_to_log;
-ulong slow_launch_time, slave_open_temp_tables;
+ulong slow_launch_time;
ulong open_files_limit, max_binlog_size;
ulong slave_trans_retries;
uint slave_net_timeout;
@@ -767,7 +768,7 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_master_info_sleep_lock,
key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock,
key_relay_log_info_log_space_lock, key_relay_log_info_run_lock,
- key_relay_log_info_sleep_lock,
+ key_rpl_group_info_sleep_lock,
key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data,
key_LOCK_error_messages, key_LOG_INFO_lock,
key_LOCK_thread_count, key_LOCK_thread_cache,
@@ -839,7 +840,7 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_relay_log_info_data_lock, "Relay_log_info::data_lock", 0},
{ &key_relay_log_info_log_space_lock, "Relay_log_info::log_space_lock", 0},
{ &key_relay_log_info_run_lock, "Relay_log_info::run_lock", 0},
- { &key_relay_log_info_sleep_lock, "Relay_log_info::sleep_lock", 0},
+ { &key_rpl_group_info_sleep_lock, "Rpl_group_info::sleep_lock", 0},
{ &key_structure_guard_mutex, "Query_cache::structure_guard_mutex", 0},
{ &key_TABLE_SHARE_LOCK_ha_data, "TABLE_SHARE::LOCK_ha_data", 0},
{ &key_LOCK_error_messages, "LOCK_error_messages", PSI_FLAG_GLOBAL},
@@ -888,7 +889,7 @@ PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond,
key_master_info_sleep_cond,
key_relay_log_info_data_cond, key_relay_log_info_log_space_cond,
key_relay_log_info_start_cond, key_relay_log_info_stop_cond,
- key_relay_log_info_sleep_cond,
+ key_rpl_group_info_sleep_cond,
key_TABLE_SHARE_cond, key_user_level_lock_cond,
key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache,
key_BINLOG_COND_queue_busy;
@@ -934,7 +935,7 @@ static PSI_cond_info all_server_conds[]=
{ &key_relay_log_info_log_space_cond, "Relay_log_info::log_space_cond", 0},
{ &key_relay_log_info_start_cond, "Relay_log_info::start_cond", 0},
{ &key_relay_log_info_stop_cond, "Relay_log_info::stop_cond", 0},
- { &key_relay_log_info_sleep_cond, "Relay_log_info::sleep_cond", 0},
+ { &key_rpl_group_info_sleep_cond, "Rpl_group_info::sleep_cond", 0},
{ &key_TABLE_SHARE_cond, "TABLE_SHARE::cond", 0},
{ &key_user_level_lock_cond, "User_level_lock::cond", 0},
{ &key_COND_thread_count, "COND_thread_count", PSI_FLAG_GLOBAL},
@@ -7285,7 +7286,7 @@ SHOW_VAR status_vars[]= {
{"Select_range", (char*) offsetof(STATUS_VAR, select_range_count), SHOW_LONG_STATUS},
{"Select_range_check", (char*) offsetof(STATUS_VAR, select_range_check_count), SHOW_LONG_STATUS},
{"Select_scan", (char*) offsetof(STATUS_VAR, select_scan_count), SHOW_LONG_STATUS},
- {"Slave_open_temp_tables", (char*) &slave_open_temp_tables, SHOW_LONG},
+ {"Slave_open_temp_tables", (char*) &slave_open_temp_tables, SHOW_INT},
#ifdef HAVE_REPLICATION
{"Slave_retried_transactions",(char*)&slave_retried_transactions, SHOW_LONG},
{"Slave_heartbeat_period", (char*) &show_heartbeat_period, SHOW_SIMPLE_FUNC},
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 345e9fa74c9..0bd3687f4fb 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -153,7 +153,7 @@ extern ulong delayed_insert_timeout;
extern ulong delayed_insert_limit, delayed_queue_size;
extern ulong delayed_insert_threads, delayed_insert_writes;
extern ulong delayed_rows_in_use,delayed_insert_errors;
-extern ulong slave_open_temp_tables;
+extern int32 slave_open_temp_tables;
extern ulonglong query_cache_size;
extern ulong query_cache_min_res_unit;
extern ulong slow_launch_threads, slow_launch_time;
@@ -246,7 +246,7 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_master_info_sleep_lock,
key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock,
key_relay_log_info_log_space_lock, key_relay_log_info_run_lock,
- key_relay_log_info_sleep_lock,
+ key_rpl_group_info_sleep_lock,
key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data,
key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc;
extern PSI_mutex_key key_RELAYLOG_LOCK_index;
@@ -278,7 +278,7 @@ extern PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond,
key_master_info_sleep_cond,
key_relay_log_info_data_cond, key_relay_log_info_log_space_cond,
key_relay_log_info_start_cond, key_relay_log_info_stop_cond,
- key_relay_log_info_sleep_cond,
+ key_rpl_group_info_sleep_cond,
key_TABLE_SHARE_cond, key_user_level_lock_cond,
key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache;
extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index a1b14ad3255..1e393eab502 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -65,6 +65,7 @@ int
rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
{
uint64 sub_id;
+ DBUG_ENTER("rpl_slave_state::record_and_update_gtid");
/*
Update the GTID position, if we have it and did not already update
@@ -74,10 +75,10 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
{
rgi->gtid_sub_id= 0;
if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false))
- return 1;
+ DBUG_RETURN(1);
update_state_hash(sub_id, &rgi->current_gtid);
}
- return 0;
+ DBUG_RETURN(0);
}
@@ -310,6 +311,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
element *elem;
ulonglong thd_saved_option= thd->variables.option_bits;
Query_tables_list lex_backup;
+ DBUG_ENTER("record_gtid");
if (unlikely(!loaded))
{
@@ -320,7 +322,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
We already complained loudly about this, but we can try to continue
until the DBA fixes it.
*/
- return 0;
+ DBUG_RETURN(0);
}
if (!in_statement)
@@ -329,7 +331,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
DBUG_EXECUTE_IF("gtid_inject_record_gtid",
{
my_error(ER_CANNOT_UPDATE_GTID_STATE, MYF(0));
- return 1;
+ DBUG_RETURN(1);
} );
thd->lex->reset_n_backup_query_tables_list(&lex_backup);
@@ -347,8 +349,11 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
table->no_replicate= 1;
if (!in_transaction)
+ {
+ DBUG_PRINT("info", ("resetting OPTION_BEGIN"));
thd->variables.option_bits&=
~(ulonglong)(OPTION_NOT_AUTOCOMMIT|OPTION_BEGIN);
+ }
bitmap_set_all(table->write_set);
@@ -457,7 +462,7 @@ end:
}
thd->lex->restore_backup_query_tables_list(&lex_backup);
thd->variables.option_bits= thd_saved_option;
- return err;
+ DBUG_RETURN(err);
}
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index e80512a3580..c10a035c599 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -43,11 +43,6 @@
slave rolls back the transaction; parallel execution needs to be able
to deal with this wrt. commit_orderer and such.
- - Relay_log_info::is_in_group(). This needs to be handled correctly in all
- callers. I think it needs to be split into two, one version in
- Relay_log_info to be used from next_event() in slave.cc, one to be used in
- per-transaction stuff.
-
- We should fail if we connect to the master with opt_slave_parallel_threads
greater than zero and master does not support GTID. Just to avoid a bunch
of potential problems, we won't be able to do any parallel replication
@@ -71,6 +66,7 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
/* ToDo: Access to thd, and what about rli, split out a parallel part? */
mysql_mutex_lock(&rli->data_lock);
+ qev->ev->thd= thd;
err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt);
thd->rgi_slave= NULL;
/* ToDo: error handling. */
@@ -234,8 +230,8 @@ handle_rpl_parallel_thread(void *arg)
((group_standalone && !Log_event::is_part_of_group(event_type)) ||
event_type == XID_EVENT ||
(event_type == QUERY_EVENT &&
- (!strcmp("COMMIT", ((Query_log_event *)events->ev)->query) ||
- !strcmp("ROLLBACK", ((Query_log_event *)events->ev)->query))));
+ (((Query_log_event *)events->ev)->is_commit() ||
+ ((Query_log_event *)events->ev)->is_rollback())));
delete_or_keep_event_post_apply(rgi, event_type, events->ev);
my_free(events);
@@ -612,6 +608,11 @@ rpl_parallel::wait_for_done()
}
+/*
+ do_event() is executed by the sql_driver_thd thread.
+ It's main purpose is to find a thread that can exectue the query.
+*/
+
bool
rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
{
@@ -718,9 +719,9 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
if (!cur_thread)
{
/*
- Nothing else is currently running in this domain. We can spawn a new
- thread to do this event group in parallel with anything else that might
- be running in other domains.
+ Nothing else is currently running in this domain. We can
+ spawn a new thread to do this event group in parallel with
+ anything else that might be running in other domains.
*/
cur_thread= e->rpl_thread= global_rpl_thread_pool.get_thread(e);
/* get_thread() returns with the LOCK_rpl_thread locked. */
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index c4b898f74e3..ae2b7558285 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -56,7 +56,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
#endif
group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
last_master_timestamp(0), slave_skip_counter(0),
- abort_pos_wait(0), slave_run_id(0), sql_thd(0),
+ abort_pos_wait(0), slave_run_id(0), sql_driver_thd(),
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), executed_entries(0),
last_event_start_time(0), m_flags(0),
@@ -85,12 +85,10 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
&data_lock, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_relay_log_info_log_space_lock,
&log_space_lock, MY_MUTEX_INIT_FAST);
- mysql_mutex_init(key_relay_log_info_sleep_lock, &sleep_lock, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_relay_log_info_data_cond, &data_cond, NULL);
mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL);
mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL);
mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL);
- mysql_cond_init(key_relay_log_info_sleep_cond, &sleep_cond, NULL);
relay_log.init_pthread_objects();
DBUG_VOID_RETURN;
}
@@ -103,12 +101,10 @@ Relay_log_info::~Relay_log_info()
mysql_mutex_destroy(&run_lock);
mysql_mutex_destroy(&data_lock);
mysql_mutex_destroy(&log_space_lock);
- mysql_mutex_destroy(&sleep_lock);
mysql_cond_destroy(&data_cond);
mysql_cond_destroy(&start_cond);
mysql_cond_destroy(&stop_cond);
mysql_cond_destroy(&log_space_cond);
- mysql_cond_destroy(&sleep_cond);
relay_log.cleanup();
DBUG_VOID_RETURN;
}
@@ -523,6 +519,8 @@ int init_relay_log_pos(Relay_log_info* rli,const char* log,
}
rli->group_relay_log_pos = rli->event_relay_log_pos = pos;
+ rli->clear_flag(Relay_log_info::IN_STMT);
+ rli->clear_flag(Relay_log_info::IN_TRANSACTION);
/*
Test to see if the previous run was with the skip of purging
@@ -935,6 +933,9 @@ void Relay_log_info::close_temporary_tables()
for (table=save_temporary_tables ; table ; table=next)
{
next=table->next;
+
+ /* Reset in_use as the table may have been created by another thd */
+ table->in_use=0;
/*
Don't ask for disk deletion. For now, anyway they will be deleted when
slave restarts, but it is a better intention to not delete them.
@@ -1094,9 +1095,9 @@ bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev)
!replicate_same_server_id)
DBUG_RETURN(FALSE);
log_name= group_master_log_name;
- log_pos= (!ev)? group_master_log_pos :
- ((thd->variables.option_bits & OPTION_BEGIN || !ev->log_pos) ?
- group_master_log_pos : ev->log_pos - ev->data_written);
+ log_pos= ((!ev)? group_master_log_pos :
+ (get_flag(IN_TRANSACTION) || !ev->log_pos) ?
+ group_master_log_pos : ev->log_pos - ev->data_written);
}
else
{ /* until_condition == UNTIL_RELAY_POS */
@@ -1195,7 +1196,7 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
#ifndef DBUG_OFF
extern uint debug_not_change_ts_if_art_event;
#endif
- clear_flag(IN_STMT);
+ DBUG_ENTER("Relay_log_info::stmt_done");
DBUG_ASSERT(rgi->rli == this);
/*
@@ -1204,6 +1205,9 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
(not OPTION_NOT_AUTOCOMMIT) as transactions are logged with
BEGIN/COMMIT, not with SET AUTOCOMMIT= .
+ We can't use rgi->rli->get_flag(IN_TRANSACTION) here as OPTION_BEGIN
+ is also used for single row transactions.
+
CAUTION: opt_using_transactions means innodb || bdb ; suppose the
master supports InnoDB and BDB, but the slave supports only BDB,
problems will arise: - suppose an InnoDB table is created on the
@@ -1221,7 +1225,8 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
middle of the "transaction". START SLAVE will resume at BEGIN
while the MyISAM table has already been updated.
*/
- if ((rgi->thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions)
+ if ((rgi->thd->variables.option_bits & OPTION_BEGIN) &&
+ opt_using_transactions)
inc_event_relay_log_pos();
else
{
@@ -1255,6 +1260,7 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
IF_DBUG(debug_not_change_ts_if_art_event > 0, 1)))
last_master_timestamp= event_creation_time;
}
+ DBUG_VOID_RETURN;
}
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
@@ -1417,12 +1423,17 @@ rpl_group_info::rpl_group_info(Relay_log_info *rli_)
tables_to_lock_count(0)
{
bzero(&current_gtid, sizeof(current_gtid));
+ mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock,
+ MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_rpl_group_info_sleep_cond, &sleep_cond, NULL);
}
rpl_group_info::~rpl_group_info()
{
free_annotate_event();
+ mysql_mutex_destroy(&sleep_lock);
+ mysql_cond_destroy(&sleep_cond);
}
@@ -1492,7 +1503,8 @@ delete_or_keep_event_post_apply(rpl_group_info *rgi,
void rpl_group_info::cleanup_context(THD *thd, bool error)
{
DBUG_ENTER("Relay_log_info::cleanup_context");
-
+ DBUG_PRINT("enter", ("error: %d", (int) error));
+
DBUG_ASSERT(this->thd == thd);
/*
1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
@@ -1514,9 +1526,20 @@ void rpl_group_info::cleanup_context(THD *thd, bool error)
m_table_map.clear_tables();
slave_close_thread_tables(thd);
if (error)
+ {
thd->mdl_context.release_transactional_locks();
- /* ToDo: This must clear the flag in rgi, not rli. */
- rli->clear_flag(Relay_log_info::IN_STMT);
+
+ if (thd == rli->sql_driver_thd)
+ {
+ /*
+ Reset flags. This is needed to handle incident events and errors in
+ the relay log noticed by the sql driver thread.
+ */
+ rli->clear_flag(Relay_log_info::IN_STMT);
+ rli->clear_flag(Relay_log_info::IN_TRANSACTION);
+ }
+ }
+
/*
Cleanup for the flags that have been set at do_apply_event.
*/
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 10181cc6fab..9e96fb8e72c 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -59,14 +59,14 @@ class Relay_log_info : public Slave_reporting_capability
{
public:
/**
- Flags for the state of the replication.
- */
+ Flags for the state of reading the relay log. Note that these are
+ bit masks.
+ */
enum enum_state_flag {
- /** The replication thread is inside a statement */
- IN_STMT,
-
- /** Flag counter. Should always be last */
- STATE_FLAGS_COUNT
+ /** We are inside a group of events forming a statement */
+ IN_STMT=1,
+ /** We have inside a transaction */
+ IN_TRANSACTION=2
};
/*
@@ -131,9 +131,14 @@ public:
IO_CACHE info_file;
/*
- When we restart slave thread we need to have access to the previously
- created temporary tables. Modified only on init/end and by the SQL
- thread, read only by SQL thread.
+ List of temporary tables used by this connection.
+ This is updated when a temporary table is created or dropped by
+ a replication thread.
+
+ Not reset when replication ends, to allow one to access the tables
+ when replication restarts.
+
+ Protected by data_lock.
*/
TABLE *save_temporary_tables;
@@ -141,13 +146,13 @@ public:
standard lock acquisition order to avoid deadlocks:
run_lock, data_lock, relay_log.LOCK_log, relay_log.LOCK_index
*/
- mysql_mutex_t data_lock, run_lock, sleep_lock;
+ mysql_mutex_t data_lock, run_lock;
/*
start_cond is broadcast when SQL thread is started
stop_cond - when stopped
data_cond - when data protected by data_lock changes
*/
- mysql_cond_t start_cond, stop_cond, data_cond, sleep_cond;
+ mysql_cond_t start_cond, stop_cond, data_cond;
/* parent Master_info structure */
Master_info *mi;
@@ -164,8 +169,8 @@ public:
- an autocommiting query + its associated events (INSERT_ID,
TIMESTAMP...)
We need these rli coordinates :
- - relay log name and position of the beginning of the group we currently are
- executing. Needed to know where we have to restart when replication has
+ - relay log name and position of the beginning of the group we currently
+ are executing. Needed to know where we have to restart when replication has
stopped in the middle of a group (which has been rolled back by the slave).
- relay log name and position just after the event we have just
executed. This event is part of the current group.
@@ -239,7 +244,13 @@ public:
ulong max_relay_log_size;
mysql_mutex_t log_space_lock;
mysql_cond_t log_space_cond;
- THD * sql_thd;
+ /*
+ THD for the main sql thread, the one that starts threads to process
+ slave requests. If there is only one thread, then this THD is also
+ used for SQL processing.
+ A kill sent to this THD will kill the replication.
+ */
+ THD *sql_driver_thd;
#ifndef DBUG_OFF
int events_till_abort;
#endif
@@ -399,6 +410,25 @@ public:
time_t event_creation_time, THD *thd,
rpl_group_info *rgi);
+ /**
+ Is the replication inside a group?
+
+ The reader of the relay log is inside a group if either:
+ - The IN_TRANSACTION flag is set, meaning we're inside a transaction
+ - The IN_STMT flag is set, meaning we have read at least one row from
+ a multi-event entry.
+
+ This flag reflects the state of the log 'just now', ie after the last
+ read event would be executed.
+ This allow us to test if we can stop replication before reading
+ the next entry.
+
+ @retval true Replication thread is currently inside a group
+ @retval false Replication thread is currently not inside a group
+ */
+ bool is_in_group() const {
+ return (m_flags & (IN_STMT | IN_TRANSACTION));
+ }
/**
Set the value of a replication state flag.
@@ -407,7 +437,7 @@ public:
*/
void set_flag(enum_state_flag flag)
{
- m_flags |= (1UL << flag);
+ m_flags|= flag;
}
/**
@@ -419,7 +449,7 @@ public:
*/
bool get_flag(enum_state_flag flag)
{
- return m_flags & (1UL << flag);
+ return m_flags & flag;
}
/**
@@ -429,22 +459,7 @@ public:
*/
void clear_flag(enum_state_flag flag)
{
- m_flags &= ~(1UL << flag);
- }
-
- /**
- Is the replication inside a group?
-
- Replication is inside a group if either:
- - The OPTION_BEGIN flag is set, meaning we're inside a transaction
- - The RLI_IN_STMT flag is set, meaning we're inside a statement
-
- @retval true Replication thread is currently inside a group
- @retval false Replication thread is currently not inside a group
- */
- bool is_in_group() const {
- return (sql_thd->variables.option_bits & OPTION_BEGIN) ||
- (m_flags & (1UL << IN_STMT));
+ m_flags&= ~flag;
}
time_t get_row_stmt_start_timestamp()
@@ -482,7 +497,12 @@ public:
private:
- /* ToDo: This must be moved to rpl_group_info. */
+ /*
+ Holds the state of the data in the relay log.
+ We need this to ensure that we are not in the middle of a
+ statement or inside BEGIN ... COMMIT when should rotate the
+ relay log.
+ */
uint32 m_flags;
/*
@@ -503,8 +523,11 @@ private:
together.
In parallel replication, there will be one rpl_group_info object for
- each running thd. All rpl_group_info will share the same Relay_log_info.
+ each running sql thread, each having their own thd.
+
+ All rpl_group_info will share the same Relay_log_info.
*/
+
struct rpl_group_info
{
Relay_log_info *rli;
@@ -566,6 +589,8 @@ struct rpl_group_info
RPL_TABLE_LIST *tables_to_lock; /* RBR: Tables to lock */
uint tables_to_lock_count; /* RBR: Count of tables to lock */
table_mapping m_table_map; /* RBR: Mapping table-id to table */
+ mysql_mutex_t sleep_lock;
+ mysql_cond_t sleep_cond;
rpl_group_info(Relay_log_info *rli_);
~rpl_group_info();
diff --git a/sql/slave.cc b/sql/slave.cc
index cd4e4254dbc..2504f723a78 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -146,8 +146,8 @@ typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
static int process_io_rotate(Master_info* mi, Rotate_log_event* rev);
static int process_io_create_file(Master_info* mi, Create_file_log_event* cev);
static bool wait_for_relay_log_space(Relay_log_info* rli);
-static inline bool io_slave_killed(THD* thd,Master_info* mi);
-static inline bool sql_slave_killed(THD* thd,Relay_log_info* rli);
+static bool io_slave_killed(Master_info* mi);
+static bool sql_slave_killed(rpl_group_info *rgi);
static int init_slave_thread(THD* thd, Master_info *mi,
SLAVE_THD_TYPE thd_type);
static void print_slave_skip_errors(void);
@@ -156,14 +156,14 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
bool suppress_warnings);
static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
bool reconnect, bool suppress_warnings);
-static Log_event* next_event(Relay_log_info* rli);
+static Log_event* next_event(rpl_group_info* rgi);
static int queue_event(Master_info* mi,const char* buf,ulong event_len);
static int terminate_slave_thread(THD *thd,
mysql_mutex_t *term_lock,
mysql_cond_t *term_cond,
volatile uint *slave_running,
bool skip_lock);
-static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
+static bool check_io_slave_killed(Master_info *mi, const char *info);
static bool send_show_master_info_header(THD *thd, bool full,
size_t gtid_pos_length);
static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
@@ -570,13 +570,6 @@ void init_slave_skip_errors(const char* arg)
DBUG_VOID_RETURN;
}
-static void set_thd_in_use_temporary_tables(Relay_log_info *rli)
-{
- TABLE *table;
-
- for (table= rli->save_temporary_tables ; table ; table= table->next)
- table->in_use= rli->sql_thd;
-}
int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
{
@@ -592,7 +585,7 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
{
DBUG_PRINT("info",("Terminating SQL thread"));
mi->rli.abort_slave=1;
- if ((error=terminate_slave_thread(mi->rli.sql_thd, sql_lock,
+ if ((error=terminate_slave_thread(mi->rli.sql_driver_thd, sql_lock,
&mi->rli.stop_cond,
&mi->rli.slave_running,
skip_lock)) &&
@@ -957,13 +950,12 @@ void end_slave()
DBUG_VOID_RETURN;
}
-static bool io_slave_killed(THD* thd, Master_info* mi)
+static bool io_slave_killed(Master_info* mi)
{
DBUG_ENTER("io_slave_killed");
- DBUG_ASSERT(mi->io_thd == thd);
DBUG_ASSERT(mi->slave_running); // tracking buffer overrun
- DBUG_RETURN(mi->abort_slave || abort_loop || thd->killed);
+ DBUG_RETURN(mi->abort_slave || abort_loop || mi->io_thd->killed);
}
/**
@@ -979,26 +971,36 @@ static bool io_slave_killed(THD* thd, Master_info* mi)
@return TRUE the killed status is recognized, FALSE a possible killed
status is deferred.
*/
-static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
+static bool sql_slave_killed(rpl_group_info *rgi)
{
bool ret= FALSE;
+ Relay_log_info *rli= rgi->rli;
+ THD *thd= rgi->thd;
DBUG_ENTER("sql_slave_killed");
- DBUG_ASSERT(rli->sql_thd == thd);
+ DBUG_ASSERT(rli->sql_driver_thd == thd);
DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun
- if (abort_loop || thd->killed || rli->abort_slave)
+ if (abort_loop || rli->sql_driver_thd->killed || rli->abort_slave)
{
/*
- The transaction should always be binlogged if OPTION_KEEP_LOG is set
- (it implies that something can not be rolled back). And such case
- should be regarded similarly as modifing a non-transactional table
- because retrying of the transaction will lead to an error or inconsistency
- as well.
- Example: OPTION_KEEP_LOG is set if a temporary table is created or dropped.
+ The transaction should always be binlogged if OPTION_KEEP_LOG is
+ set (it implies that something can not be rolled back). And such
+ case should be regarded similarly as modifing a
+ non-transactional table because retrying of the transaction will
+ lead to an error or inconsistency as well.
+
+ Example: OPTION_KEEP_LOG is set if a temporary table is created
+ or dropped.
+
+ Note that transaction.all.modified_non_trans_table may be 1
+ if last statement was a single row transaction without begin/end.
+ Testing this flag must always be done in connection with
+ rli->is_in_group().
*/
+
if ((thd->transaction.all.modified_non_trans_table ||
- (thd->variables.option_bits & OPTION_KEEP_LOG))
- && rli->is_in_group())
+ (thd->variables.option_bits & OPTION_KEEP_LOG)) &&
+ rli->is_in_group())
{
char msg_stopped[]=
"... Slave SQL Thread stopped with incomplete event group "
@@ -1008,20 +1010,28 @@ static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
"ignores duplicate key, key not found, and similar errors (see "
"documentation for details).";
+ DBUG_PRINT("info", ("modified_non_trans_table: %d OPTION_BEGIN: %d "
+ "is_in_group: %d",
+ thd->transaction.all.modified_non_trans_table,
+ test(thd->variables.option_bits & OPTION_BEGIN),
+ rli->is_in_group()));
+
if (rli->abort_slave)
{
- DBUG_PRINT("info", ("Request to stop slave SQL Thread received while "
- "applying a group that has non-transactional "
- "changes; waiting for completion of the group ... "));
+ DBUG_PRINT("info",
+ ("Request to stop slave SQL Thread received while "
+ "applying a group that has non-transactional "
+ "changes; waiting for completion of the group ... "));
/*
- Slave sql thread shutdown in face of unfinished group modified
- Non-trans table is handled via a timer. The slave may eventually
- give out to complete the current group and in that case there
- might be issues at consequent slave restart, see the error message.
- WL#2975 offers a robust solution requiring to store the last exectuted
- event's coordinates along with the group's coordianates
- instead of waiting with @c last_event_start_time the timer.
+ Slave sql thread shutdown in face of unfinished group
+ modified Non-trans table is handled via a timer. The slave
+ may eventually give out to complete the current group and in
+ that case there might be issues at consequent slave restart,
+ see the error message. WL#2975 offers a robust solution
+ requiring to store the last exectuted event's coordinates
+ along with the group's coordianates instead of waiting with
+ @c last_event_start_time the timer.
*/
if (rli->last_event_start_time == 0)
@@ -1049,7 +1059,8 @@ static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
else
{
ret= TRUE;
- rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, ER(ER_SLAVE_FATAL_ERROR),
+ rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ ER(ER_SLAVE_FATAL_ERROR),
msg_stopped);
}
}
@@ -1461,7 +1472,7 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
mi->clock_diff_with_master=
(long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
}
- else if (check_io_slave_killed(mi->io_thd, mi, NULL))
+ else if (check_io_slave_killed(mi, NULL))
goto slave_killed_err;
else if (is_network_error(mysql_errno(mysql)))
{
@@ -1526,7 +1537,7 @@ not always make sense; please check the manual before using it).";
}
else if (mysql_errno(mysql))
{
- if (check_io_slave_killed(mi->io_thd, mi, NULL))
+ if (check_io_slave_killed(mi, NULL))
goto slave_killed_err;
else if (is_network_error(mysql_errno(mysql)))
{
@@ -1599,7 +1610,7 @@ be equal for the Statement-format replication to work";
goto err;
}
}
- else if (check_io_slave_killed(mi->io_thd, mi, NULL))
+ else if (check_io_slave_killed(mi, NULL))
goto slave_killed_err;
else if (is_network_error(mysql_errno(mysql)))
{
@@ -1662,7 +1673,7 @@ be equal for the Statement-format replication to work";
goto err;
}
}
- else if (check_io_slave_killed(mi->io_thd, mi, NULL))
+ else if (check_io_slave_killed(mi, NULL))
goto slave_killed_err;
else if (is_network_error(err_code= mysql_errno(mysql)))
{
@@ -1707,7 +1718,7 @@ when it try to get the value of TIME_ZONE global variable from master.";
sprintf(query, query_format, llbuf);
if (mysql_real_query(mysql, query, strlen(query))
- && !check_io_slave_killed(mi->io_thd, mi, NULL))
+ && !check_io_slave_killed(mi, NULL))
{
errmsg= "The slave I/O thread stops because SET @master_heartbeat_period "
"on master failed.";
@@ -1742,7 +1753,7 @@ when it try to get the value of TIME_ZONE global variable from master.";
rc= mysql_real_query(mysql, query, strlen(query));
if (rc != 0)
{
- if (check_io_slave_killed(mi->io_thd, mi, NULL))
+ if (check_io_slave_killed(mi, NULL))
goto slave_killed_err;
if (mysql_errno(mysql) == ER_UNKNOWN_SYSTEM_VARIABLE)
@@ -1788,7 +1799,7 @@ when it try to get the value of TIME_ZONE global variable from master.";
DBUG_ASSERT(mi->checksum_alg_before_fd == BINLOG_CHECKSUM_ALG_OFF ||
mi->checksum_alg_before_fd == BINLOG_CHECKSUM_ALG_CRC32);
}
- else if (check_io_slave_killed(mi->io_thd, mi, NULL))
+ else if (check_io_slave_killed(mi, NULL))
goto slave_killed_err;
else if (is_network_error(mysql_errno(mysql)))
{
@@ -2052,7 +2063,7 @@ after_set_capability:
rpl_global_gtid_slave_state.load(mi->io_thd, master_row[0],
strlen(master_row[0]), false, false);
}
- else if (check_io_slave_killed(mi->io_thd, mi, NULL))
+ else if (check_io_slave_killed(mi, NULL))
goto slave_killed_err;
else if (is_network_error(mysql_errno(mysql)))
{
@@ -2118,7 +2129,7 @@ static bool wait_for_relay_log_space(Relay_log_info* rli)
"\
Waiting for the slave SQL thread to free enough relay log space");
while (rli->log_space_limit < rli->log_space_total &&
- !(slave_killed=io_slave_killed(thd,mi)) &&
+ !(slave_killed=io_slave_killed(mi)) &&
!rli->ignore_log_space_limit)
mysql_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
@@ -2293,7 +2304,7 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi,
{
*suppress_warnings= TRUE; // Suppress reconnect warning
}
- else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
+ else if (!check_io_slave_killed(mi, NULL))
{
char buf[256];
my_snprintf(buf, sizeof(buf), "%s (Errno: %d)", mysql_error(mysql),
@@ -2463,8 +2474,15 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
&my_charset_bin);
mysql_mutex_lock(&mi->run_lock);
if (full)
- protocol->store(mi->rli.sql_thd ? mi->rli.sql_thd->proc_info : "",
+ {
+ /*
+ Show what the sql driver replication thread is doing
+ This is only meaningful if there is only one slave thread.
+ */
+ protocol->store(mi->rli.sql_driver_thd ?
+ mi->rli.sql_driver_thd->proc_info : "",
&my_charset_bin);
+ }
protocol->store(mi->io_thd ? mi->io_thd->proc_info : "", &my_charset_bin);
mysql_mutex_unlock(&mi->run_lock);
@@ -2797,8 +2815,8 @@ static int init_slave_thread(THD* thd, Master_info *mi,
@retval True if the thread has been killed, false otherwise.
*/
template <typename killed_func, typename rpl_info>
-static inline bool slave_sleep(THD *thd, time_t seconds,
- killed_func func, rpl_info info)
+static bool slave_sleep(THD *thd, time_t seconds,
+ killed_func func, rpl_info info)
{
bool ret;
@@ -2813,7 +2831,7 @@ static inline bool slave_sleep(THD *thd, time_t seconds,
mysql_mutex_lock(lock);
old_proc_info= thd->enter_cond(cond, lock, thd->proc_info);
- while (! (ret= func(thd, info)))
+ while (! (ret= func(info)))
{
int error= mysql_cond_timedwait(cond, lock, &abstime);
if (error == ETIMEDOUT || error == ETIME)
@@ -3024,7 +3042,6 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
{
int exec_res= 0;
Relay_log_info* rli= rgi->rli;
-
DBUG_ENTER("apply_event_and_update_pos");
DBUG_PRINT("exec_event",("%s(type_code: %d; server_id: %d)",
@@ -3074,7 +3091,7 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
(ev->flags & LOG_EVENT_SKIP_REPLICATION_F ? OPTION_SKIP_REPLICATION : 0);
ev->thd = thd; // because up to this point, ev->thd == 0
- int reason= ev->shall_skip(rli);
+ int reason= ev->shall_skip(rgi);
if (reason == Log_event::EVENT_SKIP_COUNT)
{
DBUG_ASSERT(rli->slave_skip_counter > 0);
@@ -3098,9 +3115,10 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
// EVENT_SKIP_COUNT
"skipped because event skip counter was non-zero"
};
- DBUG_PRINT("info", ("OPTION_BEGIN: %d; IN_STMT: %d",
+ DBUG_PRINT("info", ("OPTION_BEGIN: %d IN_STMT: %d IN_TRANSACTION: %d",
test(thd->variables.option_bits & OPTION_BEGIN),
- rli->get_flag(Relay_log_info::IN_STMT)));
+ rli->get_flag(Relay_log_info::IN_STMT),
+ rli->get_flag(Relay_log_info::IN_TRANSACTION)));
DBUG_PRINT("skip_event", ("%s event was %s",
ev->get_type_str(), explain[reason]));
#endif
@@ -3150,6 +3168,80 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
/**
+ Keep the relay log transaction state up to date.
+
+ The state reflects how things are after the given event, that has just been
+ read from the relay log, is executed.
+
+ This is only needed to ensure we:
+ - Don't abort the sql driver thread in the middle of an event group.
+ - Don't rotate the io thread in the middle of a statement or transaction.
+ The mechanism is that the io thread, when it needs to rotate the relay
+ log, will wait until the sql driver has read all the cached events
+ and then continue reading events one by one from the master until
+ the sql threads signals that log doesn't have an active group anymore.
+
+ There are two possible cases. We keep them as 2 separate flags mainly
+ to make debugging easier.
+
+ - IN_STMT is set when we have read an event that should be used
+ together with the next event. This is for example setting a
+ variable that is used when executing the next statement.
+ - IN_TRANSACTION is set when we are inside a BEGIN...COMMIT group
+
+ To test the state one should use the is_in_group() function.
+*/
+
+inline void update_state_of_relay_log(Relay_log_info *rli, Log_event *ev)
+{
+ Log_event_type typ= ev->get_type_code();
+
+ /* check if we are in a multi part event */
+ if (ev->is_part_of_group())
+ rli->set_flag(Relay_log_info::IN_STMT);
+ else if (Log_event::is_group_event(typ))
+ {
+ /*
+ If it was not a is_part_of_group() and not a group event (like
+ rotate) then we can reset the IN_STMT flag. We have the above
+ if only to allow us to have a rotate element anywhere.
+ */
+ rli->clear_flag(Relay_log_info::IN_STMT);
+ }
+
+ /* Check for an event that starts or stops a transaction */
+ if (typ == QUERY_EVENT)
+ {
+ Query_log_event *qev= (Query_log_event*) ev;
+ /*
+ Trivial optimization to avoid the following somewhat expensive
+ checks.
+ */
+ if (qev->q_len <= sizeof("ROLLBACK"))
+ {
+ if (qev->is_begin())
+ rli->set_flag(Relay_log_info::IN_TRANSACTION);
+ if (qev->is_commit() || qev->is_rollback())
+ rli->clear_flag(Relay_log_info::IN_TRANSACTION);
+ }
+ }
+ if (typ == XID_EVENT)
+ rli->clear_flag(Relay_log_info::IN_TRANSACTION);
+ if (typ == GTID_EVENT &&
+ !(((Gtid_log_event*) ev)->flags2 & Gtid_log_event::FL_STANDALONE))
+ {
+ /* This GTID_EVENT will generate a BEGIN event */
+ rli->set_flag(Relay_log_info::IN_TRANSACTION);
+ }
+
+ DBUG_PRINT("info", ("event: %u IN_STMT: %d IN_TRANSACTION: %d",
+ (uint) typ,
+ rli->get_flag(Relay_log_info::IN_STMT),
+ rli->get_flag(Relay_log_info::IN_TRANSACTION)));
+}
+
+
+/**
Top-level function for executing the next event from the relay log.
This function reads the event from the relay log, executes it, and
@@ -3177,23 +3269,22 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
@retval 1 The event was not applied.
*/
+
static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
rpl_group_info *serial_rgi)
{
DBUG_ENTER("exec_relay_log_event");
/*
- We acquire this mutex since we need it for all operations except
- event execution. But we will release it in places where we will
- wait for something for example inside of next_event().
- */
+ We acquire this mutex since we need it for all operations except
+ event execution. But we will release it in places where we will
+ wait for something for example inside of next_event().
+ */
mysql_mutex_lock(&rli->data_lock);
- Log_event * ev = next_event(rli);
-
- DBUG_ASSERT(rli->sql_thd==thd);
+ Log_event * ev = next_event(serial_rgi);
- if (sql_slave_killed(thd,rli))
+ if (sql_slave_killed(serial_rgi))
{
mysql_mutex_unlock(&rli->data_lock);
delete ev;
@@ -3216,8 +3307,8 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
sql_print_information("Slave SQL thread stopped because it reached its"
" UNTIL position %s", llstr(rli->until_pos(), buf));
/*
- Setting abort_slave flag because we do not want additional message about
- error in query execution to be printed.
+ Setting abort_slave flag because we do not want additional
+ message about error in query execution to be printed.
*/
rli->abort_slave= 1;
mysql_mutex_unlock(&rli->data_lock);
@@ -3245,7 +3336,14 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
};);
}
- if (opt_slave_parallel_threads > 0)
+ update_state_of_relay_log(rli, ev);
+
+ /*
+ Execute queries in parallel, except if slave_skip_counter is set,
+ as it's is easier to skip queries in single threaded mode.
+ */
+
+ if (opt_slave_parallel_threads > 0 && rli->slave_skip_counter == 0)
DBUG_RETURN(rli->parallel.do_event(serial_rgi, ev));
/*
@@ -3310,7 +3408,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
serial_rgi->cleanup_context(thd, 1);
/* chance for concurrent connection to get more locks */
slave_sleep(thd, min(rli->trans_retries, MAX_SLAVE_RETRY_PAUSE),
- sql_slave_killed, rli);
+ sql_slave_killed, serial_rgi);
mysql_mutex_lock(&rli->data_lock); // because of SHOW STATUS
rli->trans_retries++;
rli->retried_trans++;
@@ -3358,9 +3456,9 @@ on this slave.\
}
-static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info)
+static bool check_io_slave_killed(Master_info *mi, const char *info)
{
- if (io_slave_killed(thd, mi))
+ if (io_slave_killed(mi))
{
if (info && global_system_variables.log_warnings)
sql_print_information("%s", info);
@@ -3411,7 +3509,7 @@ static int try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi,
return 1; // Don't retry forever
slave_sleep(thd, mi->connect_retry, io_slave_killed, mi);
}
- if (check_io_slave_killed(thd, mi, messages[SLAVE_RECON_MSG_KILLED_WAITING]))
+ if (check_io_slave_killed(mi, messages[SLAVE_RECON_MSG_KILLED_WAITING]))
return 1;
thd->proc_info = messages[SLAVE_RECON_MSG_AFTER];
if (!suppress_warnings)
@@ -3448,7 +3546,7 @@ static int try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi,
sql_print_information("%s", buf);
}
}
- if (safe_reconnect(thd, mysql, mi, 1) || io_slave_killed(thd, mi))
+ if (safe_reconnect(thd, mysql, mi, 1) || io_slave_killed(mi))
{
if (global_system_variables.log_warnings)
sql_print_information("%s", messages[SLAVE_RECON_MSG_KILLED_AFTER]);
@@ -3631,11 +3729,14 @@ connected:
if (ret == 2)
{
- if (check_io_slave_killed(mi->io_thd, mi, "Slave I/O thread killed"
+ if (check_io_slave_killed(mi, "Slave I/O thread killed"
"while calling get_master_version_and_clock(...)"))
goto err;
suppress_warnings= FALSE;
- /* Try to reconnect because the error was caused by a transient network problem */
+ /*
+ Try to reconnect because the error was caused by a transient network
+ problem
+ */
if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
reconnect_messages[SLAVE_RECON_ACT_REG]))
goto err;
@@ -3650,7 +3751,7 @@ connected:
thd_proc_info(thd, "Registering slave on master");
if (register_slave_on_master(mysql, mi, &suppress_warnings))
{
- if (!check_io_slave_killed(thd, mi, "Slave I/O thread killed "
+ if (!check_io_slave_killed(mi, "Slave I/O thread killed "
"while registering slave on master"))
{
sql_print_error("Slave I/O thread couldn't register on master");
@@ -3675,13 +3776,13 @@ connected:
}
DBUG_PRINT("info",("Starting reading binary log from master"));
- while (!io_slave_killed(thd,mi))
+ while (!io_slave_killed(mi))
{
thd_proc_info(thd, "Requesting binlog dump");
if (request_dump(thd, mysql, mi, &suppress_warnings))
{
sql_print_error("Failed on request_dump()");
- if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
+ if (check_io_slave_killed(mi, "Slave I/O thread killed while \
requesting master dump") ||
try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
reconnect_messages[SLAVE_RECON_ACT_DUMP]))
@@ -3701,7 +3802,7 @@ requesting master dump") ||
const char *event_buf;
DBUG_ASSERT(mi->last_error().number == 0);
- while (!io_slave_killed(thd,mi))
+ while (!io_slave_killed(mi))
{
ulong event_len;
/*
@@ -3712,7 +3813,7 @@ requesting master dump") ||
*/
thd_proc_info(thd, "Waiting for master to send event");
event_len= read_event(mysql, mi, &suppress_warnings);
- if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
+ if (check_io_slave_killed(mi, "Slave I/O thread killed while \
reading event"))
goto err;
DBUG_EXECUTE_IF("FORCE_SLAVE_TO_RECONNECT_EVENT",
@@ -3802,10 +3903,11 @@ Stopping slave I/O thread due to out-of-memory error from master");
- if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
the clean value is 0), then we are reading only one more event as we
should, and we'll block only at the next event. No big deal.
- - if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
- the clean value is 1), then we are going into wait_for_relay_log_space()
- for no reason, but this function will do a clean read, notice the clean
- value and exit immediately.
+ - if mi->rli.ignore_log_space_limit is 0 but becomes 1 just
+ after (so the clean value is 1), then we are going into
+ wait_for_relay_log_space() for no reason, but this function
+ will do a clean read, notice the clean value and exit
+ immediately.
*/
#ifndef DBUG_OFF
{
@@ -3866,7 +3968,7 @@ err:
mi->mysql=0;
}
write_ignored_events_info_to_relay_log(thd, mi);
- thd_proc_info(thd, "Waiting for slave mutex on exit");
+ thd_proc_info(thd, "Slave io thread waiting for slave mutex on exit");
mysql_mutex_lock(&mi->run_lock);
err_during_init:
@@ -3996,7 +4098,6 @@ pthread_handler_t handle_slave_sql(void *arg)
thd = new THD; // note that contructor of THD uses DBUG_ !
thd->thread_stack = (char*)&thd; // remember where our stack is
thd->rpl_filter = mi->rpl_filter;
- serial_rgi->thd= thd;
DBUG_ASSERT(rli->inited);
DBUG_ASSERT(rli->mi == mi);
@@ -4007,7 +4108,15 @@ pthread_handler_t handle_slave_sql(void *arg)
rli->events_till_abort = abort_slave_event_count;
#endif
- rli->sql_thd= thd;
+ /*
+ THD for the sql driver thd. In parallel replication this is the thread
+ that reads things from the relay log and calls rpl_parallel::do_event()
+ to execute queries.
+
+ In single thread replication this is the THD for the thread that is
+ executing SQL queries too.
+ */
+ serial_rgi->thd= rli->sql_driver_thd= thd;
/* Inform waiting threads that slave has started */
rli->slave_run_id++;
@@ -4032,8 +4141,6 @@ pthread_handler_t handle_slave_sql(void *arg)
serial_rgi->deferred_events= new Deferred_log_events(rli);
}
- thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
- set_thd_in_use_temporary_tables(rli); // (re)set sql_thd in use for saved temp tables
/*
binlog_annotate_row_events must be TRUE only after an Annotate_rows event
has been recieved and only till the last corresponding rbr event has been
@@ -4110,7 +4217,6 @@ pthread_handler_t handle_slave_sql(void *arg)
#endif
}
#endif
- DBUG_ASSERT(rli->sql_thd == thd);
DBUG_PRINT("master_info",("log_file_name: %s position: %s",
rli->group_master_log_name,
@@ -4193,10 +4299,9 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
/* Read queries from the IO/THREAD until this thread is killed */
- while (!sql_slave_killed(thd,rli))
+ while (!sql_slave_killed(serial_rgi))
{
thd_proc_info(thd, "Reading event from the relay log");
- DBUG_ASSERT(rli->sql_thd == thd);
THD_CHECK_SENTRY(thd);
if (saved_skip && rli->slave_skip_counter == 0)
@@ -4217,7 +4322,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
{
DBUG_PRINT("info", ("exec_relay_log_event() failed"));
// do not scare the user if SQL thread was simply killed or stopped
- if (!sql_slave_killed(thd,rli))
+ if (!sql_slave_killed(serial_rgi))
{
/*
retrieve as much info as possible from the thd and, error
@@ -4349,7 +4454,7 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
thd->catalog= 0;
thd->reset_query();
thd->reset_db(NULL, 0);
- thd_proc_info(thd, "Waiting for slave mutex on exit");
+ thd_proc_info(thd, "Sql driver thread waiting for slave mutex on exit");
mysql_mutex_lock(&rli->run_lock);
err_during_init:
/* We need data_lock, at least to wake up any waiting master_pos_wait() */
@@ -4367,17 +4472,14 @@ err_during_init:
rli->ignore_log_space_limit= 0; /* don't need any lock */
/* we die so won't remember charset - re-update them on next thread start */
rli->cached_charset_invalidate();
- rli->save_temporary_tables = thd->temporary_tables;
/*
TODO: see if we can do this conditionally in next_event() instead
to avoid unneeded position re-init
*/
thd->temporary_tables = 0; // remove tempation from destructor to close them
- DBUG_ASSERT(rli->sql_thd == thd);
THD_CHECK_SENTRY(thd);
- rli->sql_thd= 0;
- set_thd_in_use_temporary_tables(rli); // (re)set sql_thd in use for saved temp tables
+ serial_rgi->thd= rli->sql_driver_thd= 0;
mysql_mutex_lock(&LOCK_thread_count);
THD_CHECK_SENTRY(thd);
delete thd;
@@ -5474,7 +5576,7 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
"terminated.");
DBUG_RETURN(1);
}
- while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
+ while (!(slave_was_killed = io_slave_killed(mi)) &&
(reconnect ? mysql_reconnect(mysql) != 0 :
mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
mi->port, 0, client_flag) == 0))
@@ -5552,19 +5654,20 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
}
+#ifdef NOT_USED
MYSQL *rpl_connect_master(MYSQL *mysql)
{
- THD *thd= current_thd;
Master_info *mi= my_pthread_getspecific_ptr(Master_info*, RPL_MASTER_INFO);
bool allocated= false;
my_bool my_true= 1;
+ THD *thd;
if (!mi)
{
sql_print_error("'rpl_connect_master' must be called in slave I/O thread context.");
return NULL;
}
-
+ thd= mi->io_thd;
if (!mysql)
{
if(!(mysql= mysql_init(NULL)))
@@ -5607,11 +5710,11 @@ MYSQL *rpl_connect_master(MYSQL *mysql)
if (mi->user == NULL
|| mi->user[0] == 0
- || io_slave_killed(thd, mi)
+ || io_slave_killed( mi)
|| !mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
mi->port, 0, 0))
{
- if (!io_slave_killed(thd, mi))
+ if (!io_slave_killed( mi))
sql_print_error("rpl_connect_master: error connecting to master: %s (server_error: %d)",
mysql_error(mysql), mysql_errno(mysql));
@@ -5621,6 +5724,7 @@ MYSQL *rpl_connect_master(MYSQL *mysql)
}
return mysql;
}
+#endif
/*
Store the file and position where the execute-slave thread are in the
@@ -5727,16 +5831,17 @@ static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg)
error is reported through the sql_print_information() or
sql_print_error() functions.
*/
-static Log_event* next_event(Relay_log_info* rli)
+static Log_event* next_event(rpl_group_info *rgi)
{
Log_event* ev;
+ Relay_log_info *rli= rgi->rli;
IO_CACHE* cur_log = rli->cur_log;
mysql_mutex_t *log_lock = rli->relay_log.get_log_lock();
const char* errmsg=0;
- THD* thd = rli->sql_thd;
+ THD *thd = rgi->thd;
DBUG_ENTER("next_event");
- DBUG_ASSERT(thd != 0);
+ DBUG_ASSERT(thd != 0 && thd == rli->sql_driver_thd);
#ifndef DBUG_OFF
if (abort_slave_event_count && !rli->events_till_abort--)
@@ -5752,7 +5857,7 @@ static Log_event* next_event(Relay_log_info* rli)
*/
mysql_mutex_assert_owner(&rli->data_lock);
- while (!sql_slave_killed(thd,rli))
+ while (!sql_slave_killed(rgi))
{
/*
We can have two kinds of log reading:
@@ -5821,7 +5926,6 @@ static Log_event* next_event(Relay_log_info* rli)
opt_slave_sql_verify_checksum)))
{
- DBUG_ASSERT(thd==rli->sql_thd);
/*
read it while we have a lock, to avoid a mutex lock in
inc_event_relay_log_pos()
@@ -5832,7 +5936,6 @@ static Log_event* next_event(Relay_log_info* rli)
mysql_mutex_unlock(log_lock);
DBUG_RETURN(ev);
}
- DBUG_ASSERT(thd==rli->sql_thd);
if (opt_reckless_slave) // For mysql-test
cur_log->error = 0;
if (cur_log->error < 0)
@@ -5920,14 +6023,15 @@ static Log_event* next_event(Relay_log_info* rli)
and reads one more event and starts honoring log_space_limit again.
If the SQL thread needs more events to be able to rotate the log (it
- might need to finish the current group first), then it can ask for one
- more at a time. Thus we don't outgrow the relay log indefinitely,
+ might need to finish the current group first), then it can ask for
+ one more at a time. Thus we don't outgrow the relay log indefinitely,
but rather in a controlled manner, until the next rotate.
When the SQL thread starts it sets ignore_log_space_limit to false.
We should also reset ignore_log_space_limit to 0 when the user does
- RESET SLAVE, but in fact, no need as RESET SLAVE requires that the slave
- be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
+ RESET SLAVE, but in fact, no need as RESET SLAVE requires that the
+ slave be stopped, and the SQL thread sets ignore_log_space_limit
+ to 0 when
it stops.
*/
mysql_mutex_lock(&rli->log_space_lock);
@@ -5965,7 +6069,7 @@ static Log_event* next_event(Relay_log_info* rli)
mysql_mutex_unlock(&rli->log_space_lock);
mysql_cond_broadcast(&rli->log_space_cond);
// Note that wait_for_update_relay_log unlocks lock_log !
- rli->relay_log.wait_for_update_relay_log(rli->sql_thd);
+ rli->relay_log.wait_for_update_relay_log(rli->sql_driver_thd);
// re-acquire data lock since we released it earlier
mysql_mutex_lock(&rli->data_lock);
rli->last_master_timestamp= save_timestamp;
diff --git a/sql/sql_base.cc b/sql/sql_base.cc
index 109a4ef41e9..80c0b98fd73 100644
--- a/sql/sql_base.cc
+++ b/sql/sql_base.cc
@@ -57,6 +57,7 @@
#include "sql_table.h" // build_table_filename
#include "datadict.h" // dd_frm_is_view()
#include "sql_hset.h" // Hash_set
+#include "rpl_rli.h" // rpl_group_info
#ifdef __WIN__
#include <io.h>
#endif
@@ -1230,11 +1231,24 @@ bool close_cached_connection_tables(THD *thd, LEX_STRING *connection)
static void mark_temp_tables_as_free_for_reuse(THD *thd)
{
+ DBUG_ENTER("mark_temp_tables_as_free_for_reuse");
+
+ thd->lock_temporary_tables();
for (TABLE *table= thd->temporary_tables ; table ; table= table->next)
{
if ((table->query_id == thd->query_id) && ! table->open_by_handler)
mark_tmp_table_for_reuse(table);
}
+ thd->unlock_temporary_tables();
+ if (thd->rgi_slave)
+ {
+ /*
+ Temporary tables are shared with other by sql execution threads.
+ As a safety messure, clear the pointer to the common area.
+ */
+ thd->temporary_tables= 0;
+ }
+ DBUG_VOID_RETURN;
}
@@ -1248,6 +1262,7 @@ static void mark_temp_tables_as_free_for_reuse(THD *thd)
void mark_tmp_table_for_reuse(TABLE *table)
{
+ DBUG_ENTER("mark_tmp_table_for_reuse");
DBUG_ASSERT(table->s->tmp_table);
table->query_id= 0;
@@ -1278,6 +1293,7 @@ void mark_tmp_table_for_reuse(TABLE *table)
LOCK TABLES is allowed (but ignored) for a temporary table.
*/
table->reginfo.lock_type= TL_WRITE;
+ DBUG_VOID_RETURN;
}
@@ -1628,6 +1644,10 @@ static inline uint tmpkeyval(THD *thd, TABLE *table)
/*
Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread
creates one DROP TEMPORARY TABLE binlog event for each pseudo-thread
+
+ Temporary tables created in a sql slave is closed by
+ Relay_log_info::close_temporary_tables()
+
*/
bool close_temporary_tables(THD *thd)
@@ -1642,6 +1662,7 @@ bool close_temporary_tables(THD *thd)
if (!thd->temporary_tables)
DBUG_RETURN(FALSE);
+ DBUG_ASSERT(!thd->rgi_slave);
if (!mysql_bin_log.is_open())
{
@@ -2096,16 +2117,42 @@ TABLE *find_temporary_table(THD *thd,
const char *table_key,
uint table_key_length)
{
+ TABLE *result= 0;
+ if (!thd->have_temporary_tables())
+ return NULL;
+
+ thd->lock_temporary_tables();
for (TABLE *table= thd->temporary_tables; table; table= table->next)
{
if (table->s->table_cache_key.length == table_key_length &&
!memcmp(table->s->table_cache_key.str, table_key, table_key_length))
{
- return table;
+ /*
+ We need to set the THD as it may be different in case of
+ parallel replication
+ */
+ if (table->in_use != thd)
+ {
+ table->in_use= thd;
+#ifdef REMOVE_AFTER_MERGE_WITH_10
+ if (thd->rgi_slave)
+ {
+ /*
+ We may be stealing an opened temporary tables from one slave
+ thread to another, we need to let the performance schema know that,
+ for aggregates per thread to work properly.
+ */
+ table->file->unbind_psi();
+ table->file->rebind_psi();
+ }
+#endif
+ }
+ result= table;
+ break;
}
}
-
- return NULL;
+ thd->unlock_temporary_tables();
+ return result;
}
@@ -2153,6 +2200,9 @@ int drop_temporary_table(THD *thd, TABLE_LIST *table_list, bool *is_trans)
/* Table might be in use by some outer statement. */
if (table->query_id && table->query_id != thd->query_id)
{
+ DBUG_PRINT("info", ("table->query_id: %lu thd->query_id: %lu",
+ (ulong) table->query_id, (ulong) thd->query_id));
+
my_error(ER_CANT_REOPEN_TABLE, MYF(0), table->alias.c_ptr());
DBUG_RETURN(-1);
}
@@ -2181,6 +2231,7 @@ void close_temporary_table(THD *thd, TABLE *table,
table->s->db.str, table->s->table_name.str,
(long) table, table->alias.c_ptr()));
+ thd->lock_temporary_tables();
if (table->prev)
{
table->prev->next= table->next;
@@ -2200,12 +2251,14 @@ void close_temporary_table(THD *thd, TABLE *table,
if (thd->temporary_tables)
table->next->prev= 0;
}
- if (thd->slave_thread)
+ if (thd->rgi_slave)
{
/* natural invariant of temporary_tables */
DBUG_ASSERT(slave_open_temp_tables || !thd->temporary_tables);
- slave_open_temp_tables--;
+ thread_safe_decrement32(&slave_open_temp_tables, &thread_running_lock);
+ table->in_use= 0; // No statistics
}
+ thd->unlock_temporary_tables();
close_temporary(table, free_share, delete_table);
DBUG_VOID_RETURN;
}
@@ -2651,35 +2704,30 @@ bool open_table(THD *thd, TABLE_LIST *table_list, MEM_ROOT *mem_root,
TODO: move this block into a separate function.
*/
if (table_list->open_type != OT_BASE_ONLY &&
- ! (flags & MYSQL_OPEN_SKIP_TEMPORARY))
+ ! (flags & MYSQL_OPEN_SKIP_TEMPORARY) && thd->have_temporary_tables())
{
- for (table= thd->temporary_tables; table ; table=table->next)
- {
- if (table->s->table_cache_key.length == key_length +
- TMP_TABLE_KEY_EXTRA &&
- !memcmp(table->s->table_cache_key.str, key,
- key_length + TMP_TABLE_KEY_EXTRA))
+ if ((table= find_temporary_table(thd, key,
+ key_length + TMP_TABLE_KEY_EXTRA)))
+ {
+ /*
+ Check if we're trying to use the same temporary table twice in a query.
+ Right now we don't support this because a temporary table
+ is always represented by only one TABLE object in THD, and
+ it can not be cloned. Emit an error for an unsupported behaviour.
+ */
+ if (table->query_id)
{
- /*
- We're trying to use the same temporary table twice in a query.
- Right now we don't support this because a temporary table
- is always represented by only one TABLE object in THD, and
- it can not be cloned. Emit an error for an unsupported behaviour.
- */
- if (table->query_id)
- {
- DBUG_PRINT("error",
- ("query_id: %lu server_id: %u pseudo_thread_id: %lu",
- (ulong) table->query_id, (uint) thd->variables.server_id,
- (ulong) thd->variables.pseudo_thread_id));
- my_error(ER_CANT_REOPEN_TABLE, MYF(0), table->alias.c_ptr());
- DBUG_RETURN(TRUE);
- }
- table->query_id= thd->query_id;
- thd->thread_specific_used= TRUE;
- DBUG_PRINT("info",("Using temporary table"));
- goto reset;
+ DBUG_PRINT("error",
+ ("query_id: %lu server_id: %u pseudo_thread_id: %lu",
+ (ulong) table->query_id, (uint) thd->variables.server_id,
+ (ulong) thd->variables.pseudo_thread_id));
+ my_error(ER_CANT_REOPEN_TABLE, MYF(0), table->alias.c_ptr());
+ DBUG_RETURN(TRUE);
}
+ table->query_id= thd->query_id;
+ thd->thread_specific_used= TRUE;
+ DBUG_PRINT("info",("Using temporary table"));
+ goto reset;
}
}
@@ -5987,14 +6035,18 @@ TABLE *open_table_uncached(THD *thd, handlerton *hton,
if (add_to_temporary_tables_list)
{
+ thd->lock_temporary_tables();
/* growing temp list at the head */
tmp_table->next= thd->temporary_tables;
if (tmp_table->next)
tmp_table->next->prev= tmp_table;
thd->temporary_tables= tmp_table;
thd->temporary_tables->prev= 0;
- if (thd->slave_thread)
- slave_open_temp_tables++;
+ if (thd->rgi_slave)
+ {
+ thread_safe_increment32(&slave_open_temp_tables, &thread_running_lock);
+ }
+ thd->unlock_temporary_tables();
}
tmp_table->pos_in_table_list= 0;
DBUG_PRINT("tmptable", ("opened table: '%s'.'%s' 0x%lx", tmp_table->s->db.str,
diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc
index 4f41b942345..9bcfe64cf2d 100644
--- a/sql/sql_binlog.cc
+++ b/sql/sql_binlog.cc
@@ -99,6 +99,7 @@ void mysql_client_binlog_statement(THD* thd)
}
if (!(rgi= thd->rgi_fake))
rgi= thd->rgi_fake= new rpl_group_info(rli);
+ rgi->thd= thd;
const char *error= 0;
char *buf= (char *) my_malloc(decoded_len, MYF(MY_WME));
@@ -115,7 +116,7 @@ void mysql_client_binlog_statement(THD* thd)
goto end;
}
- rli->sql_thd= thd;
+ rli->sql_driver_thd= thd;
rli->no_storage= TRUE;
for (char const *strptr= thd->lex->comment.str ;
@@ -200,8 +201,6 @@ void mysql_client_binlog_statement(THD* thd)
}
}
- rgi->rli= rli;
- rgi->thd= thd;
ev= Log_event::read_log_event(bufptr, event_len, &error,
rli->relay_log.description_event_for_exec,
0);
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 714adfba8f7..f424e34969d 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -5597,6 +5597,24 @@ THD::signal_wakeup_ready()
}
+void THD::rgi_lock_temporary_tables()
+{
+ mysql_mutex_lock(&rgi_slave->rli->data_lock);
+ temporary_tables= rgi_slave->rli->save_temporary_tables;
+}
+
+void THD::rgi_unlock_temporary_tables()
+{
+ rgi_slave->rli->save_temporary_tables= temporary_tables;
+ mysql_mutex_unlock(&rgi_slave->rli->data_lock);
+}
+
+bool THD::rgi_have_temporary_tables()
+{
+ return rgi_slave->rli->save_temporary_tables != 0;
+}
+
+
wait_for_commit::wait_for_commit()
: subsequent_commits_list(0), next_subsequent_commit(0), waitee(0),
opaque_pointer(0),
diff --git a/sql/sql_class.h b/sql/sql_class.h
index c34c100171d..01121fd5b35 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -3371,6 +3371,27 @@ private:
bool wakeup_ready;
mysql_mutex_t LOCK_wakeup_ready;
mysql_cond_t COND_wakeup_ready;
+
+ /* Protect against add/delete of temporary tables in parallel replication */
+ void rgi_lock_temporary_tables();
+ void rgi_unlock_temporary_tables();
+ bool rgi_have_temporary_tables();
+public:
+ inline void lock_temporary_tables()
+ {
+ if (rgi_slave)
+ rgi_lock_temporary_tables();
+ }
+ inline void unlock_temporary_tables()
+ {
+ if (rgi_slave)
+ rgi_unlock_temporary_tables();
+ }
+ inline bool have_temporary_tables()
+ {
+ return (temporary_tables ||
+ (rgi_slave && rgi_have_temporary_tables()));
+ }
};
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 3a499145a63..495f0d591dd 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -3955,6 +3955,7 @@ end_with_restore_list:
break;
case SQLCOM_BEGIN:
+ DBUG_PRINT("info", ("Executing SQLCOM_BEGIN thd: %p", thd));
if (trans_begin(thd, lex->start_transaction_opt))
goto error;
my_ok(thd);
diff --git a/sql/transaction.cc b/sql/transaction.cc
index 1623cd57d77..3117cd7d166 100644
--- a/sql/transaction.cc
+++ b/sql/transaction.cc
@@ -139,6 +139,11 @@ bool trans_begin(THD *thd, uint flags)
}
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
+
+ /*
+ The following set should not be needed as the flag should always be 0
+ when we come here. We should at some point change this to an assert.
+ */
thd->transaction.all.modified_non_trans_table= FALSE;
if (res)