summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2013-07-12 14:36:20 +0200
committerunknown <knielsen@knielsen-hq.org>2013-07-12 14:36:20 +0200
commitba4b937af2e3c9118071b1279bc39b6febca73a9 (patch)
treead2cf72470e8a031ae5ce0b3f7568cca0faf5340 /sql
parent6d5f237e091ca7aa4fdd52c186af11fffc80b1c2 (diff)
downloadmariadb-git-ba4b937af2e3c9118071b1279bc39b6febca73a9.tar.gz
MDEV-4506: Parallel replication: Intermediate commit
Move the deferred event stuff from Relay_log_info to rpl_group_info to make it thread safe for parallel replication.
Diffstat (limited to 'sql')
-rw-r--r--sql/log_event.cc20
-rw-r--r--sql/log_event.h10
-rw-r--r--sql/rpl_parallel.cc4
-rw-r--r--sql/rpl_rli.cc7
-rw-r--r--sql/rpl_rli.h71
-rw-r--r--sql/rpl_utility.cc7
-rw-r--r--sql/slave.cc10
-rw-r--r--sql/sql_binlog.cc11
-rw-r--r--sql/sql_class.cc17
-rw-r--r--sql/sql_class.h4
-rw-r--r--sql/sql_insert.cc8
-rw-r--r--sql/sql_load.cc6
12 files changed, 89 insertions, 86 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index cb7bc3924f5..f07c58f4d6b 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -6716,8 +6716,8 @@ int Intvar_log_event::do_apply_event(struct rpl_group_info *rgi)
*/
rli->set_flag(Relay_log_info::IN_STMT);
- if (rli->deferred_events_collecting)
- return rli->deferred_events->add(this);
+ if (rgi->deferred_events_collecting)
+ return rgi->deferred_events->add(this);
switch (type) {
case LAST_INSERT_ID_EVENT:
@@ -6827,8 +6827,8 @@ int Rand_log_event::do_apply_event(struct rpl_group_info *rgi)
*/
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
- if (rli->deferred_events_collecting)
- return rli->deferred_events->add(this);
+ if (rgi->deferred_events_collecting)
+ return rgi->deferred_events->add(this);
thd->rand.seed1= (ulong) seed1;
thd->rand.seed2= (ulong) seed2;
@@ -6868,14 +6868,14 @@ Rand_log_event::do_shall_skip(Relay_log_info *rli)
bool slave_execute_deferred_events(THD *thd)
{
bool res= false;
- Relay_log_info *rli= thd->rli_slave;
+ rpl_group_info *rgi= thd->rgi_slave;
- DBUG_ASSERT(rli && (!rli->deferred_events_collecting || rli->deferred_events));
+ DBUG_ASSERT(rgi && (!rgi->deferred_events_collecting || rgi->deferred_events));
- if (!rli->deferred_events_collecting || rli->deferred_events->is_empty())
+ if (!rgi->deferred_events_collecting || rgi->deferred_events->is_empty())
return res;
- res= rli->deferred_events->execute(rli->group_info);
+ res= rgi->deferred_events->execute(rgi);
return res;
}
@@ -7423,10 +7423,10 @@ int User_var_log_event::do_apply_event(struct rpl_group_info *rgi)
Relay_log_info const *rli= rgi->rli;
DBUG_ENTER("User_var_log_event::do_apply_event");
- if (rli->deferred_events_collecting)
+ if (rgi->deferred_events_collecting)
{
set_deferred();
- DBUG_RETURN(rli->deferred_events->add(this));
+ DBUG_RETURN(rgi->deferred_events->add(this));
}
if (!(charset= get_charset(charset_number, MYF(MY_WME))))
diff --git a/sql/log_event.h b/sql/log_event.h
index 8bda493a7ec..6d6a330fc48 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -4698,16 +4698,6 @@ bool event_checksum_test(uchar *buf, ulong event_len, uint8 alg);
uint8 get_checksum_alg(const char* buf, ulong len);
extern TYPELIB binlog_checksum_typelib;
-#ifndef MYSQL_CLIENT
-/**
- The function is called by slave applier in case there are
- active table filtering rules to force gathering events associated
- with Query-log-event into an array to execute
- them once the fate of the Query is determined for execution.
-*/
-bool slave_execute_deferred_events(THD *thd);
-#endif
-
/**
@} (end of group Replication)
*/
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 2bb5083a4f3..c3c557436cf 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -66,7 +66,7 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
Relay_log_info *rli= rgi->rli;
THD *thd= rgi->thd;
- thd->rli_slave= rli;
+ thd->rgi_slave= rgi;
thd->rpl_filter = rli->mi->rpl_filter;
/* ToDo: Get rid of rli->group_info, it is not thread safe. */
rli->group_info= rgi;
@@ -574,6 +574,8 @@ rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev)
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
return true;
}
+ if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on()))
+ rgi->deferred_events= new Deferred_log_events(rli);
if ((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) &&
e->last_commit_id == gtid_ev->commit_id)
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 8fb22266d5e..b96125d41cb 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -60,7 +60,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), executed_entries(0),
group_info(0), tables_to_lock(0), tables_to_lock_count(0),
- last_event_start_time(0), deferred_events(NULL),m_flags(0),
+ last_event_start_time(0), m_flags(0),
row_stmt_start_timestamp(0), long_find_row_note_printed(false),
m_annotate_event(0)
{
@@ -1535,7 +1535,8 @@ end:
rpl_group_info::rpl_group_info(Relay_log_info *rli_)
: rli(rli_), thd(0), gtid_sub_id(0), wait_commit_sub_id(0),
- wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0)
+ wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0),
+ deferred_events(NULL)
{
bzero(&current_gtid, sizeof(current_gtid));
}
@@ -1596,7 +1597,7 @@ delete_or_keep_event_post_apply(Relay_log_info *rli,
/* fall through */
default:
DBUG_PRINT("info", ("Deleting the event after it has been executed"));
- if (!rli->is_deferred_event(ev))
+ if (!rli->group_info->is_deferred_event(ev))
delete ev;
break;
}
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 294f2ba885a..07ce0600d94 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -403,41 +403,6 @@ public:
The timestamp is set and reset in @c sql_slave_killed().
*/
time_t last_event_start_time;
-
- /*
- A container to hold on Intvar-, Rand-, Uservar- log-events in case
- the slave is configured with table filtering rules.
- The withhold events are executed when their parent Query destiny is
- determined for execution as well.
- */
- Deferred_log_events *deferred_events;
-
- /*
- State of the container: true stands for IRU events gathering,
- false does for execution, either deferred or direct.
- */
- bool deferred_events_collecting;
-
- /*
- Returns true if the argument event resides in the containter;
- more specifically, the checking is done against the last added event.
- */
- bool is_deferred_event(Log_event * ev)
- {
- return deferred_events_collecting ? deferred_events->is_last(ev) : false;
- };
- /* The general cleanup that slave applier may need at the end of query. */
- inline void cleanup_after_query()
- {
- if (deferred_events)
- deferred_events->rewind();
- };
- /* The general cleanup that slave applier may need at the end of session. */
- void cleanup_after_session()
- {
- if (deferred_events)
- delete deferred_events;
- };
/**
Helper function to do after statement completion.
@@ -581,6 +546,7 @@ public:
private:
+ /* ToDo: This must be moved to rpl_group_info. */
uint32 m_flags;
/*
@@ -645,6 +611,41 @@ struct rpl_group_info
rpl_group_info(Relay_log_info *rli_);
~rpl_group_info() { };
+
+ /*
+ A container to hold on Intvar-, Rand-, Uservar- log-events in case
+ the slave is configured with table filtering rules.
+ The withhold events are executed when their parent Query destiny is
+ determined for execution as well.
+ */
+ Deferred_log_events *deferred_events;
+
+ /*
+ State of the container: true stands for IRU events gathering,
+ false does for execution, either deferred or direct.
+ */
+ bool deferred_events_collecting;
+
+ /*
+ Returns true if the argument event resides in the containter;
+ more specifically, the checking is done against the last added event.
+ */
+ bool is_deferred_event(Log_event * ev)
+ {
+ return deferred_events_collecting ? deferred_events->is_last(ev) : false;
+ };
+ /* The general cleanup that slave applier may need at the end of query. */
+ inline void cleanup_after_query()
+ {
+ if (deferred_events)
+ deferred_events->rewind();
+ };
+ /* The general cleanup that slave applier may need at the end of session. */
+ void cleanup_after_session()
+ {
+ if (deferred_events)
+ delete deferred_events;
+ };
};
diff --git a/sql/rpl_utility.cc b/sql/rpl_utility.cc
index cce8ef99fef..f734b95edc1 100644
--- a/sql/rpl_utility.cc
+++ b/sql/rpl_utility.cc
@@ -1146,18 +1146,17 @@ bool Deferred_log_events::is_empty()
bool Deferred_log_events::execute(struct rpl_group_info *rgi)
{
bool res= false;
- Relay_log_info *rli= rgi->rli;
- DBUG_ASSERT(rli->deferred_events_collecting);
+ DBUG_ASSERT(rgi->deferred_events_collecting);
- rli->deferred_events_collecting= false;
+ rgi->deferred_events_collecting= false;
for (uint i= 0; !res && i < array.elements; i++)
{
Log_event *ev= (* (Log_event **)
dynamic_array_ptr(&array, i));
res= ev->apply_event(rgi);
}
- rli->deferred_events_collecting= true;
+ rgi->deferred_events_collecting= true;
return res;
}
diff --git a/sql/slave.cc b/sql/slave.cc
index b9ef3172364..a26010d75cc 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -4025,10 +4025,10 @@ pthread_handler_t handle_slave_sql(void *arg)
goto err_during_init;
}
thd->init_for_queries();
- thd->rli_slave= rli;
- if ((rli->deferred_events_collecting= mi->rpl_filter->is_on()))
+ thd->rgi_slave= &serial_rgi;
+ if ((serial_rgi.deferred_events_collecting= mi->rpl_filter->is_on()))
{
- rli->deferred_events= new Deferred_log_events(rli);
+ serial_rgi.deferred_events= new Deferred_log_events(rli);
}
thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
@@ -6302,10 +6302,10 @@ bool rpl_master_has_bug(const Relay_log_info *rli, uint bug_id, bool report,
*/
bool rpl_master_erroneous_autoinc(THD *thd)
{
- if (thd->rli_slave)
+ if (thd->rgi_slave)
{
DBUG_EXECUTE_IF("simulate_bug33029", return TRUE;);
- return rpl_master_has_bug(thd->rli_slave, 33029, FALSE, NULL, NULL);
+ return rpl_master_has_bug(thd->rgi_slave->rli, 33029, FALSE, NULL, NULL);
}
return FALSE;
}
diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc
index df6aab88200..1b6713f1bc3 100644
--- a/sql/sql_binlog.cc
+++ b/sql/sql_binlog.cc
@@ -80,6 +80,8 @@ void mysql_client_binlog_statement(THD* thd)
my_bool have_fd_event= TRUE;
int err;
Relay_log_info *rli;
+ struct rpl_group_info *rgi;
+
rli= thd->rli_fake;
if (!rli)
{
@@ -95,11 +97,12 @@ void mysql_client_binlog_statement(THD* thd)
new Format_description_log_event(4);
have_fd_event= FALSE;
}
+ if (!(rgi= thd->rgi_fake))
+ rgi= thd->rgi_fake= new rpl_group_info(rli);
const char *error= 0;
char *buf= (char *) my_malloc(decoded_len, MYF(MY_WME));
Log_event *ev = 0;
- struct rpl_group_info rgi(rli);
/*
Out of memory check
@@ -197,8 +200,8 @@ void mysql_client_binlog_statement(THD* thd)
}
}
- rgi.rli= rli;
- rgi.thd= thd;
+ rgi->rli= rli;
+ rgi->thd= thd;
ev= Log_event::read_log_event(bufptr, event_len, &error,
rli->relay_log.description_event_for_exec,
0);
@@ -235,7 +238,7 @@ void mysql_client_binlog_statement(THD* thd)
(ev->flags & LOG_EVENT_SKIP_REPLICATION_F ?
OPTION_SKIP_REPLICATION : 0);
- err= ev->apply_event(&rgi);
+ err= ev->apply_event(rgi);
thd->variables.option_bits=
(thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) |
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index aec65dc385c..43d810d27d4 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -769,7 +769,7 @@ bool Drop_table_error_handler::handle_condition(THD *thd,
THD::THD()
:Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION,
/* statement id */ 0),
- rli_fake(0), rli_slave(NULL),
+ rli_fake(0), rgi_fake(0), rgi_slave(NULL),
in_sub_stmt(0), log_all_errors(0),
binlog_unsafe_warning_flags(0),
binlog_table_maps(0),
@@ -1490,6 +1490,11 @@ THD::~THD()
dbug_sentry= THD_SENTRY_GONE;
#endif
#ifndef EMBEDDED_LIBRARY
+ if (rgi_fake)
+ {
+ delete rgi_fake;
+ rgi_fake= NULL;
+ }
if (rli_fake)
{
delete rli_fake;
@@ -1497,8 +1502,8 @@ THD::~THD()
}
mysql_audit_free_thd(this);
- if (rli_slave)
- rli_slave->cleanup_after_session();
+ if (rgi_slave)
+ rgi_slave->cleanup_after_session();
#endif
free_root(&main_mem_root, MYF(0));
@@ -1883,7 +1888,7 @@ void THD::cleanup_after_query()
which is intended to consume its event (there can be other
SET statements between them).
*/
- if ((rli_slave || rli_fake) && is_update_query(lex->sql_command))
+ if ((rgi_slave || rli_fake) && is_update_query(lex->sql_command))
auto_inc_intervals_forced.empty();
#endif
}
@@ -1905,8 +1910,8 @@ void THD::cleanup_after_query()
m_binlog_invoker= FALSE;
#ifndef EMBEDDED_LIBRARY
- if (rli_slave)
- rli_slave->cleanup_after_query();
+ if (rgi_slave)
+ rgi_slave->cleanup_after_query();
#endif
DBUG_VOID_RETURN;
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 3b7cfb42ec7..e7f593db62b 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -47,6 +47,7 @@
class Reprepare_observer;
class Relay_log_info;
+struct rpl_group_info;
class Rpl_filter;
class Query_log_event;
@@ -1697,8 +1698,9 @@ public:
/* Used to execute base64 coded binlog events in MySQL server */
Relay_log_info* rli_fake;
+ rpl_group_info* rgi_fake;
/* Slave applier execution context */
- Relay_log_info* rli_slave;
+ rpl_group_info* rgi_slave;
/* Used to SLAVE SQL thread */
Rpl_filter* rpl_filter;
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
index 511296f3e4b..ac1837a778d 100644
--- a/sql/sql_insert.cc
+++ b/sql/sql_insert.cc
@@ -810,10 +810,10 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
table->next_number_field=table->found_next_number_field;
#ifdef HAVE_REPLICATION
- if (thd->rli_slave &&
+ if (thd->rgi_slave &&
(info.handle_duplicates == DUP_UPDATE) &&
(table->next_number_field != NULL) &&
- rpl_master_has_bug(thd->rli_slave, 24432, TRUE, NULL, NULL))
+ rpl_master_has_bug(thd->rgi_slave->rli, 24432, TRUE, NULL, NULL))
goto abort;
#endif
@@ -3464,10 +3464,10 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
table->next_number_field=table->found_next_number_field;
#ifdef HAVE_REPLICATION
- if (thd->rli_slave &&
+ if (thd->rgi_slave &&
(info.handle_duplicates == DUP_UPDATE) &&
(table->next_number_field != NULL) &&
- rpl_master_has_bug(thd->rli_slave, 24432, TRUE, NULL, NULL))
+ rpl_master_has_bug(thd->rgi_slave->rli, 24432, TRUE, NULL, NULL))
DBUG_RETURN(1);
#endif
diff --git a/sql/sql_load.cc b/sql/sql_load.cc
index 6a4712ca5b5..339820574c2 100644
--- a/sql/sql_load.cc
+++ b/sql/sql_load.cc
@@ -362,11 +362,11 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
MY_RETURN_REAL_PATH);
}
- if (thd->rli_slave)
+ if (thd->rgi_slave)
{
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
- if (strncmp(thd->rli_slave->slave_patternload_file, name,
- thd->rli_slave->slave_patternload_file_size))
+ if (strncmp(thd->rgi_slave->rli->slave_patternload_file, name,
+ thd->rgi_slave->rli->slave_patternload_file_size))
{
/*
LOAD DATA INFILE in the slave SQL Thread can only read from