diff options
author | unknown <knielsen@knielsen-hq.org> | 2013-07-12 14:36:20 +0200 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2013-07-12 14:36:20 +0200 |
commit | ba4b937af2e3c9118071b1279bc39b6febca73a9 (patch) | |
tree | ad2cf72470e8a031ae5ce0b3f7568cca0faf5340 /sql | |
parent | 6d5f237e091ca7aa4fdd52c186af11fffc80b1c2 (diff) | |
download | mariadb-git-ba4b937af2e3c9118071b1279bc39b6febca73a9.tar.gz |
MDEV-4506: Parallel replication: Intermediate commit
Move the deferred event stuff from Relay_log_info to rpl_group_info
to make it thread safe for parallel replication.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/log_event.cc | 20 | ||||
-rw-r--r-- | sql/log_event.h | 10 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 4 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 7 | ||||
-rw-r--r-- | sql/rpl_rli.h | 71 | ||||
-rw-r--r-- | sql/rpl_utility.cc | 7 | ||||
-rw-r--r-- | sql/slave.cc | 10 | ||||
-rw-r--r-- | sql/sql_binlog.cc | 11 | ||||
-rw-r--r-- | sql/sql_class.cc | 17 | ||||
-rw-r--r-- | sql/sql_class.h | 4 | ||||
-rw-r--r-- | sql/sql_insert.cc | 8 | ||||
-rw-r--r-- | sql/sql_load.cc | 6 |
12 files changed, 89 insertions, 86 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index cb7bc3924f5..f07c58f4d6b 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -6716,8 +6716,8 @@ int Intvar_log_event::do_apply_event(struct rpl_group_info *rgi) */ rli->set_flag(Relay_log_info::IN_STMT); - if (rli->deferred_events_collecting) - return rli->deferred_events->add(this); + if (rgi->deferred_events_collecting) + return rgi->deferred_events->add(this); switch (type) { case LAST_INSERT_ID_EVENT: @@ -6827,8 +6827,8 @@ int Rand_log_event::do_apply_event(struct rpl_group_info *rgi) */ const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT); - if (rli->deferred_events_collecting) - return rli->deferred_events->add(this); + if (rgi->deferred_events_collecting) + return rgi->deferred_events->add(this); thd->rand.seed1= (ulong) seed1; thd->rand.seed2= (ulong) seed2; @@ -6868,14 +6868,14 @@ Rand_log_event::do_shall_skip(Relay_log_info *rli) bool slave_execute_deferred_events(THD *thd) { bool res= false; - Relay_log_info *rli= thd->rli_slave; + rpl_group_info *rgi= thd->rgi_slave; - DBUG_ASSERT(rli && (!rli->deferred_events_collecting || rli->deferred_events)); + DBUG_ASSERT(rgi && (!rgi->deferred_events_collecting || rgi->deferred_events)); - if (!rli->deferred_events_collecting || rli->deferred_events->is_empty()) + if (!rgi->deferred_events_collecting || rgi->deferred_events->is_empty()) return res; - res= rli->deferred_events->execute(rli->group_info); + res= rgi->deferred_events->execute(rgi); return res; } @@ -7423,10 +7423,10 @@ int User_var_log_event::do_apply_event(struct rpl_group_info *rgi) Relay_log_info const *rli= rgi->rli; DBUG_ENTER("User_var_log_event::do_apply_event"); - if (rli->deferred_events_collecting) + if (rgi->deferred_events_collecting) { set_deferred(); - DBUG_RETURN(rli->deferred_events->add(this)); + DBUG_RETURN(rgi->deferred_events->add(this)); } if (!(charset= get_charset(charset_number, MYF(MY_WME)))) diff --git a/sql/log_event.h b/sql/log_event.h index 8bda493a7ec..6d6a330fc48 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -4698,16 +4698,6 @@ bool event_checksum_test(uchar *buf, ulong event_len, uint8 alg); uint8 get_checksum_alg(const char* buf, ulong len); extern TYPELIB binlog_checksum_typelib; -#ifndef MYSQL_CLIENT -/** - The function is called by slave applier in case there are - active table filtering rules to force gathering events associated - with Query-log-event into an array to execute - them once the fate of the Query is determined for execution. -*/ -bool slave_execute_deferred_events(THD *thd); -#endif - /** @} (end of group Replication) */ diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 2bb5083a4f3..c3c557436cf 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -66,7 +66,7 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, Relay_log_info *rli= rgi->rli; THD *thd= rgi->thd; - thd->rli_slave= rli; + thd->rgi_slave= rgi; thd->rpl_filter = rli->mi->rpl_filter; /* ToDo: Get rid of rli->group_info, it is not thread safe. */ rli->group_info= rgi; @@ -574,6 +574,8 @@ rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev) my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME)); return true; } + if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on())) + rgi->deferred_events= new Deferred_log_events(rli); if ((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) && e->last_commit_id == gtid_ev->commit_id) diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 8fb22266d5e..b96125d41cb 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -60,7 +60,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE), until_log_pos(0), retried_trans(0), executed_entries(0), group_info(0), tables_to_lock(0), tables_to_lock_count(0), - last_event_start_time(0), deferred_events(NULL),m_flags(0), + last_event_start_time(0), m_flags(0), row_stmt_start_timestamp(0), long_find_row_note_printed(false), m_annotate_event(0) { @@ -1535,7 +1535,8 @@ end: rpl_group_info::rpl_group_info(Relay_log_info *rli_) : rli(rli_), thd(0), gtid_sub_id(0), wait_commit_sub_id(0), - wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0) + wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0), + deferred_events(NULL) { bzero(¤t_gtid, sizeof(current_gtid)); } @@ -1596,7 +1597,7 @@ delete_or_keep_event_post_apply(Relay_log_info *rli, /* fall through */ default: DBUG_PRINT("info", ("Deleting the event after it has been executed")); - if (!rli->is_deferred_event(ev)) + if (!rli->group_info->is_deferred_event(ev)) delete ev; break; } diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 294f2ba885a..07ce0600d94 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -403,41 +403,6 @@ public: The timestamp is set and reset in @c sql_slave_killed(). */ time_t last_event_start_time; - - /* - A container to hold on Intvar-, Rand-, Uservar- log-events in case - the slave is configured with table filtering rules. - The withhold events are executed when their parent Query destiny is - determined for execution as well. - */ - Deferred_log_events *deferred_events; - - /* - State of the container: true stands for IRU events gathering, - false does for execution, either deferred or direct. - */ - bool deferred_events_collecting; - - /* - Returns true if the argument event resides in the containter; - more specifically, the checking is done against the last added event. - */ - bool is_deferred_event(Log_event * ev) - { - return deferred_events_collecting ? deferred_events->is_last(ev) : false; - }; - /* The general cleanup that slave applier may need at the end of query. */ - inline void cleanup_after_query() - { - if (deferred_events) - deferred_events->rewind(); - }; - /* The general cleanup that slave applier may need at the end of session. */ - void cleanup_after_session() - { - if (deferred_events) - delete deferred_events; - }; /** Helper function to do after statement completion. @@ -581,6 +546,7 @@ public: private: + /* ToDo: This must be moved to rpl_group_info. */ uint32 m_flags; /* @@ -645,6 +611,41 @@ struct rpl_group_info rpl_group_info(Relay_log_info *rli_); ~rpl_group_info() { }; + + /* + A container to hold on Intvar-, Rand-, Uservar- log-events in case + the slave is configured with table filtering rules. + The withhold events are executed when their parent Query destiny is + determined for execution as well. + */ + Deferred_log_events *deferred_events; + + /* + State of the container: true stands for IRU events gathering, + false does for execution, either deferred or direct. + */ + bool deferred_events_collecting; + + /* + Returns true if the argument event resides in the containter; + more specifically, the checking is done against the last added event. + */ + bool is_deferred_event(Log_event * ev) + { + return deferred_events_collecting ? deferred_events->is_last(ev) : false; + }; + /* The general cleanup that slave applier may need at the end of query. */ + inline void cleanup_after_query() + { + if (deferred_events) + deferred_events->rewind(); + }; + /* The general cleanup that slave applier may need at the end of session. */ + void cleanup_after_session() + { + if (deferred_events) + delete deferred_events; + }; }; diff --git a/sql/rpl_utility.cc b/sql/rpl_utility.cc index cce8ef99fef..f734b95edc1 100644 --- a/sql/rpl_utility.cc +++ b/sql/rpl_utility.cc @@ -1146,18 +1146,17 @@ bool Deferred_log_events::is_empty() bool Deferred_log_events::execute(struct rpl_group_info *rgi) { bool res= false; - Relay_log_info *rli= rgi->rli; - DBUG_ASSERT(rli->deferred_events_collecting); + DBUG_ASSERT(rgi->deferred_events_collecting); - rli->deferred_events_collecting= false; + rgi->deferred_events_collecting= false; for (uint i= 0; !res && i < array.elements; i++) { Log_event *ev= (* (Log_event **) dynamic_array_ptr(&array, i)); res= ev->apply_event(rgi); } - rli->deferred_events_collecting= true; + rgi->deferred_events_collecting= true; return res; } diff --git a/sql/slave.cc b/sql/slave.cc index b9ef3172364..a26010d75cc 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -4025,10 +4025,10 @@ pthread_handler_t handle_slave_sql(void *arg) goto err_during_init; } thd->init_for_queries(); - thd->rli_slave= rli; - if ((rli->deferred_events_collecting= mi->rpl_filter->is_on())) + thd->rgi_slave= &serial_rgi; + if ((serial_rgi.deferred_events_collecting= mi->rpl_filter->is_on())) { - rli->deferred_events= new Deferred_log_events(rli); + serial_rgi.deferred_events= new Deferred_log_events(rli); } thd->temporary_tables = rli->save_temporary_tables; // restore temp tables @@ -6302,10 +6302,10 @@ bool rpl_master_has_bug(const Relay_log_info *rli, uint bug_id, bool report, */ bool rpl_master_erroneous_autoinc(THD *thd) { - if (thd->rli_slave) + if (thd->rgi_slave) { DBUG_EXECUTE_IF("simulate_bug33029", return TRUE;); - return rpl_master_has_bug(thd->rli_slave, 33029, FALSE, NULL, NULL); + return rpl_master_has_bug(thd->rgi_slave->rli, 33029, FALSE, NULL, NULL); } return FALSE; } diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc index df6aab88200..1b6713f1bc3 100644 --- a/sql/sql_binlog.cc +++ b/sql/sql_binlog.cc @@ -80,6 +80,8 @@ void mysql_client_binlog_statement(THD* thd) my_bool have_fd_event= TRUE; int err; Relay_log_info *rli; + struct rpl_group_info *rgi; + rli= thd->rli_fake; if (!rli) { @@ -95,11 +97,12 @@ void mysql_client_binlog_statement(THD* thd) new Format_description_log_event(4); have_fd_event= FALSE; } + if (!(rgi= thd->rgi_fake)) + rgi= thd->rgi_fake= new rpl_group_info(rli); const char *error= 0; char *buf= (char *) my_malloc(decoded_len, MYF(MY_WME)); Log_event *ev = 0; - struct rpl_group_info rgi(rli); /* Out of memory check @@ -197,8 +200,8 @@ void mysql_client_binlog_statement(THD* thd) } } - rgi.rli= rli; - rgi.thd= thd; + rgi->rli= rli; + rgi->thd= thd; ev= Log_event::read_log_event(bufptr, event_len, &error, rli->relay_log.description_event_for_exec, 0); @@ -235,7 +238,7 @@ void mysql_client_binlog_statement(THD* thd) (ev->flags & LOG_EVENT_SKIP_REPLICATION_F ? OPTION_SKIP_REPLICATION : 0); - err= ev->apply_event(&rgi); + err= ev->apply_event(rgi); thd->variables.option_bits= (thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) | diff --git a/sql/sql_class.cc b/sql/sql_class.cc index aec65dc385c..43d810d27d4 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -769,7 +769,7 @@ bool Drop_table_error_handler::handle_condition(THD *thd, THD::THD() :Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION, /* statement id */ 0), - rli_fake(0), rli_slave(NULL), + rli_fake(0), rgi_fake(0), rgi_slave(NULL), in_sub_stmt(0), log_all_errors(0), binlog_unsafe_warning_flags(0), binlog_table_maps(0), @@ -1490,6 +1490,11 @@ THD::~THD() dbug_sentry= THD_SENTRY_GONE; #endif #ifndef EMBEDDED_LIBRARY + if (rgi_fake) + { + delete rgi_fake; + rgi_fake= NULL; + } if (rli_fake) { delete rli_fake; @@ -1497,8 +1502,8 @@ THD::~THD() } mysql_audit_free_thd(this); - if (rli_slave) - rli_slave->cleanup_after_session(); + if (rgi_slave) + rgi_slave->cleanup_after_session(); #endif free_root(&main_mem_root, MYF(0)); @@ -1883,7 +1888,7 @@ void THD::cleanup_after_query() which is intended to consume its event (there can be other SET statements between them). */ - if ((rli_slave || rli_fake) && is_update_query(lex->sql_command)) + if ((rgi_slave || rli_fake) && is_update_query(lex->sql_command)) auto_inc_intervals_forced.empty(); #endif } @@ -1905,8 +1910,8 @@ void THD::cleanup_after_query() m_binlog_invoker= FALSE; #ifndef EMBEDDED_LIBRARY - if (rli_slave) - rli_slave->cleanup_after_query(); + if (rgi_slave) + rgi_slave->cleanup_after_query(); #endif DBUG_VOID_RETURN; diff --git a/sql/sql_class.h b/sql/sql_class.h index 3b7cfb42ec7..e7f593db62b 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -47,6 +47,7 @@ class Reprepare_observer; class Relay_log_info; +struct rpl_group_info; class Rpl_filter; class Query_log_event; @@ -1697,8 +1698,9 @@ public: /* Used to execute base64 coded binlog events in MySQL server */ Relay_log_info* rli_fake; + rpl_group_info* rgi_fake; /* Slave applier execution context */ - Relay_log_info* rli_slave; + rpl_group_info* rgi_slave; /* Used to SLAVE SQL thread */ Rpl_filter* rpl_filter; diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index 511296f3e4b..ac1837a778d 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -810,10 +810,10 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, table->next_number_field=table->found_next_number_field; #ifdef HAVE_REPLICATION - if (thd->rli_slave && + if (thd->rgi_slave && (info.handle_duplicates == DUP_UPDATE) && (table->next_number_field != NULL) && - rpl_master_has_bug(thd->rli_slave, 24432, TRUE, NULL, NULL)) + rpl_master_has_bug(thd->rgi_slave->rli, 24432, TRUE, NULL, NULL)) goto abort; #endif @@ -3464,10 +3464,10 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u) table->next_number_field=table->found_next_number_field; #ifdef HAVE_REPLICATION - if (thd->rli_slave && + if (thd->rgi_slave && (info.handle_duplicates == DUP_UPDATE) && (table->next_number_field != NULL) && - rpl_master_has_bug(thd->rli_slave, 24432, TRUE, NULL, NULL)) + rpl_master_has_bug(thd->rgi_slave->rli, 24432, TRUE, NULL, NULL)) DBUG_RETURN(1); #endif diff --git a/sql/sql_load.cc b/sql/sql_load.cc index 6a4712ca5b5..339820574c2 100644 --- a/sql/sql_load.cc +++ b/sql/sql_load.cc @@ -362,11 +362,11 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, MY_RETURN_REAL_PATH); } - if (thd->rli_slave) + if (thd->rgi_slave) { #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) - if (strncmp(thd->rli_slave->slave_patternload_file, name, - thd->rli_slave->slave_patternload_file_size)) + if (strncmp(thd->rgi_slave->rli->slave_patternload_file, name, + thd->rgi_slave->rli->slave_patternload_file_size)) { /* LOAD DATA INFILE in the slave SQL Thread can only read from |