diff options
author | unknown <knielsen@knielsen-hq.org> | 2013-10-17 14:11:19 +0200 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2013-10-17 14:11:19 +0200 |
commit | 7681c6aa787f9d3402059957bd8d993997cb623b (patch) | |
tree | 92eda5e12132c16d13e16abc7954f21c79740b11 | |
parent | fcaf1e6a82e2a9f6914b72ea9307c7d91d194150 (diff) | |
download | mariadb-git-7681c6aa787f9d3402059957bd8d993997cb623b.tar.gz |
MDEV-4506: Parallel replication: Intermediate commit.
Fix some part of update of old-style coordinates in parallel replication:
- Ignore XtraDB request for old-style coordinates, not meaningful for
parallel replication (must use GTID to get crash-safe parallel slave).
- Only update relay log coordinates forward, not backwards, to ensure
that parallel threads do not conflict with each other.
- Move future_event_relay_log_pos to rgi.
-rw-r--r-- | sql/log_event.cc | 56 | ||||
-rw-r--r-- | sql/log_event_old.cc | 2 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 2 | ||||
-rw-r--r-- | sql/rpl_parallel.h | 1 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 32 | ||||
-rw-r--r-- | sql/rpl_rli.h | 15 | ||||
-rw-r--r-- | sql/slave.cc | 3 |
7 files changed, 69 insertions, 42 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index 55166b65df4..cd6da8baa22 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -3881,7 +3881,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, future-change-proof addon, e.g if COMMIT handling will start checking invariants like IN_STMT flag must be off at committing the transaction. */ - const_cast<Relay_log_info*>(rli)->inc_event_relay_log_pos(); + rgi->inc_event_relay_log_pos(); const_cast<Relay_log_info*>(rli)->clear_flag(Relay_log_info::IN_STMT); } else @@ -4249,7 +4249,6 @@ end: int Query_log_event::do_update_pos(rpl_group_info *rgi) { - Relay_log_info *rli= rgi->rli; /* Note that we will not increment group* positions if we are just after a SET ONE_SHOT, because SET ONE_SHOT should not be separated @@ -4257,7 +4256,7 @@ int Query_log_event::do_update_pos(rpl_group_info *rgi) */ if (thd->one_shot_set) { - rli->inc_event_relay_log_pos(); + rgi->inc_event_relay_log_pos(); return 0; } else @@ -4864,7 +4863,6 @@ int Format_description_log_event::do_apply_event(rpl_group_info *rgi) int Format_description_log_event::do_update_pos(rpl_group_info *rgi) { - Relay_log_info *rli= rgi->rli; if (server_id == (uint32) global_system_variables.server_id) { /* @@ -4880,7 +4878,7 @@ int Format_description_log_event::do_update_pos(rpl_group_info *rgi) Intvar_log_event instead of starting at a Table_map_log_event or the Intvar_log_event respectively. */ - rli->inc_event_relay_log_pos(); + rgi->inc_event_relay_log_pos(); return 0; } else @@ -5955,7 +5953,7 @@ int Rotate_log_event::do_update_pos(rpl_group_info *rgi) (ulong) rli->group_master_log_pos)); memcpy(rli->group_master_log_name, new_log_ident, ident_len+1); rli->notify_group_master_log_name_update(); - rli->inc_group_relay_log_pos(pos, TRUE /* skip_lock */); + rli->inc_group_relay_log_pos(pos, rgi, TRUE /* skip_lock */); DBUG_PRINT("info", ("new group_master_log_name: '%s' " "new group_master_log_pos: %lu", rli->group_master_log_name, @@ -5978,7 +5976,7 @@ int Rotate_log_event::do_update_pos(rpl_group_info *rgi) thd->variables.auto_increment_offset= 1; } else - rli->inc_event_relay_log_pos(); + rgi->inc_event_relay_log_pos(); DBUG_RETURN(0); @@ -6290,8 +6288,7 @@ Gtid_log_event::do_apply_event(rpl_group_info *rgi) int Gtid_log_event::do_update_pos(rpl_group_info *rgi) { - Relay_log_info *rli= rgi->rli; - rli->inc_event_relay_log_pos(); + rgi->inc_event_relay_log_pos(); return 0; } @@ -6723,8 +6720,7 @@ int Intvar_log_event::do_apply_event(rpl_group_info *rgi) int Intvar_log_event::do_update_pos(rpl_group_info *rgi) { - Relay_log_info *rli= rgi->rli; - rli->inc_event_relay_log_pos(); + rgi->inc_event_relay_log_pos(); return 0; } @@ -6820,8 +6816,7 @@ int Rand_log_event::do_apply_event(rpl_group_info *rgi) int Rand_log_event::do_update_pos(rpl_group_info *rgi) { - Relay_log_info *rli= rgi->rli; - rli->inc_event_relay_log_pos(); + rgi->inc_event_relay_log_pos(); return 0; } @@ -7485,8 +7480,7 @@ int User_var_log_event::do_apply_event(rpl_group_info *rgi) int User_var_log_event::do_update_pos(rpl_group_info *rgi) { - Relay_log_info *rli= rgi->rli; - rli->inc_event_relay_log_pos(); + rgi->inc_event_relay_log_pos(); return 0; } @@ -7717,11 +7711,11 @@ int Stop_log_event::do_update_pos(rpl_group_info *rgi) the target position when in fact we have not. */ if (rli->get_flag(Relay_log_info::IN_TRANSACTION)) - rli->inc_event_relay_log_pos(); + rgi->inc_event_relay_log_pos(); else { rpl_global_gtid_slave_state.record_and_update_gtid(thd, rgi); - rli->inc_group_relay_log_pos(0); + rli->inc_group_relay_log_pos(0, rgi); flush_relay_log_info(rli); } DBUG_RETURN(0); @@ -9543,7 +9537,7 @@ Rows_log_event::do_update_pos(rpl_group_info *rgi) } else { - rli->inc_event_relay_log_pos(); + rgi->inc_event_relay_log_pos(); } DBUG_RETURN(error); @@ -9767,8 +9761,7 @@ int Annotate_rows_log_event::do_apply_event(rpl_group_info *rgi) #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) int Annotate_rows_log_event::do_update_pos(rpl_group_info *rgi) { - Relay_log_info *rli= rgi->rli; - rli->inc_event_relay_log_pos(); + rgi->inc_event_relay_log_pos(); return 0; } #endif @@ -10395,8 +10388,7 @@ Table_map_log_event::do_shall_skip(rpl_group_info *rgi) int Table_map_log_event::do_update_pos(rpl_group_info *rgi) { - Relay_log_info *rli= rgi->rli; - rli->inc_event_relay_log_pos(); + rgi->inc_event_relay_log_pos(); return 0; } @@ -11930,11 +11922,21 @@ bool rpl_get_position_info(const char **log_file_name, ulonglong *log_pos, return FALSE; #else const Relay_log_info *rli= &(active_mi->rli); - *log_file_name= rli->group_master_log_name; - *log_pos= rli->group_master_log_pos + - (rli->future_event_relay_log_pos - rli->group_relay_log_pos); - *group_relay_log_name= rli->group_relay_log_name; - *relay_log_pos= rli->future_event_relay_log_pos; + if (opt_slave_parallel_threads == 0) + { + *log_file_name= rli->group_master_log_name; + *log_pos= rli->group_master_log_pos + + (rli->future_event_relay_log_pos - rli->group_relay_log_pos); + *group_relay_log_name= rli->group_relay_log_name; + *relay_log_pos= rli->future_event_relay_log_pos; + } + else + { + *log_file_name= ""; + *log_pos= 0; + *group_relay_log_name= ""; + *relay_log_pos= 0; + } return TRUE; #endif } diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc index 174219a8e72..cc212d6051b 100644 --- a/sql/log_event_old.cc +++ b/sql/log_event_old.cc @@ -1839,7 +1839,7 @@ Old_rows_log_event::do_update_pos(rpl_group_info *rgi) } else { - rli->inc_event_relay_log_pos(); + rgi->inc_event_relay_log_pos(); } DBUG_RETURN(error); diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 19ae2a35339..fbf135c0bb6 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -64,6 +64,7 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, /* ToDo: Access to thd, and what about rli, split out a parallel part? */ mysql_mutex_lock(&rli->data_lock); qev->ev->thd= thd; + rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos; err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt); thd->rgi_slave= NULL; @@ -659,6 +660,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) } qev->ev= ev; qev->next= NULL; + qev->future_event_relay_log_pos= rli->future_event_relay_log_pos; if (typ == GTID_EVENT) { diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index b9106392faf..7830470a929 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -23,6 +23,7 @@ struct rpl_parallel_thread { queued_event *next; Log_event *ev; rpl_group_info *rgi; + ulonglong future_event_relay_log_pos; } *event_queue, *last_in_queue; }; diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 53481d2efaf..0ea6b1e5d13 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -869,17 +869,33 @@ improper_arguments: %d timed_out: %d", void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos, - bool skip_lock) + rpl_group_info *rgi, + bool skip_lock) { DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos"); if (!skip_lock) mysql_mutex_lock(&data_lock); - inc_event_relay_log_pos(); - group_relay_log_pos= event_relay_log_pos; - strmake_buf(group_relay_log_name,event_relay_log_name); - - notify_group_relay_log_name_update(); + rgi->inc_event_relay_log_pos(); + if (opt_slave_parallel_threads > 0) + { + /* In case of parallel replication, do not update the position backwards. */ + int cmp= strcmp(group_relay_log_name, event_relay_log_name); + if (cmp < 0) + { + group_relay_log_pos= event_relay_log_pos; + strmake_buf(group_relay_log_name, event_relay_log_name); + notify_group_relay_log_name_update(); + } else if (cmp == 0 && group_relay_log_pos < event_relay_log_pos) + group_relay_log_pos= event_relay_log_pos; + } + else + { + /* Non-parallel case. */ + group_relay_log_pos= event_relay_log_pos; + strmake_buf(group_relay_log_name, event_relay_log_name); + notify_group_relay_log_name_update(); + } /* If the slave does not support transactions and replicates a transaction, @@ -1226,10 +1242,10 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos, */ if ((rgi->thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions) - inc_event_relay_log_pos(); + rgi->inc_event_relay_log_pos(); else { - inc_group_relay_log_pos(event_master_log_pos); + inc_group_relay_log_pos(event_master_log_pos, rgi); if (rpl_global_gtid_slave_state.record_and_update_gtid(thd, rgi)) { report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 68cd051be2a..92f65a1397d 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -351,13 +351,9 @@ public: if (until_condition==UNTIL_MASTER_POS) until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_UNKNOWN; } - - inline void inc_event_relay_log_pos() - { - event_relay_log_pos= future_event_relay_log_pos; - } void inc_group_relay_log_pos(ulonglong log_pos, + rpl_group_info *rgi, bool skip_lock=0); int wait_for_pos(THD* thd, String* log_name, longlong log_pos, @@ -561,6 +557,8 @@ struct rpl_group_info */ time_t last_event_start_time; + ulonglong future_event_relay_log_pos; + private: /* Runtime state for printing a note when slave is taking @@ -684,6 +682,13 @@ public: { return long_find_row_note_printed; } + + inline void inc_event_relay_log_pos() + { + if (opt_slave_parallel_threads == 0 || + rli->event_relay_log_pos < future_event_relay_log_pos) + rli->event_relay_log_pos= future_event_relay_log_pos; + } }; diff --git a/sql/slave.cc b/sql/slave.cc index 61c63cd2862..50960991faf 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3331,7 +3331,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, rli->abort_slave= 1; mysql_mutex_unlock(&rli->data_lock); delete ev; - rli->inc_event_relay_log_pos(); + serial_rgi->inc_event_relay_log_pos(); DBUG_RETURN(0); };); } @@ -3360,6 +3360,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, DBUG_RETURN(1); } + serial_rgi->future_event_relay_log_pos= rli->future_event_relay_log_pos; exec_res= apply_event_and_update_pos(ev, thd, serial_rgi, NULL); delete_or_keep_event_post_apply(serial_rgi, typ, ev); |