summaryrefslogtreecommitdiff
path: root/sql/rpl_rli.cc
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2013-11-01 09:17:06 +0100
committerunknown <knielsen@knielsen-hq.org>2013-11-01 09:17:06 +0100
commitcb86ce60b9bade5ae7712d8f3f74668208ee3fd2 (patch)
treedaff81c02baa6c2581d6abe3d746b8f35ee44f32 /sql/rpl_rli.cc
parentf4d5d849fd3b526d38ca6eb083fd0b290eb0eda7 (diff)
parent39df665a3332bd9bfb2529419f534a49cfac388c (diff)
downloadmariadb-git-cb86ce60b9bade5ae7712d8f3f74668208ee3fd2.tar.gz
Merge MDEV-4506: Parallel replication into 10.0-base.
Diffstat (limited to 'sql/rpl_rli.cc')
-rw-r--r--sql/rpl_rli.cc438
1 files changed, 287 insertions, 151 deletions
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index dd2a151108d..cc265f25c9d 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -56,13 +56,10 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
#endif
group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
last_master_timestamp(0), slave_skip_counter(0),
- abort_pos_wait(0), slave_run_id(0), sql_thd(0),
+ abort_pos_wait(0), slave_run_id(0), sql_driver_thd(),
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), executed_entries(0),
- gtid_sub_id(0), tables_to_lock(0), tables_to_lock_count(0),
- last_event_start_time(0), deferred_events(NULL),m_flags(0),
- row_stmt_start_timestamp(0), long_find_row_note_printed(false),
- m_annotate_event(0)
+ m_flags(0)
{
DBUG_ENTER("Relay_log_info::Relay_log_info");
@@ -87,12 +84,10 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
&data_lock, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_relay_log_info_log_space_lock,
&log_space_lock, MY_MUTEX_INIT_FAST);
- mysql_mutex_init(key_relay_log_info_sleep_lock, &sleep_lock, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_relay_log_info_data_cond, &data_cond, NULL);
mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL);
mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL);
mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL);
- mysql_cond_init(key_relay_log_info_sleep_cond, &sleep_cond, NULL);
relay_log.init_pthread_objects();
DBUG_VOID_RETURN;
}
@@ -105,14 +100,11 @@ Relay_log_info::~Relay_log_info()
mysql_mutex_destroy(&run_lock);
mysql_mutex_destroy(&data_lock);
mysql_mutex_destroy(&log_space_lock);
- mysql_mutex_destroy(&sleep_lock);
mysql_cond_destroy(&data_cond);
mysql_cond_destroy(&start_cond);
mysql_cond_destroy(&stop_cond);
mysql_cond_destroy(&log_space_cond);
- mysql_cond_destroy(&sleep_cond);
relay_log.cleanup();
- free_annotate_event();
DBUG_VOID_RETURN;
}
@@ -137,8 +129,6 @@ int init_relay_log_info(Relay_log_info* rli,
rli->abort_pos_wait=0;
rli->log_space_limit= relay_log_space_limit;
rli->log_space_total= 0;
- rli->tables_to_lock= 0;
- rli->tables_to_lock_count= 0;
char pattern[FN_REFLEN];
(void) my_realpath(pattern, slave_load_tmpdir, 0);
@@ -528,6 +518,8 @@ int init_relay_log_pos(Relay_log_info* rli,const char* log,
}
rli->group_relay_log_pos = rli->event_relay_log_pos = pos;
+ rli->clear_flag(Relay_log_info::IN_STMT);
+ rli->clear_flag(Relay_log_info::IN_TRANSACTION);
/*
Test to see if the previous run was with the skip of purging
@@ -877,17 +869,54 @@ improper_arguments: %d timed_out: %d",
void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
- bool skip_lock)
+ rpl_group_info *rgi,
+ bool skip_lock)
{
DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos");
if (!skip_lock)
mysql_mutex_lock(&data_lock);
- inc_event_relay_log_pos();
- group_relay_log_pos= event_relay_log_pos;
- strmake_buf(group_relay_log_name,event_relay_log_name);
+ rgi->inc_event_relay_log_pos();
+ DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu",
+ (long) log_pos, (long) group_master_log_pos));
+ if (rgi->is_parallel_exec)
+ {
+ /* In case of parallel replication, do not update the position backwards. */
+ int cmp= strcmp(group_relay_log_name, event_relay_log_name);
+ if (cmp < 0)
+ {
+ group_relay_log_pos= event_relay_log_pos;
+ strmake_buf(group_relay_log_name, event_relay_log_name);
+ notify_group_relay_log_name_update();
+ } else if (cmp == 0 && group_relay_log_pos < event_relay_log_pos)
+ group_relay_log_pos= event_relay_log_pos;
- notify_group_relay_log_name_update();
+ /*
+ In the parallel case we need to update the master_log_name here, rather
+ than in Rotate_log_event::do_update_pos().
+ */
+ cmp= strcmp(group_master_log_name, rgi->future_event_master_log_name);
+ if (cmp <= 0)
+ {
+ if (cmp < 0)
+ {
+ strcpy(group_master_log_name, rgi->future_event_master_log_name);
+ notify_group_master_log_name_update();
+ group_master_log_pos= log_pos;
+ }
+ else if (group_master_log_pos < log_pos)
+ group_master_log_pos= log_pos;
+ }
+ }
+ else
+ {
+ /* Non-parallel case. */
+ group_relay_log_pos= event_relay_log_pos;
+ strmake_buf(group_relay_log_name, event_relay_log_name);
+ notify_group_relay_log_name_update();
+ if (log_pos) // 3.23 binlogs don't have log_posx
+ group_master_log_pos= log_pos;
+ }
/*
If the slave does not support transactions and replicates a transaction,
@@ -919,12 +948,6 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
the relay log is not "val".
With the end_log_pos solution, we avoid computations involving lengthes.
*/
- DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu",
- (long) log_pos, (long) group_master_log_pos));
- if (log_pos) // 3.23 binlogs don't have log_posx
- {
- group_master_log_pos= log_pos;
- }
mysql_cond_broadcast(&data_cond);
if (!skip_lock)
mysql_mutex_unlock(&data_lock);
@@ -940,6 +963,9 @@ void Relay_log_info::close_temporary_tables()
for (table=save_temporary_tables ; table ; table=next)
{
next=table->next;
+
+ /* Reset in_use as the table may have been created by another thd */
+ table->in_use=0;
/*
Don't ask for disk deletion. For now, anyway they will be deleted when
slave restarts, but it is a better intention to not delete them.
@@ -1099,9 +1125,9 @@ bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev)
!replicate_same_server_id)
DBUG_RETURN(FALSE);
log_name= group_master_log_name;
- log_pos= (!ev)? group_master_log_pos :
- ((thd->variables.option_bits & OPTION_BEGIN || !ev->log_pos) ?
- group_master_log_pos : ev->log_pos - ev->data_written);
+ log_pos= ((!ev)? group_master_log_pos :
+ (get_flag(IN_TRANSACTION) || !ev->log_pos) ?
+ group_master_log_pos : ev->log_pos - ev->data_written);
}
else
{ /* until_condition == UNTIL_RELAY_POS */
@@ -1194,19 +1220,24 @@ bool Relay_log_info::cached_charset_compare(char *charset) const
void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
- time_t event_creation_time, THD *thd)
+ time_t event_creation_time, THD *thd,
+ rpl_group_info *rgi)
{
#ifndef DBUG_OFF
extern uint debug_not_change_ts_if_art_event;
#endif
- clear_flag(IN_STMT);
+ DBUG_ENTER("Relay_log_info::stmt_done");
+ DBUG_ASSERT(rgi->rli == this);
/*
If in a transaction, and if the slave supports transactions, just
inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN
(not OPTION_NOT_AUTOCOMMIT) as transactions are logged with
BEGIN/COMMIT, not with SET AUTOCOMMIT= .
+ We can't use rgi->rli->get_flag(IN_TRANSACTION) here as OPTION_BEGIN
+ is also used for single row transactions.
+
CAUTION: opt_using_transactions means innodb || bdb ; suppose the
master supports InnoDB and BDB, but the slave supports only BDB,
problems will arise: - suppose an InnoDB table is created on the
@@ -1224,12 +1255,13 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
middle of the "transaction". START SLAVE will resume at BEGIN
while the MyISAM table has already been updated.
*/
- if ((sql_thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions)
- inc_event_relay_log_pos();
+ if ((rgi->thd->variables.option_bits & OPTION_BEGIN) &&
+ opt_using_transactions)
+ rgi->inc_event_relay_log_pos();
else
{
- inc_group_relay_log_pos(event_master_log_pos);
- if (rpl_global_gtid_slave_state.record_and_update_gtid(thd, this))
+ inc_group_relay_log_pos(event_master_log_pos, rgi);
+ if (rpl_global_gtid_slave_state.record_and_update_gtid(thd, rgi))
{
report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE,
"Failed to update GTID state in %s.%s, slave state may become "
@@ -1244,7 +1276,8 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
*/
}
DBUG_EXECUTE_IF("inject_crash_before_flush_rli", DBUG_SUICIDE(););
- flush_relay_log_info(this);
+ if (mi->using_gtid == Master_info::USE_GTID_NO)
+ flush_relay_log_info(this);
DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE(););
/*
Note that Rotate_log_event::do_apply_event() does not call this
@@ -1258,127 +1291,10 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
IF_DBUG(debug_not_change_ts_if_art_event > 0, 1)))
last_master_timestamp= event_creation_time;
}
-}
-
-#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
-void Relay_log_info::cleanup_context(THD *thd, bool error)
-{
- DBUG_ENTER("Relay_log_info::cleanup_context");
-
- DBUG_ASSERT(sql_thd == thd);
- /*
- 1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
- may have opened tables, which we cannot be sure have been closed (because
- maybe the Rows_log_event have not been found or will not be, because slave
- SQL thread is stopping, or relay log has a missing tail etc). So we close
- all thread's tables. And so the table mappings have to be cancelled.
- 2) Rows_log_event::do_apply_event() may even have started statements or
- transactions on them, which we need to rollback in case of error.
- 3) If finding a Format_description_log_event after a BEGIN, we also need
- to rollback before continuing with the next events.
- 4) so we need this "context cleanup" function.
- */
- if (error)
- {
- trans_rollback_stmt(thd); // if a "statement transaction"
- trans_rollback(thd); // if a "real transaction"
- }
- m_table_map.clear_tables();
- slave_close_thread_tables(thd);
- if (error)
- thd->mdl_context.release_transactional_locks();
- clear_flag(IN_STMT);
- /*
- Cleanup for the flags that have been set at do_apply_event.
- */
- thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
- thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
-
- /*
- Reset state related to long_find_row notes in the error log:
- - timestamp
- - flag that decides whether the slave prints or not
- */
- reset_row_stmt_start_timestamp();
- unset_long_find_row_note_printed();
-
- DBUG_VOID_RETURN;
-}
-
-void Relay_log_info::clear_tables_to_lock()
-{
- DBUG_ENTER("Relay_log_info::clear_tables_to_lock()");
-#ifndef DBUG_OFF
- /**
- When replicating in RBR and MyISAM Merge tables are involved
- open_and_lock_tables (called in do_apply_event) appends the
- base tables to the list of tables_to_lock. Then these are
- removed from the list in close_thread_tables (which is called
- before we reach this point).
-
- This assertion just confirms that we get no surprises at this
- point.
- */
- uint i=0;
- for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ;
- DBUG_ASSERT(i == tables_to_lock_count);
-#endif
-
- while (tables_to_lock)
- {
- uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
- if (tables_to_lock->m_tabledef_valid)
- {
- tables_to_lock->m_tabledef.table_def::~table_def();
- tables_to_lock->m_tabledef_valid= FALSE;
- }
-
- /*
- If blob fields were used during conversion of field values
- from the master table into the slave table, then we need to
- free the memory used temporarily to store their values before
- copying into the slave's table.
- */
- if (tables_to_lock->m_conv_table)
- free_blobs(tables_to_lock->m_conv_table);
-
- tables_to_lock=
- static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
- tables_to_lock_count--;
- my_free(to_free);
- }
- DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);
DBUG_VOID_RETURN;
}
-void Relay_log_info::slave_close_thread_tables(THD *thd)
-{
- DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)");
- thd->stmt_da->can_overwrite_status= TRUE;
- thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
- thd->stmt_da->can_overwrite_status= FALSE;
-
- close_thread_tables(thd);
- /*
- - If inside a multi-statement transaction,
- defer the release of metadata locks until the current
- transaction is either committed or rolled back. This prevents
- other statements from modifying the table for the entire
- duration of this transaction. This provides commit ordering
- and guarantees serializability across multiple transactions.
- - If in autocommit mode, or outside a transactional context,
- automatically release metadata locks of the current statement.
- */
- if (! thd->in_multi_stmt_transaction_mode())
- thd->mdl_context.release_transactional_locks();
- else
- thd->mdl_context.release_statement_locks();
-
- clear_tables_to_lock();
- DBUG_VOID_RETURN;
-}
-
-
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int
rpl_load_gtid_slave_state(THD *thd)
{
@@ -1554,4 +1470,224 @@ end:
DBUG_RETURN(err);
}
+
+rpl_group_info::rpl_group_info(Relay_log_info *rli_)
+ : rli(rli_), thd(0), gtid_sub_id(0), wait_commit_sub_id(0),
+ wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0),
+ deferred_events(NULL), m_annotate_event(0), tables_to_lock(0),
+ tables_to_lock_count(0), trans_retries(0), last_event_start_time(0),
+ is_parallel_exec(false), is_error(false),
+ row_stmt_start_timestamp(0), long_find_row_note_printed(false)
+{
+ bzero(&current_gtid, sizeof(current_gtid));
+ mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock,
+ MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_rpl_group_info_sleep_cond, &sleep_cond, NULL);
+}
+
+
+rpl_group_info::~rpl_group_info()
+{
+ free_annotate_event();
+ mysql_mutex_destroy(&sleep_lock);
+ mysql_cond_destroy(&sleep_cond);
+}
+
+
+int
+event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev)
+{
+ uint64 sub_id= rpl_global_gtid_slave_state.next_sub_id(gev->domain_id);
+ if (!sub_id)
+ {
+ /* Out of memory caused hash insertion to fail. */
+ return 1;
+ }
+ rgi->gtid_sub_id= sub_id;
+ rgi->current_gtid.server_id= gev->server_id;
+ rgi->current_gtid.domain_id= gev->domain_id;
+ rgi->current_gtid.seq_no= gev->seq_no;
+ return 0;
+}
+
+
+void
+delete_or_keep_event_post_apply(rpl_group_info *rgi,
+ Log_event_type typ, Log_event *ev)
+{
+ /*
+ ToDo: This needs to work on rpl_group_info, not Relay_log_info, to be
+ thread-safe for parallel replication.
+ */
+
+ switch (typ) {
+ case FORMAT_DESCRIPTION_EVENT:
+ /*
+ Format_description_log_event should not be deleted because it
+ will be used to read info about the relay log's format;
+ it will be deleted when the SQL thread does not need it,
+ i.e. when this thread terminates.
+ */
+ break;
+ case ANNOTATE_ROWS_EVENT:
+ /*
+ Annotate_rows event should not be deleted because after it has
+ been applied, thd->query points to the string inside this event.
+ The thd->query will be used to generate new Annotate_rows event
+ during applying the subsequent Rows events.
+ */
+ rgi->set_annotate_event((Annotate_rows_log_event*) ev);
+ break;
+ case DELETE_ROWS_EVENT:
+ case UPDATE_ROWS_EVENT:
+ case WRITE_ROWS_EVENT:
+ /*
+ After the last Rows event has been applied, the saved Annotate_rows
+ event (if any) is not needed anymore and can be deleted.
+ */
+ if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F))
+ rgi->free_annotate_event();
+ /* fall through */
+ default:
+ DBUG_PRINT("info", ("Deleting the event after it has been executed"));
+ if (!rgi->is_deferred_event(ev))
+ delete ev;
+ break;
+ }
+}
+
+
+void rpl_group_info::cleanup_context(THD *thd, bool error)
+{
+ DBUG_ENTER("Relay_log_info::cleanup_context");
+ DBUG_PRINT("enter", ("error: %d", (int) error));
+
+ DBUG_ASSERT(this->thd == thd);
+ /*
+ 1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
+ may have opened tables, which we cannot be sure have been closed (because
+ maybe the Rows_log_event have not been found or will not be, because slave
+ SQL thread is stopping, or relay log has a missing tail etc). So we close
+ all thread's tables. And so the table mappings have to be cancelled.
+ 2) Rows_log_event::do_apply_event() may even have started statements or
+ transactions on them, which we need to rollback in case of error.
+ 3) If finding a Format_description_log_event after a BEGIN, we also need
+ to rollback before continuing with the next events.
+ 4) so we need this "context cleanup" function.
+ */
+ if (error)
+ {
+ trans_rollback_stmt(thd); // if a "statement transaction"
+ trans_rollback(thd); // if a "real transaction"
+ }
+ m_table_map.clear_tables();
+ slave_close_thread_tables(thd);
+ if (error)
+ {
+ thd->mdl_context.release_transactional_locks();
+
+ if (thd == rli->sql_driver_thd)
+ {
+ /*
+ Reset flags. This is needed to handle incident events and errors in
+ the relay log noticed by the sql driver thread.
+ */
+ rli->clear_flag(Relay_log_info::IN_STMT);
+ rli->clear_flag(Relay_log_info::IN_TRANSACTION);
+ }
+ }
+
+ /*
+ Cleanup for the flags that have been set at do_apply_event.
+ */
+ thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
+ thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
+
+ /*
+ Reset state related to long_find_row notes in the error log:
+ - timestamp
+ - flag that decides whether the slave prints or not
+ */
+ reset_row_stmt_start_timestamp();
+ unset_long_find_row_note_printed();
+
+ DBUG_VOID_RETURN;
+}
+
+
+void rpl_group_info::clear_tables_to_lock()
+{
+ DBUG_ENTER("Relay_log_info::clear_tables_to_lock()");
+#ifndef DBUG_OFF
+ /**
+ When replicating in RBR and MyISAM Merge tables are involved
+ open_and_lock_tables (called in do_apply_event) appends the
+ base tables to the list of tables_to_lock. Then these are
+ removed from the list in close_thread_tables (which is called
+ before we reach this point).
+
+ This assertion just confirms that we get no surprises at this
+ point.
+ */
+ uint i=0;
+ for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ;
+ DBUG_ASSERT(i == tables_to_lock_count);
+#endif
+
+ while (tables_to_lock)
+ {
+ uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
+ if (tables_to_lock->m_tabledef_valid)
+ {
+ tables_to_lock->m_tabledef.table_def::~table_def();
+ tables_to_lock->m_tabledef_valid= FALSE;
+ }
+
+ /*
+ If blob fields were used during conversion of field values
+ from the master table into the slave table, then we need to
+ free the memory used temporarily to store their values before
+ copying into the slave's table.
+ */
+ if (tables_to_lock->m_conv_table)
+ free_blobs(tables_to_lock->m_conv_table);
+
+ tables_to_lock=
+ static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
+ tables_to_lock_count--;
+ my_free(to_free);
+ }
+ DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);
+ DBUG_VOID_RETURN;
+}
+
+
+void rpl_group_info::slave_close_thread_tables(THD *thd)
+{
+ DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)");
+ thd->stmt_da->can_overwrite_status= TRUE;
+ thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
+ thd->stmt_da->can_overwrite_status= FALSE;
+
+ close_thread_tables(thd);
+ /*
+ - If inside a multi-statement transaction,
+ defer the release of metadata locks until the current
+ transaction is either committed or rolled back. This prevents
+ other statements from modifying the table for the entire
+ duration of this transaction. This provides commit ordering
+ and guarantees serializability across multiple transactions.
+ - If in autocommit mode, or outside a transactional context,
+ automatically release metadata locks of the current statement.
+ */
+ if (! thd->in_multi_stmt_transaction_mode())
+ thd->mdl_context.release_transactional_locks();
+ else
+ thd->mdl_context.release_statement_locks();
+
+ clear_tables_to_lock();
+ DBUG_VOID_RETURN;
+}
+
+
#endif