diff options
author | Kristian Nielsen <knielsen@knielsen-hq.org> | 2014-12-05 16:09:48 +0100 |
---|---|---|
committer | Kristian Nielsen <knielsen@knielsen-hq.org> | 2014-12-06 08:49:50 +0100 |
commit | db21fddc3740dfa48f3443751c48282467afac5e (patch) | |
tree | 3ed31a4a5ce9abf3fe98cdd9c9bdf68ba8b5832b /sql/log_event.cc | |
parent | 1e3f09f1638e2bdec6029f6c98317d17d7ca76d1 (diff) | |
download | mariadb-git-db21fddc3740dfa48f3443751c48282467afac5e.tar.gz |
MDEV-6676: Optimistic parallel replication
Implement a new mode for parallel replication. In this mode, all transactions
are optimistically attempted applied in parallel. In case of conflicts, the
offending transaction is rolled back and retried later non-parallel.
This is an early-release patch to facilitate testing, more changes to user
interface / options will be expected. The new mode is not enabled by default.
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r-- | sql/log_event.cc | 56 |
1 files changed, 48 insertions, 8 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index 66b577ff0bb..5c42da4b743 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -187,6 +187,8 @@ is_parallel_retry_error(rpl_group_info *rgi, int err) { if (!rgi->is_parallel_exec) return false; + if (rgi->speculation == rpl_group_info::SPECULATE_OPTIMISTIC) + return true; if (rgi->killed_for_retry && (err == ER_QUERY_INTERRUPTED || err == ER_CONNECTION_KILLED)) return true; @@ -6382,6 +6384,18 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg, flags2((standalone ? FL_STANDALONE : 0) | (commit_id_arg ? FL_GROUP_COMMIT_ID : 0)) { cache_type= Log_event::EVENT_NO_CACHE; + if (thd_arg->transaction.stmt.trans_did_wait() || + thd_arg->transaction.all.trans_did_wait()) + flags2|= FL_WAITED; + if (sql_command_flags[thd->lex->sql_command] & CF_DISALLOW_IN_RO_TRANS) + flags2|= FL_DDL; + else if (is_transactional) + flags2|= FL_TRANSACTIONAL; + if (thd_arg->variables.option_bits & OPTION_RPL_ALLOW_PARALLEL) + flags2|= FL_ALLOW_PARALLEL; + /* Preserve any DDL or WAITED flag in the slave's binlog. */ + if (thd_arg->rgi_slave) + flags2|= (thd_arg->rgi_slave->gtid_ev_flags2 & (FL_DDL|FL_WAITED)); } @@ -6501,9 +6515,11 @@ static char gtid_begin_string[] = "BEGIN"; int Gtid_log_event::do_apply_event(rpl_group_info *rgi) { + ulonglong bits= thd->variables.option_bits; thd->variables.server_id= this->server_id; thd->variables.gtid_domain_id= this->domain_id; thd->variables.gtid_seq_no= this->seq_no; + rgi->gtid_ev_flags2= flags2; mysql_reset_thd_for_next_command(thd); if (opt_gtid_strict_mode && opt_bin_log && opt_log_slave_updates) @@ -6513,12 +6529,17 @@ Gtid_log_event::do_apply_event(rpl_group_info *rgi) return 1; } - DBUG_ASSERT((thd->variables.option_bits & OPTION_GTID_BEGIN) == 0); + DBUG_ASSERT((bits & OPTION_GTID_BEGIN) == 0); if (flags2 & FL_STANDALONE) return 0; /* Execute this like a BEGIN query event. */ - thd->variables.option_bits|= OPTION_GTID_BEGIN; + bits|= OPTION_GTID_BEGIN; + if (flags2 & FL_ALLOW_PARALLEL) + bits|= (ulonglong)OPTION_RPL_ALLOW_PARALLEL; + else + bits&= ~(ulonglong)OPTION_RPL_ALLOW_PARALLEL; + thd->variables.option_bits= bits; DBUG_PRINT("info", ("Set OPTION_GTID_BEGIN")); thd->set_query_and_id(gtid_begin_string, sizeof(gtid_begin_string)-1, &my_charset_bin, next_query_id()); @@ -6590,14 +6611,29 @@ Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) { print_header(&cache, print_event_info, FALSE); longlong10_to_str(seq_no, buf, 10); + my_b_printf(&cache, "\tGTID %u-%u-%s", domain_id, server_id, buf); if (flags2 & FL_GROUP_COMMIT_ID) { longlong10_to_str(commit_id, buf2, 10); - my_b_printf(&cache, "\tGTID %u-%u-%s cid=%s\n", - domain_id, server_id, buf, buf2); + my_b_printf(&cache, " cid=%s", buf2); + } + if (flags2 & FL_DDL) + my_b_write_string(&cache, " ddl"); + if (flags2 & FL_TRANSACTIONAL) + my_b_write_string(&cache, " trans"); + if (flags2 & FL_WAITED) + my_b_write_string(&cache, " waited"); + my_b_printf(&cache, "\n"); + + if (!print_event_info->allow_parallel_printed || + print_event_info->allow_parallel != !!(flags2 & FL_ALLOW_PARALLEL)) + { + my_b_printf(&cache, + "/*!100101 SET @@session.replicate_allow_parallel=%u*/%s\n", + !!(flags2 & FL_ALLOW_PARALLEL), print_event_info->delimiter); + print_event_info->allow_parallel= !!(flags2 & FL_ALLOW_PARALLEL); + print_event_info->allow_parallel_printed= true; } - else - my_b_printf(&cache, "\tGTID %u-%u-%s\n", domain_id, server_id, buf); if (!print_event_info->domain_id_printed || print_event_info->domain_id != domain_id) @@ -9600,6 +9636,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) by mysql_reset_thd_for_next_command. */ thd->transaction.stmt.modified_non_trans_table= FALSE; + thd->transaction.stmt.m_unsafe_rollback_flags&= ~THD_TRANS::DID_WAIT; /* This is a row injection, so we flag the "statement" as such. Note that this code is called both when the slave does row @@ -10058,7 +10095,10 @@ static int rows_event_stmt_cleanup(rpl_group_info *rgi, THD * thd) rows_log_event::do_apply_event() */ if (!thd->in_multi_stmt_transaction_mode()) + { thd->transaction.all.modified_non_trans_table= 0; + thd->transaction.all.m_unsafe_rollback_flags&= ~THD_TRANS::DID_WAIT; + } rgi->cleanup_context(thd, 0); } @@ -12625,7 +12665,7 @@ st_print_event_info::st_print_event_info() charset_database_number(ILLEGAL_CHARSET_INFO_NUMBER), thread_id(0), thread_id_printed(false), server_id(0), server_id_printed(false), domain_id(0), domain_id_printed(false), - skip_replication(0), + allow_parallel(true), allow_parallel_printed(false), skip_replication(0), base64_output_mode(BASE64_OUTPUT_UNSPEC), printed_fd_event(FALSE) { /* @@ -12675,7 +12715,7 @@ bool rpl_get_position_info(const char **log_file_name, ulonglong *log_pos, return FALSE; #else const Relay_log_info *rli= &(active_mi->rli); - if (opt_slave_parallel_threads == 0) + if (!rli->mi->using_parallel()) { *log_file_name= rli->group_master_log_name; *log_pos= rli->group_master_log_pos + |