diff options
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_parallel.test | 3 | ||||
-rw-r--r-- | sql/log_event.cc | 16 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 105 | ||||
-rw-r--r-- | sql/rpl_parallel.h | 1 |
4 files changed, 114 insertions, 11 deletions
diff --git a/mysql-test/suite/rpl/t/rpl_parallel.test b/mysql-test/suite/rpl/t/rpl_parallel.test index 89834b790d6..5709cab19c0 100644 --- a/mysql-test/suite/rpl/t/rpl_parallel.test +++ b/mysql-test/suite/rpl/t/rpl_parallel.test @@ -92,6 +92,7 @@ INSERT INTO t2 VALUES (foo(10, --connection server_2 FLUSH LOGS; +--source include/wait_for_binlog_checkpoint.inc SET sql_log_bin=0; --delimiter || CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500)) @@ -148,6 +149,7 @@ SELECT * FROM t2 WHERE a >= 10 ORDER BY a; --let $binlog_file= slave-bin.000002 --source include/show_binlog_events.inc FLUSH LOGS; +--source include/wait_for_binlog_checkpoint.inc # Restart all the slave parallel worker threads, to clear all debug_sync actions. --connection server_2 @@ -161,6 +163,7 @@ SET debug_sync='RESET'; --echo *** Test that group-committed transactions on the master can replicate in parallel on the slave. *** --connection server_1 FLUSH LOGS; +--source include/wait_for_binlog_checkpoint.inc CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB; # Create some sentinel rows so that the rows inserted in parallel fall into # separate gaps and do not cause gap lock conflicts. diff --git a/sql/log_event.cc b/sql/log_event.cc index e7c0506a50a..7ce6c203248 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -966,11 +966,17 @@ int Log_event::do_update_pos(rpl_group_info *rgi) if (debug_not_change_ts_if_art_event == 1 && is_artificial_event()) debug_not_change_ts_if_art_event= 0; ); - rli->stmt_done(log_pos, - (is_artificial_event() && - IF_DBUG(debug_not_change_ts_if_art_event > 0, 1) ? - 0 : when), - thd, rgi); + /* + In parallel execution, delay position update for the events that are + not part of event groups (format description, rotate, and such) until + the actual event execution reaches that point. + */ + if (!rgi->is_parallel_exec || is_group_event(get_type_code())) + rli->stmt_done(log_pos, + (is_artificial_event() && + IF_DBUG(debug_not_change_ts_if_art_event > 0, 1) ? + 0 : when), + thd, rgi); DBUG_EXECUTE_IF("let_first_flush_log_change_timestamp", if (debug_not_change_ts_if_art_event == 0) debug_not_change_ts_if_art_event= 2; ); diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index d62bec6e605..8328dd24128 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -56,6 +56,48 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, } +static void +handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev) +{ + int cmp; + Relay_log_info *rli; + /* + Events that are not part of an event group, such as Format Description, + Stop, GTID List and such, are executed directly in the driver SQL thread, + to keep the relay log state up-to-date. But the associated position update + is done here, in sync with other normal events as they are queued to + worker threads. + */ + if ((thd->variables.option_bits & OPTION_BEGIN) && + opt_using_transactions) + return; + rli= qev->rgi->rli; + mysql_mutex_lock(&rli->data_lock); + cmp= strcmp(rli->group_relay_log_name, qev->event_relay_log_name); + if (cmp < 0) + { + rli->group_relay_log_pos= qev->future_event_relay_log_pos; + strmake_buf(rli->group_relay_log_name, qev->event_relay_log_name); + rli->notify_group_relay_log_name_update(); + } else if (cmp == 0 && + rli->group_relay_log_pos < qev->future_event_relay_log_pos) + rli->group_relay_log_pos= qev->future_event_relay_log_pos; + + cmp= strcmp(rli->group_master_log_name, qev->future_event_master_log_name); + if (cmp < 0) + { + strcpy(rli->group_master_log_name, qev->future_event_master_log_name); + rli->notify_group_master_log_name_update(); + rli->group_master_log_pos= qev->future_event_master_log_pos; + } + else if (cmp == 0 + && rli->group_master_log_pos < qev->future_event_master_log_pos) + rli->group_master_log_pos= qev->future_event_master_log_pos; + mysql_mutex_unlock(&rli->data_lock); + mysql_cond_broadcast(&rli->data_cond); +} + + static bool sql_worker_killed(THD *thd, rpl_group_info *rgi, bool in_event_group) { @@ -142,16 +184,24 @@ handle_rpl_parallel_thread(void *arg) while (events) { struct rpl_parallel_thread::queued_event *next= events->next; - Log_event_type event_type= events->ev->get_type_code(); + Log_event_type event_type; rpl_group_info *rgi= events->rgi; rpl_parallel_entry *entry= rgi->parallel_entry; uint64 wait_for_sub_id; uint64 wait_start_sub_id; bool end_of_group; + if (!events->ev) + { + handle_queued_pos_update(thd, events); + my_free(events); + events= next; + continue; + } + err= 0; /* Handle a new event group, which will be initiated by a GTID event. */ - if (event_type == GTID_EVENT) + if ((event_type= events->ev->get_type_code()) == GTID_EVENT) { in_event_group= true; /* @@ -794,13 +844,15 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, e->last_commit_id= 0; } - e->current_group_info= rgi; + qev->rgi= e->current_group_info= rgi; e->current_sub_id= rgi->gtid_sub_id; current= rgi->parallel_entry= e; } else if (!is_group_event || !current) { + my_off_t log_pos; int err; + bool tmp; /* Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread. Same for events not preceeded by GTID (we should not see those normally, @@ -824,11 +876,52 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, } } + tmp= serial_rgi->is_parallel_exec; + serial_rgi->is_parallel_exec= true; err= rpt_handle_event(qev, NULL); + serial_rgi->is_parallel_exec= tmp; + log_pos= qev->ev->log_pos; delete_or_keep_event_post_apply(serial_rgi, typ, qev->ev); - my_free(qev); - return (err != 0); + if (err) + { + my_free(qev); + return true; + } + qev->ev= NULL; + qev->future_event_master_log_pos= log_pos; + if (!current) + { + handle_queued_pos_update(rli->sql_driver_thd, qev); + my_free(qev); + return false; + } + /* + Queue an empty event, so that the position will be updated in a + reasonable way relative to other events: + + - If the currently executing events are queued serially for a single + thread, the position will only be updated when everything before has + completed. + + - If we are executing multiple independent events in parallel, then at + least the position will not be updated until one of them has reached + the current point. + */ + cur_thread= current->rpl_thread; + if (cur_thread) + { + mysql_mutex_lock(&cur_thread->LOCK_rpl_thread); + if (cur_thread->current_entry != current) + { + /* Not ours anymore, we need to grab a new one. */ + mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); + cur_thread= NULL; + } + } + if (!cur_thread) + cur_thread= current->rpl_thread= + global_rpl_thread_pool.get_thread(current); } else { @@ -848,8 +941,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, cur_thread= current->rpl_thread= global_rpl_thread_pool.get_thread(current); } + qev->rgi= current->current_group_info; } - qev->rgi= current->current_group_info; /* Queue the event for processing. diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index fe9c6708e97..0b9619e5e83 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -27,6 +27,7 @@ struct rpl_parallel_thread { char event_relay_log_name[FN_REFLEN]; char future_event_master_log_name[FN_REFLEN]; ulonglong event_relay_log_pos; + my_off_t future_event_master_log_pos; size_t event_size; } *event_queue, *last_in_queue; uint64 queued_size; |