summaryrefslogtreecommitdiff
path: root/sql/rpl_rli.cc
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2013-11-01 12:00:11 +0100
committerunknown <knielsen@knielsen-hq.org>2013-11-01 12:00:11 +0100
commit57a267a8c00471bbe13724e7d9ba89d23acef3c2 (patch)
tree808a9f9b165bfe034304e4974dd618eaafcdedb2 /sql/rpl_rli.cc
parentbd3dc54261f10f387a03ad99ce74c3824c42e462 (diff)
parentcb86ce60b9bade5ae7712d8f3f74668208ee3fd2 (diff)
downloadmariadb-git-57a267a8c00471bbe13724e7d9ba89d23acef3c2.tar.gz
Merge from 10.0-base to 10.0 the feature MDEV-4506: Parallel replication.
The merge is still missing a few hunks related to temporary tables and InnoDB log file size. The associated code did not seem to exist in 10.0, so the merge of that needs more work. Until this is fixed, there are a number of test failures as a result.
Diffstat (limited to 'sql/rpl_rli.cc')
-rw-r--r--sql/rpl_rli.cc438
1 files changed, 287 insertions, 151 deletions
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 70d6033cebc..2fad1177266 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -57,13 +57,10 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
#endif
group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
last_master_timestamp(0), slave_skip_counter(0),
- abort_pos_wait(0), slave_run_id(0), sql_thd(0),
+ abort_pos_wait(0), slave_run_id(0), sql_driver_thd(),
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), executed_entries(0),
- gtid_sub_id(0), tables_to_lock(0), tables_to_lock_count(0),
- last_event_start_time(0), deferred_events(NULL),m_flags(0),
- row_stmt_start_timestamp(0), long_find_row_note_printed(false),
- m_annotate_event(0)
+ m_flags(0)
{
DBUG_ENTER("Relay_log_info::Relay_log_info");
@@ -88,12 +85,10 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
&data_lock, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_relay_log_info_log_space_lock,
&log_space_lock, MY_MUTEX_INIT_FAST);
- mysql_mutex_init(key_relay_log_info_sleep_lock, &sleep_lock, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_relay_log_info_data_cond, &data_cond, NULL);
mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL);
mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL);
mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL);
- mysql_cond_init(key_relay_log_info_sleep_cond, &sleep_cond, NULL);
relay_log.init_pthread_objects();
DBUG_VOID_RETURN;
}
@@ -106,14 +101,11 @@ Relay_log_info::~Relay_log_info()
mysql_mutex_destroy(&run_lock);
mysql_mutex_destroy(&data_lock);
mysql_mutex_destroy(&log_space_lock);
- mysql_mutex_destroy(&sleep_lock);
mysql_cond_destroy(&data_cond);
mysql_cond_destroy(&start_cond);
mysql_cond_destroy(&stop_cond);
mysql_cond_destroy(&log_space_cond);
- mysql_cond_destroy(&sleep_cond);
relay_log.cleanup();
- free_annotate_event();
DBUG_VOID_RETURN;
}
@@ -138,8 +130,6 @@ int init_relay_log_info(Relay_log_info* rli,
rli->abort_pos_wait=0;
rli->log_space_limit= relay_log_space_limit;
rli->log_space_total= 0;
- rli->tables_to_lock= 0;
- rli->tables_to_lock_count= 0;
char pattern[FN_REFLEN];
(void) my_realpath(pattern, slave_load_tmpdir, 0);
@@ -529,6 +519,8 @@ int init_relay_log_pos(Relay_log_info* rli,const char* log,
}
rli->group_relay_log_pos = rli->event_relay_log_pos = pos;
+ rli->clear_flag(Relay_log_info::IN_STMT);
+ rli->clear_flag(Relay_log_info::IN_TRANSACTION);
/*
Test to see if the previous run was with the skip of purging
@@ -878,17 +870,54 @@ improper_arguments: %d timed_out: %d",
void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
- bool skip_lock)
+ rpl_group_info *rgi,
+ bool skip_lock)
{
DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos");
if (!skip_lock)
mysql_mutex_lock(&data_lock);
- inc_event_relay_log_pos();
- group_relay_log_pos= event_relay_log_pos;
- strmake_buf(group_relay_log_name,event_relay_log_name);
+ rgi->inc_event_relay_log_pos();
+ DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu",
+ (long) log_pos, (long) group_master_log_pos));
+ if (rgi->is_parallel_exec)
+ {
+ /* In case of parallel replication, do not update the position backwards. */
+ int cmp= strcmp(group_relay_log_name, event_relay_log_name);
+ if (cmp < 0)
+ {
+ group_relay_log_pos= event_relay_log_pos;
+ strmake_buf(group_relay_log_name, event_relay_log_name);
+ notify_group_relay_log_name_update();
+ } else if (cmp == 0 && group_relay_log_pos < event_relay_log_pos)
+ group_relay_log_pos= event_relay_log_pos;
- notify_group_relay_log_name_update();
+ /*
+ In the parallel case we need to update the master_log_name here, rather
+ than in Rotate_log_event::do_update_pos().
+ */
+ cmp= strcmp(group_master_log_name, rgi->future_event_master_log_name);
+ if (cmp <= 0)
+ {
+ if (cmp < 0)
+ {
+ strcpy(group_master_log_name, rgi->future_event_master_log_name);
+ notify_group_master_log_name_update();
+ group_master_log_pos= log_pos;
+ }
+ else if (group_master_log_pos < log_pos)
+ group_master_log_pos= log_pos;
+ }
+ }
+ else
+ {
+ /* Non-parallel case. */
+ group_relay_log_pos= event_relay_log_pos;
+ strmake_buf(group_relay_log_name, event_relay_log_name);
+ notify_group_relay_log_name_update();
+ if (log_pos) // 3.23 binlogs don't have log_posx
+ group_master_log_pos= log_pos;
+ }
/*
If the slave does not support transactions and replicates a transaction,
@@ -920,12 +949,6 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
the relay log is not "val".
With the end_log_pos solution, we avoid computations involving lengthes.
*/
- DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu",
- (long) log_pos, (long) group_master_log_pos));
- if (log_pos) // 3.23 binlogs don't have log_posx
- {
- group_master_log_pos= log_pos;
- }
mysql_cond_broadcast(&data_cond);
if (!skip_lock)
mysql_mutex_unlock(&data_lock);
@@ -941,6 +964,9 @@ void Relay_log_info::close_temporary_tables()
for (table=save_temporary_tables ; table ; table=next)
{
next=table->next;
+
+ /* Reset in_use as the table may have been created by another thd */
+ table->in_use=0;
/*
Don't ask for disk deletion. For now, anyway they will be deleted when
slave restarts, but it is a better intention to not delete them.
@@ -1100,9 +1126,9 @@ bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev)
!replicate_same_server_id)
DBUG_RETURN(FALSE);
log_name= group_master_log_name;
- log_pos= (!ev)? group_master_log_pos :
- ((thd->variables.option_bits & OPTION_BEGIN || !ev->log_pos) ?
- group_master_log_pos : ev->log_pos - ev->data_written);
+ log_pos= ((!ev)? group_master_log_pos :
+ (get_flag(IN_TRANSACTION) || !ev->log_pos) ?
+ group_master_log_pos : ev->log_pos - ev->data_written);
}
else
{ /* until_condition == UNTIL_RELAY_POS */
@@ -1195,19 +1221,24 @@ bool Relay_log_info::cached_charset_compare(char *charset) const
void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
- time_t event_creation_time, THD *thd)
+ time_t event_creation_time, THD *thd,
+ rpl_group_info *rgi)
{
#ifndef DBUG_OFF
extern uint debug_not_change_ts_if_art_event;
#endif
- clear_flag(IN_STMT);
+ DBUG_ENTER("Relay_log_info::stmt_done");
+ DBUG_ASSERT(rgi->rli == this);
/*
If in a transaction, and if the slave supports transactions, just
inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN
(not OPTION_NOT_AUTOCOMMIT) as transactions are logged with
BEGIN/COMMIT, not with SET AUTOCOMMIT= .
+ We can't use rgi->rli->get_flag(IN_TRANSACTION) here as OPTION_BEGIN
+ is also used for single row transactions.
+
CAUTION: opt_using_transactions means innodb || bdb ; suppose the
master supports InnoDB and BDB, but the slave supports only BDB,
problems will arise: - suppose an InnoDB table is created on the
@@ -1225,12 +1256,13 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
middle of the "transaction". START SLAVE will resume at BEGIN
while the MyISAM table has already been updated.
*/
- if ((sql_thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions)
- inc_event_relay_log_pos();
+ if ((rgi->thd->variables.option_bits & OPTION_BEGIN) &&
+ opt_using_transactions)
+ rgi->inc_event_relay_log_pos();
else
{
- inc_group_relay_log_pos(event_master_log_pos);
- if (rpl_global_gtid_slave_state.record_and_update_gtid(thd, this))
+ inc_group_relay_log_pos(event_master_log_pos, rgi);
+ if (rpl_global_gtid_slave_state.record_and_update_gtid(thd, rgi))
{
report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE,
"Failed to update GTID state in %s.%s, slave state may become "
@@ -1245,7 +1277,8 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
*/
}
DBUG_EXECUTE_IF("inject_crash_before_flush_rli", DBUG_SUICIDE(););
- flush_relay_log_info(this);
+ if (mi->using_gtid == Master_info::USE_GTID_NO)
+ flush_relay_log_info(this);
DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE(););
/*
Note that Rotate_log_event::do_apply_event() does not call this
@@ -1259,127 +1292,10 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
IF_DBUG(debug_not_change_ts_if_art_event > 0, 1)))
last_master_timestamp= event_creation_time;
}
-}
-
-#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
-void Relay_log_info::cleanup_context(THD *thd, bool error)
-{
- DBUG_ENTER("Relay_log_info::cleanup_context");
-
- DBUG_ASSERT(sql_thd == thd);
- /*
- 1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
- may have opened tables, which we cannot be sure have been closed (because
- maybe the Rows_log_event have not been found or will not be, because slave
- SQL thread is stopping, or relay log has a missing tail etc). So we close
- all thread's tables. And so the table mappings have to be cancelled.
- 2) Rows_log_event::do_apply_event() may even have started statements or
- transactions on them, which we need to rollback in case of error.
- 3) If finding a Format_description_log_event after a BEGIN, we also need
- to rollback before continuing with the next events.
- 4) so we need this "context cleanup" function.
- */
- if (error)
- {
- trans_rollback_stmt(thd); // if a "statement transaction"
- trans_rollback(thd); // if a "real transaction"
- }
- m_table_map.clear_tables();
- slave_close_thread_tables(thd);
- if (error)
- thd->mdl_context.release_transactional_locks();
- clear_flag(IN_STMT);
- /*
- Cleanup for the flags that have been set at do_apply_event.
- */
- thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
- thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
-
- /*
- Reset state related to long_find_row notes in the error log:
- - timestamp
- - flag that decides whether the slave prints or not
- */
- reset_row_stmt_start_timestamp();
- unset_long_find_row_note_printed();
-
- DBUG_VOID_RETURN;
-}
-
-void Relay_log_info::clear_tables_to_lock()
-{
- DBUG_ENTER("Relay_log_info::clear_tables_to_lock()");
-#ifndef DBUG_OFF
- /**
- When replicating in RBR and MyISAM Merge tables are involved
- open_and_lock_tables (called in do_apply_event) appends the
- base tables to the list of tables_to_lock. Then these are
- removed from the list in close_thread_tables (which is called
- before we reach this point).
-
- This assertion just confirms that we get no surprises at this
- point.
- */
- uint i=0;
- for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ;
- DBUG_ASSERT(i == tables_to_lock_count);
-#endif
-
- while (tables_to_lock)
- {
- uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
- if (tables_to_lock->m_tabledef_valid)
- {
- tables_to_lock->m_tabledef.table_def::~table_def();
- tables_to_lock->m_tabledef_valid= FALSE;
- }
-
- /*
- If blob fields were used during conversion of field values
- from the master table into the slave table, then we need to
- free the memory used temporarily to store their values before
- copying into the slave's table.
- */
- if (tables_to_lock->m_conv_table)
- free_blobs(tables_to_lock->m_conv_table);
-
- tables_to_lock=
- static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
- tables_to_lock_count--;
- my_free(to_free);
- }
- DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);
DBUG_VOID_RETURN;
}
-void Relay_log_info::slave_close_thread_tables(THD *thd)
-{
- DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)");
- thd->get_stmt_da()->set_overwrite_status(true);
- thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
- thd->get_stmt_da()->set_overwrite_status(false);
-
- close_thread_tables(thd);
- /*
- - If inside a multi-statement transaction,
- defer the release of metadata locks until the current
- transaction is either committed or rolled back. This prevents
- other statements from modifying the table for the entire
- duration of this transaction. This provides commit ordering
- and guarantees serializability across multiple transactions.
- - If in autocommit mode, or outside a transactional context,
- automatically release metadata locks of the current statement.
- */
- if (! thd->in_multi_stmt_transaction_mode())
- thd->mdl_context.release_transactional_locks();
- else
- thd->mdl_context.release_statement_locks();
-
- clear_tables_to_lock();
- DBUG_VOID_RETURN;
-}
-
-
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int
rpl_load_gtid_slave_state(THD *thd)
{
@@ -1555,4 +1471,224 @@ end:
DBUG_RETURN(err);
}
+
+rpl_group_info::rpl_group_info(Relay_log_info *rli_)
+ : rli(rli_), thd(0), gtid_sub_id(0), wait_commit_sub_id(0),
+ wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0),
+ deferred_events(NULL), m_annotate_event(0), tables_to_lock(0),
+ tables_to_lock_count(0), trans_retries(0), last_event_start_time(0),
+ is_parallel_exec(false), is_error(false),
+ row_stmt_start_timestamp(0), long_find_row_note_printed(false)
+{
+ bzero(&current_gtid, sizeof(current_gtid));
+ mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock,
+ MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_rpl_group_info_sleep_cond, &sleep_cond, NULL);
+}
+
+
+rpl_group_info::~rpl_group_info()
+{
+ free_annotate_event();
+ mysql_mutex_destroy(&sleep_lock);
+ mysql_cond_destroy(&sleep_cond);
+}
+
+
+int
+event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev)
+{
+ uint64 sub_id= rpl_global_gtid_slave_state.next_sub_id(gev->domain_id);
+ if (!sub_id)
+ {
+ /* Out of memory caused hash insertion to fail. */
+ return 1;
+ }
+ rgi->gtid_sub_id= sub_id;
+ rgi->current_gtid.server_id= gev->server_id;
+ rgi->current_gtid.domain_id= gev->domain_id;
+ rgi->current_gtid.seq_no= gev->seq_no;
+ return 0;
+}
+
+
+void
+delete_or_keep_event_post_apply(rpl_group_info *rgi,
+ Log_event_type typ, Log_event *ev)
+{
+ /*
+ ToDo: This needs to work on rpl_group_info, not Relay_log_info, to be
+ thread-safe for parallel replication.
+ */
+
+ switch (typ) {
+ case FORMAT_DESCRIPTION_EVENT:
+ /*
+ Format_description_log_event should not be deleted because it
+ will be used to read info about the relay log's format;
+ it will be deleted when the SQL thread does not need it,
+ i.e. when this thread terminates.
+ */
+ break;
+ case ANNOTATE_ROWS_EVENT:
+ /*
+ Annotate_rows event should not be deleted because after it has
+ been applied, thd->query points to the string inside this event.
+ The thd->query will be used to generate new Annotate_rows event
+ during applying the subsequent Rows events.
+ */
+ rgi->set_annotate_event((Annotate_rows_log_event*) ev);
+ break;
+ case DELETE_ROWS_EVENT:
+ case UPDATE_ROWS_EVENT:
+ case WRITE_ROWS_EVENT:
+ /*
+ After the last Rows event has been applied, the saved Annotate_rows
+ event (if any) is not needed anymore and can be deleted.
+ */
+ if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F))
+ rgi->free_annotate_event();
+ /* fall through */
+ default:
+ DBUG_PRINT("info", ("Deleting the event after it has been executed"));
+ if (!rgi->is_deferred_event(ev))
+ delete ev;
+ break;
+ }
+}
+
+
+void rpl_group_info::cleanup_context(THD *thd, bool error)
+{
+ DBUG_ENTER("Relay_log_info::cleanup_context");
+ DBUG_PRINT("enter", ("error: %d", (int) error));
+
+ DBUG_ASSERT(this->thd == thd);
+ /*
+ 1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
+ may have opened tables, which we cannot be sure have been closed (because
+ maybe the Rows_log_event have not been found or will not be, because slave
+ SQL thread is stopping, or relay log has a missing tail etc). So we close
+ all thread's tables. And so the table mappings have to be cancelled.
+ 2) Rows_log_event::do_apply_event() may even have started statements or
+ transactions on them, which we need to rollback in case of error.
+ 3) If finding a Format_description_log_event after a BEGIN, we also need
+ to rollback before continuing with the next events.
+ 4) so we need this "context cleanup" function.
+ */
+ if (error)
+ {
+ trans_rollback_stmt(thd); // if a "statement transaction"
+ trans_rollback(thd); // if a "real transaction"
+ }
+ m_table_map.clear_tables();
+ slave_close_thread_tables(thd);
+ if (error)
+ {
+ thd->mdl_context.release_transactional_locks();
+
+ if (thd == rli->sql_driver_thd)
+ {
+ /*
+ Reset flags. This is needed to handle incident events and errors in
+ the relay log noticed by the sql driver thread.
+ */
+ rli->clear_flag(Relay_log_info::IN_STMT);
+ rli->clear_flag(Relay_log_info::IN_TRANSACTION);
+ }
+ }
+
+ /*
+ Cleanup for the flags that have been set at do_apply_event.
+ */
+ thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
+ thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
+
+ /*
+ Reset state related to long_find_row notes in the error log:
+ - timestamp
+ - flag that decides whether the slave prints or not
+ */
+ reset_row_stmt_start_timestamp();
+ unset_long_find_row_note_printed();
+
+ DBUG_VOID_RETURN;
+}
+
+
+void rpl_group_info::clear_tables_to_lock()
+{
+ DBUG_ENTER("Relay_log_info::clear_tables_to_lock()");
+#ifndef DBUG_OFF
+ /**
+ When replicating in RBR and MyISAM Merge tables are involved
+ open_and_lock_tables (called in do_apply_event) appends the
+ base tables to the list of tables_to_lock. Then these are
+ removed from the list in close_thread_tables (which is called
+ before we reach this point).
+
+ This assertion just confirms that we get no surprises at this
+ point.
+ */
+ uint i=0;
+ for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ;
+ DBUG_ASSERT(i == tables_to_lock_count);
+#endif
+
+ while (tables_to_lock)
+ {
+ uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
+ if (tables_to_lock->m_tabledef_valid)
+ {
+ tables_to_lock->m_tabledef.table_def::~table_def();
+ tables_to_lock->m_tabledef_valid= FALSE;
+ }
+
+ /*
+ If blob fields were used during conversion of field values
+ from the master table into the slave table, then we need to
+ free the memory used temporarily to store their values before
+ copying into the slave's table.
+ */
+ if (tables_to_lock->m_conv_table)
+ free_blobs(tables_to_lock->m_conv_table);
+
+ tables_to_lock=
+ static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
+ tables_to_lock_count--;
+ my_free(to_free);
+ }
+ DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);
+ DBUG_VOID_RETURN;
+}
+
+
+void rpl_group_info::slave_close_thread_tables(THD *thd)
+{
+ DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)");
+ thd->get_stmt_da()->set_overwrite_status(true);
+ thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
+ thd->get_stmt_da()->set_overwrite_status(false);
+
+ close_thread_tables(thd);
+ /*
+ - If inside a multi-statement transaction,
+ defer the release of metadata locks until the current
+ transaction is either committed or rolled back. This prevents
+ other statements from modifying the table for the entire
+ duration of this transaction. This provides commit ordering
+ and guarantees serializability across multiple transactions.
+ - If in autocommit mode, or outside a transactional context,
+ automatically release metadata locks of the current statement.
+ */
+ if (! thd->in_multi_stmt_transaction_mode())
+ thd->mdl_context.release_transactional_locks();
+ else
+ thd->mdl_context.release_statement_locks();
+
+ clear_tables_to_lock();
+ DBUG_VOID_RETURN;
+}
+
+
#endif