diff options
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r-- | sql/rpl_parallel.cc | 96 |
1 files changed, 84 insertions, 12 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 80125e8aa29..cc65856e37b 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -2,6 +2,7 @@ #include "rpl_parallel.h" #include "slave.h" #include "rpl_mi.h" +#include "debug_sync.h" /* @@ -24,7 +25,7 @@ static int rpt_handle_event(rpl_parallel_thread::queued_event *qev, struct rpl_parallel_thread *rpt) { - int err __attribute__((unused)); + int err; rpl_group_info *rgi= qev->rgi; Relay_log_info *rli= rgi->rli; THD *thd= rgi->thd; @@ -142,7 +143,7 @@ finish_event_group(THD *thd, int err, uint64 sub_id, if (err) wfc->unregister_wait_for_prior_commit(); else - wfc->wait_for_prior_commit(); + err= wfc->wait_for_prior_commit(thd); thd->wait_for_commit_ptr= NULL; /* @@ -172,6 +173,18 @@ finish_event_group(THD *thd, int err, uint64 sub_id, } +static void +signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi) +{ + rgi->is_error= true; + rgi->cleanup_context(thd, true); + rgi->rli->abort_slave= true; + mysql_mutex_lock(rgi->rli->relay_log.get_log_lock()); + mysql_mutex_unlock(rgi->rli->relay_log.get_log_lock()); + rgi->rli->relay_log.signal_update(); +} + + pthread_handler_t handle_rpl_parallel_thread(void *arg) { @@ -207,6 +220,7 @@ handle_rpl_parallel_thread(void *arg) thd->variables.log_slow_filter= global_system_variables.log_slow_filter; set_slave_thread_options(thd); thd->client_capabilities = CLIENT_LOCAL_FILES; + thd->net.reading_or_writing= 0; thd_proc_info(thd, "Waiting for work from main SQL threads"); thd->set_time(); thd->variables.lock_wait_timeout= LONG_TIMEOUT; @@ -284,21 +298,42 @@ handle_rpl_parallel_thread(void *arg) wait_start_sub_id= rgi->wait_start_sub_id; if (wait_for_sub_id || wait_start_sub_id) { + bool did_enter_cond= false; + PSI_stage_info old_stage; + mysql_mutex_lock(&entry->LOCK_parallel_entry); if (wait_start_sub_id) { - while (wait_start_sub_id > entry->last_committed_sub_id) + thd->ENTER_COND(&entry->COND_parallel_entry, + &entry->LOCK_parallel_entry, + &stage_waiting_for_prior_transaction_to_commit, + &old_stage); + did_enter_cond= true; + DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior"); + while (wait_start_sub_id > entry->last_committed_sub_id && + !thd->check_killed()) mysql_cond_wait(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry); + if (wait_start_sub_id > entry->last_committed_sub_id) + { + /* The thread got a kill signal. */ + DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed"); + thd->send_kill_message(); + slave_output_error_info(rgi->rli, thd); + signal_error_to_sql_driver_thread(thd, rgi); + } + rgi->wait_start_sub_id= 0; /* No need to check again. */ } - rgi->wait_start_sub_id= 0; /* No need to check again. */ if (wait_for_sub_id > entry->last_committed_sub_id) { wait_for_commit *waitee= &rgi->wait_commit_group_info->commit_orderer; rgi->commit_orderer.register_wait_for_prior_commit(waitee); } - mysql_mutex_unlock(&entry->LOCK_parallel_entry); + if (did_enter_cond) + thd->EXIT_COND(&old_stage); + else + mysql_mutex_unlock(&entry->LOCK_parallel_entry); } if(thd->wait_for_commit_ptr) @@ -341,10 +376,8 @@ handle_rpl_parallel_thread(void *arg) if (err) { - rgi->is_error= true; slave_output_error_info(rgi->rli, thd); - rgi->cleanup_context(thd, true); - rgi->rli->abort_slave= true; + signal_error_to_sql_driver_thread(thd, rgi); } if (end_of_group) { @@ -353,6 +386,7 @@ handle_rpl_parallel_thread(void *arg) &rgi->commit_orderer); delete rgi; group_rgi= rgi= NULL; + DEBUG_SYNC(thd, "rpl_parallel_end_of_group"); } events= next; @@ -383,11 +417,9 @@ handle_rpl_parallel_thread(void *arg) half-processed event group. */ mysql_mutex_unlock(&rpt->LOCK_rpl_thread); - group_rgi->is_error= true; finish_event_group(thd, 1, group_rgi->gtid_sub_id, group_rgi->parallel_entry, &group_rgi->commit_orderer); - group_rgi->cleanup_context(thd, true); - group_rgi->rli->abort_slave= true; + signal_error_to_sql_driver_thread(thd, group_rgi); in_event_group= false; delete group_rgi; group_rgi= NULL; @@ -752,6 +784,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, Relay_log_info *rli= serial_rgi->rli; enum Log_event_type typ; bool is_group_event; + bool did_enter_cond= false; + PSI_stage_info old_stage; /* ToDo: what to do with this lock?!? */ mysql_mutex_unlock(&rli->data_lock); @@ -766,6 +800,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, sql_thread_stopping= true; if (sql_thread_stopping) { + delete ev; /* QQ: Need a better comment why we return false here */ return false; } @@ -774,6 +809,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, MYF(0)))) { my_error(ER_OUT_OF_RESOURCES, MYF(0)); + delete ev; return true; } qev->ev= ev; @@ -796,6 +832,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, { my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME)); delete rgi; + my_free(qev); + delete ev; return true; } rgi->is_parallel_exec = true; @@ -813,6 +851,10 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, However, the commit of this event must wait for the commit of the prior event, to preserve binlog commit order and visibility across all servers in the replication hierarchy. + + In addition, we must not start executing this event until we have + finished the previous collection of event groups that group-committed + together; we use rgi->wait_start_sub_id to control this. */ rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e); rgi->wait_commit_sub_id= e->current_sub_id; @@ -859,6 +901,21 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, } else if (cur_thread->queued_size <= opt_slave_parallel_max_queued) break; // The thread is ready to queue into + else if (rli->sql_driver_thd->check_killed()) + { + mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); + my_error(ER_CONNECTION_KILLED, MYF(0)); + delete rgi; + my_free(qev); + delete ev; + DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max", + { + debug_sync_set_action(rli->sql_driver_thd, + STRING_WITH_LEN("now SIGNAL wait_queue_killed")); + };); + slave_output_error_info(rli, rli->sql_driver_thd); + return true; + } else { /* @@ -866,6 +923,18 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, use for queuing events, so wait for the thread to consume some of its queue. */ + if (!did_enter_cond) + { + rli->sql_driver_thd->ENTER_COND(&cur_thread->COND_rpl_thread, + &cur_thread->LOCK_rpl_thread, + &stage_waiting_for_room_in_worker_thread, &old_stage); + did_enter_cond= true; + DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max", + { + debug_sync_set_action(rli->sql_driver_thd, + STRING_WITH_LEN("now SIGNAL wait_queue_ready")); + };); + } mysql_cond_wait(&cur_thread->COND_rpl_thread, &cur_thread->LOCK_rpl_thread); } @@ -1015,7 +1084,10 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, */ rli->event_relay_log_pos= rli->future_event_relay_log_pos; cur_thread->enqueue(qev); - mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); + if (did_enter_cond) + rli->sql_driver_thd->EXIT_COND(&old_stage); + else + mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); mysql_cond_signal(&cur_thread->COND_rpl_thread); return false; |