diff options
author | unknown <knielsen@knielsen-hq.org> | 2013-12-06 13:28:23 +0100 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2013-12-06 13:28:23 +0100 |
commit | b7ae65ef86636e986594d8e44dec83a83311977c (patch) | |
tree | b78b7e163c294afb80bac87dc8a7369bb37c950e /sql | |
parent | 4d6ee2d1197c0e5ebecd019d6625d7d05bf1cab8 (diff) | |
download | mariadb-git-b7ae65ef86636e986594d8e44dec83a83311977c.tar.gz |
MDEV-5363: Make parallel replication waits killable
A couple of more parallel replication waits made killable.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/rpl_parallel.cc | 48 |
1 files changed, 44 insertions, 4 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 0de164483d4..ff2ad84e037 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -285,21 +285,42 @@ handle_rpl_parallel_thread(void *arg) wait_start_sub_id= rgi->wait_start_sub_id; if (wait_for_sub_id || wait_start_sub_id) { + bool did_enter_cond= false; + const char *old_msg= NULL; + mysql_mutex_lock(&entry->LOCK_parallel_entry); if (wait_start_sub_id) { - while (wait_start_sub_id > entry->last_committed_sub_id) + old_msg= thd->enter_cond(&entry->COND_parallel_entry, + &entry->LOCK_parallel_entry, + "Waiting for prior transaction to commit " + "before starting next transaction"); + did_enter_cond= true; + while (wait_start_sub_id > entry->last_committed_sub_id && + !thd->check_killed()) mysql_cond_wait(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry); + if (wait_start_sub_id > entry->last_committed_sub_id) + { + /* The thread got a kill signal. */ + thd->send_kill_message(); + rgi->is_error= true; + slave_output_error_info(rgi->rli, thd); + rgi->cleanup_context(thd, true); + rgi->rli->abort_slave= true; + } + rgi->wait_start_sub_id= 0; /* No need to check again. */ } - rgi->wait_start_sub_id= 0; /* No need to check again. */ if (wait_for_sub_id > entry->last_committed_sub_id) { wait_for_commit *waitee= &rgi->wait_commit_group_info->commit_orderer; rgi->commit_orderer.register_wait_for_prior_commit(waitee); } - mysql_mutex_unlock(&entry->LOCK_parallel_entry); + if (did_enter_cond) + thd->exit_cond(old_msg); + else + mysql_mutex_unlock(&entry->LOCK_parallel_entry); } if(thd->wait_for_commit_ptr) @@ -753,6 +774,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, Relay_log_info *rli= serial_rgi->rli; enum Log_event_type typ; bool is_group_event; + bool did_enter_cond= false; + const char *old_msg= NULL; /* ToDo: what to do with this lock?!? */ mysql_mutex_unlock(&rli->data_lock); @@ -860,6 +883,13 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, } else if (cur_thread->queued_size <= opt_slave_parallel_max_queued) break; // The thread is ready to queue into + else if (rli->sql_driver_thd->check_killed()) + { + mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); + my_error(ER_CONNECTION_KILLED, MYF(0)); + delete rgi; + return true; + } else { /* @@ -867,6 +897,13 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, use for queuing events, so wait for the thread to consume some of its queue. */ + if (!did_enter_cond) + { + old_msg= rli->sql_driver_thd->enter_cond + (&cur_thread->COND_rpl_thread, &cur_thread->LOCK_rpl_thread, + "Waiting for room in worker thread event queue"); + did_enter_cond= true; + } mysql_cond_wait(&cur_thread->COND_rpl_thread, &cur_thread->LOCK_rpl_thread); } @@ -1016,7 +1053,10 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, */ rli->event_relay_log_pos= rli->future_event_relay_log_pos; cur_thread->enqueue(qev); - mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); + if (did_enter_cond) + rli->sql_driver_thd->exit_cond(old_msg); + else + mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); mysql_cond_signal(&cur_thread->COND_rpl_thread); return false; |