summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2013-12-06 13:28:23 +0100
committerunknown <knielsen@knielsen-hq.org>2013-12-06 13:28:23 +0100
commitb7ae65ef86636e986594d8e44dec83a83311977c (patch)
treeb78b7e163c294afb80bac87dc8a7369bb37c950e /sql
parent4d6ee2d1197c0e5ebecd019d6625d7d05bf1cab8 (diff)
downloadmariadb-git-b7ae65ef86636e986594d8e44dec83a83311977c.tar.gz
MDEV-5363: Make parallel replication waits killable
A couple of more parallel replication waits made killable.
Diffstat (limited to 'sql')
-rw-r--r--sql/rpl_parallel.cc48
1 files changed, 44 insertions, 4 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 0de164483d4..ff2ad84e037 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -285,21 +285,42 @@ handle_rpl_parallel_thread(void *arg)
wait_start_sub_id= rgi->wait_start_sub_id;
if (wait_for_sub_id || wait_start_sub_id)
{
+ bool did_enter_cond= false;
+ const char *old_msg= NULL;
+
mysql_mutex_lock(&entry->LOCK_parallel_entry);
if (wait_start_sub_id)
{
- while (wait_start_sub_id > entry->last_committed_sub_id)
+ old_msg= thd->enter_cond(&entry->COND_parallel_entry,
+ &entry->LOCK_parallel_entry,
+ "Waiting for prior transaction to commit "
+ "before starting next transaction");
+ did_enter_cond= true;
+ while (wait_start_sub_id > entry->last_committed_sub_id &&
+ !thd->check_killed())
mysql_cond_wait(&entry->COND_parallel_entry,
&entry->LOCK_parallel_entry);
+ if (wait_start_sub_id > entry->last_committed_sub_id)
+ {
+ /* The thread got a kill signal. */
+ thd->send_kill_message();
+ rgi->is_error= true;
+ slave_output_error_info(rgi->rli, thd);
+ rgi->cleanup_context(thd, true);
+ rgi->rli->abort_slave= true;
+ }
+ rgi->wait_start_sub_id= 0; /* No need to check again. */
}
- rgi->wait_start_sub_id= 0; /* No need to check again. */
if (wait_for_sub_id > entry->last_committed_sub_id)
{
wait_for_commit *waitee=
&rgi->wait_commit_group_info->commit_orderer;
rgi->commit_orderer.register_wait_for_prior_commit(waitee);
}
- mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+ if (did_enter_cond)
+ thd->exit_cond(old_msg);
+ else
+ mysql_mutex_unlock(&entry->LOCK_parallel_entry);
}
if(thd->wait_for_commit_ptr)
@@ -753,6 +774,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
Relay_log_info *rli= serial_rgi->rli;
enum Log_event_type typ;
bool is_group_event;
+ bool did_enter_cond= false;
+ const char *old_msg= NULL;
/* ToDo: what to do with this lock?!? */
mysql_mutex_unlock(&rli->data_lock);
@@ -860,6 +883,13 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
}
else if (cur_thread->queued_size <= opt_slave_parallel_max_queued)
break; // The thread is ready to queue into
+ else if (rli->sql_driver_thd->check_killed())
+ {
+ mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
+ my_error(ER_CONNECTION_KILLED, MYF(0));
+ delete rgi;
+ return true;
+ }
else
{
/*
@@ -867,6 +897,13 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
use for queuing events, so wait for the thread to consume some
of its queue.
*/
+ if (!did_enter_cond)
+ {
+ old_msg= rli->sql_driver_thd->enter_cond
+ (&cur_thread->COND_rpl_thread, &cur_thread->LOCK_rpl_thread,
+ "Waiting for room in worker thread event queue");
+ did_enter_cond= true;
+ }
mysql_cond_wait(&cur_thread->COND_rpl_thread,
&cur_thread->LOCK_rpl_thread);
}
@@ -1016,7 +1053,10 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
*/
rli->event_relay_log_pos= rli->future_event_relay_log_pos;
cur_thread->enqueue(qev);
- mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
+ if (did_enter_cond)
+ rli->sql_driver_thd->exit_cond(old_msg);
+ else
+ mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
mysql_cond_signal(&cur_thread->COND_rpl_thread);
return false;