summaryrefslogtreecommitdiff
path: root/sql/log_event.cc
diff options
context:
space:
mode:
authorKristian Nielsen <knielsen@knielsen-hq.org>2014-12-05 16:09:48 +0100
committerKristian Nielsen <knielsen@knielsen-hq.org>2014-12-06 08:49:50 +0100
commitdb21fddc3740dfa48f3443751c48282467afac5e (patch)
tree3ed31a4a5ce9abf3fe98cdd9c9bdf68ba8b5832b /sql/log_event.cc
parent1e3f09f1638e2bdec6029f6c98317d17d7ca76d1 (diff)
downloadmariadb-git-db21fddc3740dfa48f3443751c48282467afac5e.tar.gz
MDEV-6676: Optimistic parallel replication
Implement a new mode for parallel replication. In this mode, all transactions are optimistically attempted applied in parallel. In case of conflicts, the offending transaction is rolled back and retried later non-parallel. This is an early-release patch to facilitate testing, more changes to user interface / options will be expected. The new mode is not enabled by default.
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r--sql/log_event.cc56
1 files changed, 48 insertions, 8 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 66b577ff0bb..5c42da4b743 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -187,6 +187,8 @@ is_parallel_retry_error(rpl_group_info *rgi, int err)
{
if (!rgi->is_parallel_exec)
return false;
+ if (rgi->speculation == rpl_group_info::SPECULATE_OPTIMISTIC)
+ return true;
if (rgi->killed_for_retry &&
(err == ER_QUERY_INTERRUPTED || err == ER_CONNECTION_KILLED))
return true;
@@ -6382,6 +6384,18 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
flags2((standalone ? FL_STANDALONE : 0) | (commit_id_arg ? FL_GROUP_COMMIT_ID : 0))
{
cache_type= Log_event::EVENT_NO_CACHE;
+ if (thd_arg->transaction.stmt.trans_did_wait() ||
+ thd_arg->transaction.all.trans_did_wait())
+ flags2|= FL_WAITED;
+ if (sql_command_flags[thd->lex->sql_command] & CF_DISALLOW_IN_RO_TRANS)
+ flags2|= FL_DDL;
+ else if (is_transactional)
+ flags2|= FL_TRANSACTIONAL;
+ if (thd_arg->variables.option_bits & OPTION_RPL_ALLOW_PARALLEL)
+ flags2|= FL_ALLOW_PARALLEL;
+ /* Preserve any DDL or WAITED flag in the slave's binlog. */
+ if (thd_arg->rgi_slave)
+ flags2|= (thd_arg->rgi_slave->gtid_ev_flags2 & (FL_DDL|FL_WAITED));
}
@@ -6501,9 +6515,11 @@ static char gtid_begin_string[] = "BEGIN";
int
Gtid_log_event::do_apply_event(rpl_group_info *rgi)
{
+ ulonglong bits= thd->variables.option_bits;
thd->variables.server_id= this->server_id;
thd->variables.gtid_domain_id= this->domain_id;
thd->variables.gtid_seq_no= this->seq_no;
+ rgi->gtid_ev_flags2= flags2;
mysql_reset_thd_for_next_command(thd);
if (opt_gtid_strict_mode && opt_bin_log && opt_log_slave_updates)
@@ -6513,12 +6529,17 @@ Gtid_log_event::do_apply_event(rpl_group_info *rgi)
return 1;
}
- DBUG_ASSERT((thd->variables.option_bits & OPTION_GTID_BEGIN) == 0);
+ DBUG_ASSERT((bits & OPTION_GTID_BEGIN) == 0);
if (flags2 & FL_STANDALONE)
return 0;
/* Execute this like a BEGIN query event. */
- thd->variables.option_bits|= OPTION_GTID_BEGIN;
+ bits|= OPTION_GTID_BEGIN;
+ if (flags2 & FL_ALLOW_PARALLEL)
+ bits|= (ulonglong)OPTION_RPL_ALLOW_PARALLEL;
+ else
+ bits&= ~(ulonglong)OPTION_RPL_ALLOW_PARALLEL;
+ thd->variables.option_bits= bits;
DBUG_PRINT("info", ("Set OPTION_GTID_BEGIN"));
thd->set_query_and_id(gtid_begin_string, sizeof(gtid_begin_string)-1,
&my_charset_bin, next_query_id());
@@ -6590,14 +6611,29 @@ Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
{
print_header(&cache, print_event_info, FALSE);
longlong10_to_str(seq_no, buf, 10);
+ my_b_printf(&cache, "\tGTID %u-%u-%s", domain_id, server_id, buf);
if (flags2 & FL_GROUP_COMMIT_ID)
{
longlong10_to_str(commit_id, buf2, 10);
- my_b_printf(&cache, "\tGTID %u-%u-%s cid=%s\n",
- domain_id, server_id, buf, buf2);
+ my_b_printf(&cache, " cid=%s", buf2);
+ }
+ if (flags2 & FL_DDL)
+ my_b_write_string(&cache, " ddl");
+ if (flags2 & FL_TRANSACTIONAL)
+ my_b_write_string(&cache, " trans");
+ if (flags2 & FL_WAITED)
+ my_b_write_string(&cache, " waited");
+ my_b_printf(&cache, "\n");
+
+ if (!print_event_info->allow_parallel_printed ||
+ print_event_info->allow_parallel != !!(flags2 & FL_ALLOW_PARALLEL))
+ {
+ my_b_printf(&cache,
+ "/*!100101 SET @@session.replicate_allow_parallel=%u*/%s\n",
+ !!(flags2 & FL_ALLOW_PARALLEL), print_event_info->delimiter);
+ print_event_info->allow_parallel= !!(flags2 & FL_ALLOW_PARALLEL);
+ print_event_info->allow_parallel_printed= true;
}
- else
- my_b_printf(&cache, "\tGTID %u-%u-%s\n", domain_id, server_id, buf);
if (!print_event_info->domain_id_printed ||
print_event_info->domain_id != domain_id)
@@ -9600,6 +9636,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
by mysql_reset_thd_for_next_command.
*/
thd->transaction.stmt.modified_non_trans_table= FALSE;
+ thd->transaction.stmt.m_unsafe_rollback_flags&= ~THD_TRANS::DID_WAIT;
/*
This is a row injection, so we flag the "statement" as
such. Note that this code is called both when the slave does row
@@ -10058,7 +10095,10 @@ static int rows_event_stmt_cleanup(rpl_group_info *rgi, THD * thd)
rows_log_event::do_apply_event()
*/
if (!thd->in_multi_stmt_transaction_mode())
+ {
thd->transaction.all.modified_non_trans_table= 0;
+ thd->transaction.all.m_unsafe_rollback_flags&= ~THD_TRANS::DID_WAIT;
+ }
rgi->cleanup_context(thd, 0);
}
@@ -12625,7 +12665,7 @@ st_print_event_info::st_print_event_info()
charset_database_number(ILLEGAL_CHARSET_INFO_NUMBER),
thread_id(0), thread_id_printed(false), server_id(0),
server_id_printed(false), domain_id(0), domain_id_printed(false),
- skip_replication(0),
+ allow_parallel(true), allow_parallel_printed(false), skip_replication(0),
base64_output_mode(BASE64_OUTPUT_UNSPEC), printed_fd_event(FALSE)
{
/*
@@ -12675,7 +12715,7 @@ bool rpl_get_position_info(const char **log_file_name, ulonglong *log_pos,
return FALSE;
#else
const Relay_log_info *rli= &(active_mi->rli);
- if (opt_slave_parallel_threads == 0)
+ if (!rli->mi->using_parallel())
{
*log_file_name= rli->group_master_log_name;
*log_pos= rli->group_master_log_pos +