diff options
author | unknown <knielsen@knielsen-hq.org> | 2013-11-01 09:17:06 +0100 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2013-11-01 09:17:06 +0100 |
commit | cb86ce60b9bade5ae7712d8f3f74668208ee3fd2 (patch) | |
tree | daff81c02baa6c2581d6abe3d746b8f35ee44f32 /sql/rpl_rli.cc | |
parent | f4d5d849fd3b526d38ca6eb083fd0b290eb0eda7 (diff) | |
parent | 39df665a3332bd9bfb2529419f534a49cfac388c (diff) | |
download | mariadb-git-cb86ce60b9bade5ae7712d8f3f74668208ee3fd2.tar.gz |
Merge MDEV-4506: Parallel replication into 10.0-base.
Diffstat (limited to 'sql/rpl_rli.cc')
-rw-r--r-- | sql/rpl_rli.cc | 438 |
1 files changed, 287 insertions, 151 deletions
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index dd2a151108d..cc265f25c9d 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -56,13 +56,10 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) #endif group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0), last_master_timestamp(0), slave_skip_counter(0), - abort_pos_wait(0), slave_run_id(0), sql_thd(0), + abort_pos_wait(0), slave_run_id(0), sql_driver_thd(), inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE), until_log_pos(0), retried_trans(0), executed_entries(0), - gtid_sub_id(0), tables_to_lock(0), tables_to_lock_count(0), - last_event_start_time(0), deferred_events(NULL),m_flags(0), - row_stmt_start_timestamp(0), long_find_row_note_printed(false), - m_annotate_event(0) + m_flags(0) { DBUG_ENTER("Relay_log_info::Relay_log_info"); @@ -87,12 +84,10 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) &data_lock, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_relay_log_info_log_space_lock, &log_space_lock, MY_MUTEX_INIT_FAST); - mysql_mutex_init(key_relay_log_info_sleep_lock, &sleep_lock, MY_MUTEX_INIT_FAST); mysql_cond_init(key_relay_log_info_data_cond, &data_cond, NULL); mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL); mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL); mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL); - mysql_cond_init(key_relay_log_info_sleep_cond, &sleep_cond, NULL); relay_log.init_pthread_objects(); DBUG_VOID_RETURN; } @@ -105,14 +100,11 @@ Relay_log_info::~Relay_log_info() mysql_mutex_destroy(&run_lock); mysql_mutex_destroy(&data_lock); mysql_mutex_destroy(&log_space_lock); - mysql_mutex_destroy(&sleep_lock); mysql_cond_destroy(&data_cond); mysql_cond_destroy(&start_cond); mysql_cond_destroy(&stop_cond); mysql_cond_destroy(&log_space_cond); - mysql_cond_destroy(&sleep_cond); relay_log.cleanup(); - free_annotate_event(); DBUG_VOID_RETURN; } @@ -137,8 +129,6 @@ int init_relay_log_info(Relay_log_info* rli, rli->abort_pos_wait=0; rli->log_space_limit= relay_log_space_limit; rli->log_space_total= 0; - rli->tables_to_lock= 0; - rli->tables_to_lock_count= 0; char pattern[FN_REFLEN]; (void) my_realpath(pattern, slave_load_tmpdir, 0); @@ -528,6 +518,8 @@ int init_relay_log_pos(Relay_log_info* rli,const char* log, } rli->group_relay_log_pos = rli->event_relay_log_pos = pos; + rli->clear_flag(Relay_log_info::IN_STMT); + rli->clear_flag(Relay_log_info::IN_TRANSACTION); /* Test to see if the previous run was with the skip of purging @@ -877,17 +869,54 @@ improper_arguments: %d timed_out: %d", void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos, - bool skip_lock) + rpl_group_info *rgi, + bool skip_lock) { DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos"); if (!skip_lock) mysql_mutex_lock(&data_lock); - inc_event_relay_log_pos(); - group_relay_log_pos= event_relay_log_pos; - strmake_buf(group_relay_log_name,event_relay_log_name); + rgi->inc_event_relay_log_pos(); + DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu", + (long) log_pos, (long) group_master_log_pos)); + if (rgi->is_parallel_exec) + { + /* In case of parallel replication, do not update the position backwards. */ + int cmp= strcmp(group_relay_log_name, event_relay_log_name); + if (cmp < 0) + { + group_relay_log_pos= event_relay_log_pos; + strmake_buf(group_relay_log_name, event_relay_log_name); + notify_group_relay_log_name_update(); + } else if (cmp == 0 && group_relay_log_pos < event_relay_log_pos) + group_relay_log_pos= event_relay_log_pos; - notify_group_relay_log_name_update(); + /* + In the parallel case we need to update the master_log_name here, rather + than in Rotate_log_event::do_update_pos(). + */ + cmp= strcmp(group_master_log_name, rgi->future_event_master_log_name); + if (cmp <= 0) + { + if (cmp < 0) + { + strcpy(group_master_log_name, rgi->future_event_master_log_name); + notify_group_master_log_name_update(); + group_master_log_pos= log_pos; + } + else if (group_master_log_pos < log_pos) + group_master_log_pos= log_pos; + } + } + else + { + /* Non-parallel case. */ + group_relay_log_pos= event_relay_log_pos; + strmake_buf(group_relay_log_name, event_relay_log_name); + notify_group_relay_log_name_update(); + if (log_pos) // 3.23 binlogs don't have log_posx + group_master_log_pos= log_pos; + } /* If the slave does not support transactions and replicates a transaction, @@ -919,12 +948,6 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos, the relay log is not "val". With the end_log_pos solution, we avoid computations involving lengthes. */ - DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu", - (long) log_pos, (long) group_master_log_pos)); - if (log_pos) // 3.23 binlogs don't have log_posx - { - group_master_log_pos= log_pos; - } mysql_cond_broadcast(&data_cond); if (!skip_lock) mysql_mutex_unlock(&data_lock); @@ -940,6 +963,9 @@ void Relay_log_info::close_temporary_tables() for (table=save_temporary_tables ; table ; table=next) { next=table->next; + + /* Reset in_use as the table may have been created by another thd */ + table->in_use=0; /* Don't ask for disk deletion. For now, anyway they will be deleted when slave restarts, but it is a better intention to not delete them. @@ -1099,9 +1125,9 @@ bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev) !replicate_same_server_id) DBUG_RETURN(FALSE); log_name= group_master_log_name; - log_pos= (!ev)? group_master_log_pos : - ((thd->variables.option_bits & OPTION_BEGIN || !ev->log_pos) ? - group_master_log_pos : ev->log_pos - ev->data_written); + log_pos= ((!ev)? group_master_log_pos : + (get_flag(IN_TRANSACTION) || !ev->log_pos) ? + group_master_log_pos : ev->log_pos - ev->data_written); } else { /* until_condition == UNTIL_RELAY_POS */ @@ -1194,19 +1220,24 @@ bool Relay_log_info::cached_charset_compare(char *charset) const void Relay_log_info::stmt_done(my_off_t event_master_log_pos, - time_t event_creation_time, THD *thd) + time_t event_creation_time, THD *thd, + rpl_group_info *rgi) { #ifndef DBUG_OFF extern uint debug_not_change_ts_if_art_event; #endif - clear_flag(IN_STMT); + DBUG_ENTER("Relay_log_info::stmt_done"); + DBUG_ASSERT(rgi->rli == this); /* If in a transaction, and if the slave supports transactions, just inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN (not OPTION_NOT_AUTOCOMMIT) as transactions are logged with BEGIN/COMMIT, not with SET AUTOCOMMIT= . + We can't use rgi->rli->get_flag(IN_TRANSACTION) here as OPTION_BEGIN + is also used for single row transactions. + CAUTION: opt_using_transactions means innodb || bdb ; suppose the master supports InnoDB and BDB, but the slave supports only BDB, problems will arise: - suppose an InnoDB table is created on the @@ -1224,12 +1255,13 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos, middle of the "transaction". START SLAVE will resume at BEGIN while the MyISAM table has already been updated. */ - if ((sql_thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions) - inc_event_relay_log_pos(); + if ((rgi->thd->variables.option_bits & OPTION_BEGIN) && + opt_using_transactions) + rgi->inc_event_relay_log_pos(); else { - inc_group_relay_log_pos(event_master_log_pos); - if (rpl_global_gtid_slave_state.record_and_update_gtid(thd, this)) + inc_group_relay_log_pos(event_master_log_pos, rgi); + if (rpl_global_gtid_slave_state.record_and_update_gtid(thd, rgi)) { report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, "Failed to update GTID state in %s.%s, slave state may become " @@ -1244,7 +1276,8 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos, */ } DBUG_EXECUTE_IF("inject_crash_before_flush_rli", DBUG_SUICIDE();); - flush_relay_log_info(this); + if (mi->using_gtid == Master_info::USE_GTID_NO) + flush_relay_log_info(this); DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE();); /* Note that Rotate_log_event::do_apply_event() does not call this @@ -1258,127 +1291,10 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos, IF_DBUG(debug_not_change_ts_if_art_event > 0, 1))) last_master_timestamp= event_creation_time; } -} - -#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) -void Relay_log_info::cleanup_context(THD *thd, bool error) -{ - DBUG_ENTER("Relay_log_info::cleanup_context"); - - DBUG_ASSERT(sql_thd == thd); - /* - 1) Instances of Table_map_log_event, if ::do_apply_event() was called on them, - may have opened tables, which we cannot be sure have been closed (because - maybe the Rows_log_event have not been found or will not be, because slave - SQL thread is stopping, or relay log has a missing tail etc). So we close - all thread's tables. And so the table mappings have to be cancelled. - 2) Rows_log_event::do_apply_event() may even have started statements or - transactions on them, which we need to rollback in case of error. - 3) If finding a Format_description_log_event after a BEGIN, we also need - to rollback before continuing with the next events. - 4) so we need this "context cleanup" function. - */ - if (error) - { - trans_rollback_stmt(thd); // if a "statement transaction" - trans_rollback(thd); // if a "real transaction" - } - m_table_map.clear_tables(); - slave_close_thread_tables(thd); - if (error) - thd->mdl_context.release_transactional_locks(); - clear_flag(IN_STMT); - /* - Cleanup for the flags that have been set at do_apply_event. - */ - thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS; - thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS; - - /* - Reset state related to long_find_row notes in the error log: - - timestamp - - flag that decides whether the slave prints or not - */ - reset_row_stmt_start_timestamp(); - unset_long_find_row_note_printed(); - - DBUG_VOID_RETURN; -} - -void Relay_log_info::clear_tables_to_lock() -{ - DBUG_ENTER("Relay_log_info::clear_tables_to_lock()"); -#ifndef DBUG_OFF - /** - When replicating in RBR and MyISAM Merge tables are involved - open_and_lock_tables (called in do_apply_event) appends the - base tables to the list of tables_to_lock. Then these are - removed from the list in close_thread_tables (which is called - before we reach this point). - - This assertion just confirms that we get no surprises at this - point. - */ - uint i=0; - for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ; - DBUG_ASSERT(i == tables_to_lock_count); -#endif - - while (tables_to_lock) - { - uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock); - if (tables_to_lock->m_tabledef_valid) - { - tables_to_lock->m_tabledef.table_def::~table_def(); - tables_to_lock->m_tabledef_valid= FALSE; - } - - /* - If blob fields were used during conversion of field values - from the master table into the slave table, then we need to - free the memory used temporarily to store their values before - copying into the slave's table. - */ - if (tables_to_lock->m_conv_table) - free_blobs(tables_to_lock->m_conv_table); - - tables_to_lock= - static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global); - tables_to_lock_count--; - my_free(to_free); - } - DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0); DBUG_VOID_RETURN; } -void Relay_log_info::slave_close_thread_tables(THD *thd) -{ - DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)"); - thd->stmt_da->can_overwrite_status= TRUE; - thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd); - thd->stmt_da->can_overwrite_status= FALSE; - - close_thread_tables(thd); - /* - - If inside a multi-statement transaction, - defer the release of metadata locks until the current - transaction is either committed or rolled back. This prevents - other statements from modifying the table for the entire - duration of this transaction. This provides commit ordering - and guarantees serializability across multiple transactions. - - If in autocommit mode, or outside a transactional context, - automatically release metadata locks of the current statement. - */ - if (! thd->in_multi_stmt_transaction_mode()) - thd->mdl_context.release_transactional_locks(); - else - thd->mdl_context.release_statement_locks(); - - clear_tables_to_lock(); - DBUG_VOID_RETURN; -} - - +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) int rpl_load_gtid_slave_state(THD *thd) { @@ -1554,4 +1470,224 @@ end: DBUG_RETURN(err); } + +rpl_group_info::rpl_group_info(Relay_log_info *rli_) + : rli(rli_), thd(0), gtid_sub_id(0), wait_commit_sub_id(0), + wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0), + deferred_events(NULL), m_annotate_event(0), tables_to_lock(0), + tables_to_lock_count(0), trans_retries(0), last_event_start_time(0), + is_parallel_exec(false), is_error(false), + row_stmt_start_timestamp(0), long_find_row_note_printed(false) +{ + bzero(¤t_gtid, sizeof(current_gtid)); + mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock, + MY_MUTEX_INIT_FAST); + mysql_cond_init(key_rpl_group_info_sleep_cond, &sleep_cond, NULL); +} + + +rpl_group_info::~rpl_group_info() +{ + free_annotate_event(); + mysql_mutex_destroy(&sleep_lock); + mysql_cond_destroy(&sleep_cond); +} + + +int +event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev) +{ + uint64 sub_id= rpl_global_gtid_slave_state.next_sub_id(gev->domain_id); + if (!sub_id) + { + /* Out of memory caused hash insertion to fail. */ + return 1; + } + rgi->gtid_sub_id= sub_id; + rgi->current_gtid.server_id= gev->server_id; + rgi->current_gtid.domain_id= gev->domain_id; + rgi->current_gtid.seq_no= gev->seq_no; + return 0; +} + + +void +delete_or_keep_event_post_apply(rpl_group_info *rgi, + Log_event_type typ, Log_event *ev) +{ + /* + ToDo: This needs to work on rpl_group_info, not Relay_log_info, to be + thread-safe for parallel replication. + */ + + switch (typ) { + case FORMAT_DESCRIPTION_EVENT: + /* + Format_description_log_event should not be deleted because it + will be used to read info about the relay log's format; + it will be deleted when the SQL thread does not need it, + i.e. when this thread terminates. + */ + break; + case ANNOTATE_ROWS_EVENT: + /* + Annotate_rows event should not be deleted because after it has + been applied, thd->query points to the string inside this event. + The thd->query will be used to generate new Annotate_rows event + during applying the subsequent Rows events. + */ + rgi->set_annotate_event((Annotate_rows_log_event*) ev); + break; + case DELETE_ROWS_EVENT: + case UPDATE_ROWS_EVENT: + case WRITE_ROWS_EVENT: + /* + After the last Rows event has been applied, the saved Annotate_rows + event (if any) is not needed anymore and can be deleted. + */ + if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F)) + rgi->free_annotate_event(); + /* fall through */ + default: + DBUG_PRINT("info", ("Deleting the event after it has been executed")); + if (!rgi->is_deferred_event(ev)) + delete ev; + break; + } +} + + +void rpl_group_info::cleanup_context(THD *thd, bool error) +{ + DBUG_ENTER("Relay_log_info::cleanup_context"); + DBUG_PRINT("enter", ("error: %d", (int) error)); + + DBUG_ASSERT(this->thd == thd); + /* + 1) Instances of Table_map_log_event, if ::do_apply_event() was called on them, + may have opened tables, which we cannot be sure have been closed (because + maybe the Rows_log_event have not been found or will not be, because slave + SQL thread is stopping, or relay log has a missing tail etc). So we close + all thread's tables. And so the table mappings have to be cancelled. + 2) Rows_log_event::do_apply_event() may even have started statements or + transactions on them, which we need to rollback in case of error. + 3) If finding a Format_description_log_event after a BEGIN, we also need + to rollback before continuing with the next events. + 4) so we need this "context cleanup" function. + */ + if (error) + { + trans_rollback_stmt(thd); // if a "statement transaction" + trans_rollback(thd); // if a "real transaction" + } + m_table_map.clear_tables(); + slave_close_thread_tables(thd); + if (error) + { + thd->mdl_context.release_transactional_locks(); + + if (thd == rli->sql_driver_thd) + { + /* + Reset flags. This is needed to handle incident events and errors in + the relay log noticed by the sql driver thread. + */ + rli->clear_flag(Relay_log_info::IN_STMT); + rli->clear_flag(Relay_log_info::IN_TRANSACTION); + } + } + + /* + Cleanup for the flags that have been set at do_apply_event. + */ + thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS; + thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS; + + /* + Reset state related to long_find_row notes in the error log: + - timestamp + - flag that decides whether the slave prints or not + */ + reset_row_stmt_start_timestamp(); + unset_long_find_row_note_printed(); + + DBUG_VOID_RETURN; +} + + +void rpl_group_info::clear_tables_to_lock() +{ + DBUG_ENTER("Relay_log_info::clear_tables_to_lock()"); +#ifndef DBUG_OFF + /** + When replicating in RBR and MyISAM Merge tables are involved + open_and_lock_tables (called in do_apply_event) appends the + base tables to the list of tables_to_lock. Then these are + removed from the list in close_thread_tables (which is called + before we reach this point). + + This assertion just confirms that we get no surprises at this + point. + */ + uint i=0; + for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ; + DBUG_ASSERT(i == tables_to_lock_count); +#endif + + while (tables_to_lock) + { + uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock); + if (tables_to_lock->m_tabledef_valid) + { + tables_to_lock->m_tabledef.table_def::~table_def(); + tables_to_lock->m_tabledef_valid= FALSE; + } + + /* + If blob fields were used during conversion of field values + from the master table into the slave table, then we need to + free the memory used temporarily to store their values before + copying into the slave's table. + */ + if (tables_to_lock->m_conv_table) + free_blobs(tables_to_lock->m_conv_table); + + tables_to_lock= + static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global); + tables_to_lock_count--; + my_free(to_free); + } + DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0); + DBUG_VOID_RETURN; +} + + +void rpl_group_info::slave_close_thread_tables(THD *thd) +{ + DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)"); + thd->stmt_da->can_overwrite_status= TRUE; + thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd); + thd->stmt_da->can_overwrite_status= FALSE; + + close_thread_tables(thd); + /* + - If inside a multi-statement transaction, + defer the release of metadata locks until the current + transaction is either committed or rolled back. This prevents + other statements from modifying the table for the entire + duration of this transaction. This provides commit ordering + and guarantees serializability across multiple transactions. + - If in autocommit mode, or outside a transactional context, + automatically release metadata locks of the current statement. + */ + if (! thd->in_multi_stmt_transaction_mode()) + thd->mdl_context.release_transactional_locks(); + else + thd->mdl_context.release_statement_locks(); + + clear_tables_to_lock(); + DBUG_VOID_RETURN; +} + + #endif |