diff options
author | unknown <knielsen@knielsen-hq.org> | 2013-10-23 15:03:03 +0200 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2013-10-23 15:03:03 +0200 |
commit | a09d2b105f8e56e8fec98975ea9fa091c263327a (patch) | |
tree | 0982c9f747563ca972b85313f62a175141f0a0f6 | |
parent | 7681c6aa787f9d3402059957bd8d993997cb623b (diff) | |
download | mariadb-git-a09d2b105f8e56e8fec98975ea9fa091c263327a.tar.gz |
MDEV-4506: Parallel replication.
Fix some more parts of old-style position updates.
Now we save in rgi some coordinates for master log and relay log, so
that in do_update_pos() we can use the right set of coordinates with
the right events.
The Rotate_log_event::do_update_pos() is fixed in the parallel case
to not directly update relay-log.info (as Rotate event runs directly
in the driver SQL thread, ahead of actual event execution). Instead,
group_master_log_file is updated as part of do_update_pos() in each
event execution.
In the parallel case, position updates happen in parallel without
any ordering, but taking care that position is not updated backwards.
Since position update happens only after event execution this leads
to the right result.
Also fix an access-after-free introduced in an earlier commit.
-rw-r--r-- | sql/log_event.cc | 32 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 16 | ||||
-rw-r--r-- | sql/rpl_parallel.h | 3 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 25 | ||||
-rw-r--r-- | sql/rpl_rli.h | 26 | ||||
-rw-r--r-- | sql/slave.cc | 2 | ||||
-rw-r--r-- | sql/sys_vars.cc | 2 |
7 files changed, 61 insertions, 45 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index cd6da8baa22..e7c0506a50a 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -3843,17 +3843,6 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, thd->variables.auto_increment_increment= auto_increment_increment; thd->variables.auto_increment_offset= auto_increment_offset; - /* - InnoDB internally stores the master log position it has executed so far, - i.e. the position just after the COMMIT event. - When InnoDB will want to store, the positions in rli won't have - been updated yet, so group_master_log_* will point to old BEGIN - and event_master_log* will point to the beginning of current COMMIT. - But log_pos of the COMMIT Query event is what we want, i.e. the pos of the - END of the current log event (COMMIT). We save it in rli so that InnoDB can - access it. - */ - const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos; DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos)); clear_all_errors(thd, const_cast<Relay_log_info*>(rli)); @@ -3882,7 +3871,6 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, invariants like IN_STMT flag must be off at committing the transaction. */ rgi->inc_event_relay_log_pos(); - const_cast<Relay_log_info*>(rli)->clear_flag(Relay_log_info::IN_STMT); } else { @@ -5535,16 +5523,6 @@ int Load_log_event::do_apply_event(NET* net, rpl_group_info *rgi, thd->lex->local_file= local_fname; mysql_reset_thd_for_next_command(thd, 0); - if (!use_rli_only_for_errors) - { - /* - Saved for InnoDB, see comment in - Query_log_event::do_apply_event() - */ - const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos; - DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos)); - } - /* We test replicate_*_db rules. Note that we have already prepared the file to load, even if we are going to ignore and delete it @@ -5940,11 +5918,16 @@ int Rotate_log_event::do_update_pos(rpl_group_info *rgi) correspond to the beginning of the transaction. Starting from 5.0.0, there also are some rotates from the slave itself, in the relay log, which shall not change the group positions. + + In parallel replication, rotate event is executed out-of-band with normal + events, so we cannot update group_master_log_name or _pos here, it will + be updated with the next normal event instead. */ if ((server_id != global_system_variables.server_id || rli->replicate_same_server_id) && !is_relay_log_event() && - !rli->is_in_group()) + !rli->is_in_group() && + !rgi->is_parallel_exec) { mysql_mutex_lock(&rli->data_lock); DBUG_PRINT("info", ("old group_master_log_name: '%s' " @@ -7712,7 +7695,7 @@ int Stop_log_event::do_update_pos(rpl_group_info *rgi) */ if (rli->get_flag(Relay_log_info::IN_TRANSACTION)) rgi->inc_event_relay_log_pos(); - else + else if (!rgi->is_parallel_exec) { rpl_global_gtid_slave_state.record_and_update_gtid(thd, rgi); rli->inc_group_relay_log_pos(0, rgi); @@ -8408,7 +8391,6 @@ int Execute_load_log_event::do_apply_event(rpl_group_info *rgi) calls mysql_load()). */ - const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos; if (lev->do_apply_event(0,rgi,1)) { /* diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index fbf135c0bb6..8942b1d0a33 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -64,7 +64,11 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, /* ToDo: Access to thd, and what about rli, split out a parallel part? */ mysql_mutex_lock(&rli->data_lock); qev->ev->thd= thd; + strcpy(rgi->event_relay_log_name_buf, qev->event_relay_log_name); + rgi->event_relay_log_name= rgi->event_relay_log_name_buf; + rgi->event_relay_log_pos= qev->event_relay_log_pos; rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos; + strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name); err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt); thd->rgi_slave= NULL; @@ -660,7 +664,10 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) } qev->ev= ev; qev->next= NULL; + strcpy(qev->event_relay_log_name, rli->event_relay_log_name); + qev->event_relay_log_pos= rli->event_relay_log_pos; qev->future_event_relay_log_pos= rli->future_event_relay_log_pos; + strcpy(qev->future_event_master_log_name, rli->future_event_master_log_name); if (typ == GTID_EVENT) { @@ -674,6 +681,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) delete rgi; return true; } + rgi->is_parallel_exec = true; if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on())) rgi->deferred_events= new Deferred_log_events(rli); @@ -783,6 +791,14 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) have GTID, like a MariaDB 5.5 or MySQL master. */ qev->rgi= serial_rgi; + /* Handle master log name change, seen in Rotate_log_event. */ + if (typ == ROTATE_EVENT) + { + Rotate_log_event *rev= static_cast<Rotate_log_event *>(qev->ev); + memcpy(rli->future_event_master_log_name, + rev->new_log_ident, rev->ident_len+1); + } + rpt_handle_event(qev, NULL); delete_or_keep_event_post_apply(serial_rgi, typ, qev->ev); my_free(qev); diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 7830470a929..7057ec66de2 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -24,6 +24,9 @@ struct rpl_parallel_thread { Log_event *ev; rpl_group_info *rgi; ulonglong future_event_relay_log_pos; + char event_relay_log_name[FN_REFLEN]; + char future_event_master_log_name[FN_REFLEN]; + ulonglong event_relay_log_pos; } *event_queue, *last_in_queue; }; diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 0ea6b1e5d13..f3a6863d217 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -877,7 +877,9 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos, if (!skip_lock) mysql_mutex_lock(&data_lock); rgi->inc_event_relay_log_pos(); - if (opt_slave_parallel_threads > 0) + DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu", + (long) log_pos, (long) group_master_log_pos)); + if (rgi->is_parallel_exec) { /* In case of parallel replication, do not update the position backwards. */ int cmp= strcmp(group_relay_log_name, event_relay_log_name); @@ -888,6 +890,18 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos, notify_group_relay_log_name_update(); } else if (cmp == 0 && group_relay_log_pos < event_relay_log_pos) group_relay_log_pos= event_relay_log_pos; + + cmp= strcmp(group_master_log_name, rgi->future_event_master_log_name); + if (cmp <= 0) + { + if (cmp < 0) + { + strcpy(group_master_log_name, rgi->future_event_master_log_name); + notify_group_master_log_name_update(); + } + if (group_master_log_pos < log_pos) + group_master_log_pos= log_pos; + } } else { @@ -895,6 +909,8 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos, group_relay_log_pos= event_relay_log_pos; strmake_buf(group_relay_log_name, event_relay_log_name); notify_group_relay_log_name_update(); + if (log_pos) // 3.23 binlogs don't have log_posx + group_master_log_pos= log_pos; } /* @@ -927,12 +943,6 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos, the relay log is not "val". With the end_log_pos solution, we avoid computations involving lengthes. */ - DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu", - (long) log_pos, (long) group_master_log_pos)); - if (log_pos) // 3.23 binlogs don't have log_posx - { - group_master_log_pos= log_pos; - } mysql_cond_broadcast(&data_cond); if (!skip_lock) mysql_mutex_unlock(&data_lock); @@ -1436,6 +1446,7 @@ rpl_group_info::rpl_group_info(Relay_log_info *rli_) wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0), deferred_events(NULL), m_annotate_event(0), tables_to_lock(0), tables_to_lock_count(0), trans_retries(0), last_event_start_time(0), + is_parallel_exec(false), row_stmt_start_timestamp(0), long_find_row_note_printed(false) { bzero(¤t_gtid, sizeof(current_gtid)); diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 92f65a1397d..38268ee85c5 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -185,6 +185,10 @@ public: char event_relay_log_name[FN_REFLEN]; ulonglong event_relay_log_pos; ulonglong future_event_relay_log_pos; + /* + The master log name for current event. Only used in parallel replication. + */ + char future_event_master_log_name[FN_REFLEN]; #ifdef HAVE_valgrind bool is_fake; /* Mark that this is a fake relay log info structure */ @@ -216,18 +220,6 @@ public: */ bool sql_force_rotate_relay; - /* - When it commits, InnoDB internally stores the master log position it has - processed so far; the position to store is the one of the end of the - committing event (the COMMIT query event, or the event if in autocommit - mode). - */ -#if MYSQL_VERSION_ID < 40100 - ulonglong future_master_log_pos; -#else - ulonglong future_group_master_log_pos; -#endif - time_t last_master_timestamp; void clear_until_condition(); @@ -557,7 +549,15 @@ struct rpl_group_info */ time_t last_event_start_time; + char *event_relay_log_name; + char event_relay_log_name_buf[FN_REFLEN]; + ulonglong event_relay_log_pos; ulonglong future_event_relay_log_pos; + /* + The master log name for current event. Only used in parallel replication. + */ + char future_event_master_log_name[FN_REFLEN]; + bool is_parallel_exec; private: /* @@ -685,7 +685,7 @@ public: inline void inc_event_relay_log_pos() { - if (opt_slave_parallel_threads == 0 || + if (!is_parallel_exec || rli->event_relay_log_pos < future_event_relay_log_pos) rli->event_relay_log_pos= future_event_relay_log_pos; } diff --git a/sql/slave.cc b/sql/slave.cc index 50960991faf..e73e3e14c10 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3361,6 +3361,8 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, } serial_rgi->future_event_relay_log_pos= rli->future_event_relay_log_pos; + serial_rgi->event_relay_log_name= rli->event_relay_log_name; + serial_rgi->event_relay_log_pos= rli->event_relay_log_pos; exec_res= apply_event_and_update_pos(ev, thd, serial_rgi, NULL); delete_or_keep_event_post_apply(serial_rgi, typ, ev); diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 91f13bebd12..d509a614b6e 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -4228,6 +4228,8 @@ static bool check_pseudo_slave_mode(sys_var *self, THD *thd, set_var *var) #ifndef EMBEDDED_LIBRARY delete thd->rli_fake; thd->rli_fake= NULL; + delete thd->rgi_fake; + thd->rgi_fake= NULL; #endif } else if (previous_val && val) |