summaryrefslogtreecommitdiff
path: root/sql/rpl_parallel.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r--sql/rpl_parallel.cc96
1 files changed, 84 insertions, 12 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 80125e8aa29..cc65856e37b 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -2,6 +2,7 @@
#include "rpl_parallel.h"
#include "slave.h"
#include "rpl_mi.h"
+#include "debug_sync.h"
/*
@@ -24,7 +25,7 @@ static int
rpt_handle_event(rpl_parallel_thread::queued_event *qev,
struct rpl_parallel_thread *rpt)
{
- int err __attribute__((unused));
+ int err;
rpl_group_info *rgi= qev->rgi;
Relay_log_info *rli= rgi->rli;
THD *thd= rgi->thd;
@@ -142,7 +143,7 @@ finish_event_group(THD *thd, int err, uint64 sub_id,
if (err)
wfc->unregister_wait_for_prior_commit();
else
- wfc->wait_for_prior_commit();
+ err= wfc->wait_for_prior_commit(thd);
thd->wait_for_commit_ptr= NULL;
/*
@@ -172,6 +173,18 @@ finish_event_group(THD *thd, int err, uint64 sub_id,
}
+static void
+signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi)
+{
+ rgi->is_error= true;
+ rgi->cleanup_context(thd, true);
+ rgi->rli->abort_slave= true;
+ mysql_mutex_lock(rgi->rli->relay_log.get_log_lock());
+ mysql_mutex_unlock(rgi->rli->relay_log.get_log_lock());
+ rgi->rli->relay_log.signal_update();
+}
+
+
pthread_handler_t
handle_rpl_parallel_thread(void *arg)
{
@@ -207,6 +220,7 @@ handle_rpl_parallel_thread(void *arg)
thd->variables.log_slow_filter= global_system_variables.log_slow_filter;
set_slave_thread_options(thd);
thd->client_capabilities = CLIENT_LOCAL_FILES;
+ thd->net.reading_or_writing= 0;
thd_proc_info(thd, "Waiting for work from main SQL threads");
thd->set_time();
thd->variables.lock_wait_timeout= LONG_TIMEOUT;
@@ -284,21 +298,42 @@ handle_rpl_parallel_thread(void *arg)
wait_start_sub_id= rgi->wait_start_sub_id;
if (wait_for_sub_id || wait_start_sub_id)
{
+ bool did_enter_cond= false;
+ PSI_stage_info old_stage;
+
mysql_mutex_lock(&entry->LOCK_parallel_entry);
if (wait_start_sub_id)
{
- while (wait_start_sub_id > entry->last_committed_sub_id)
+ thd->ENTER_COND(&entry->COND_parallel_entry,
+ &entry->LOCK_parallel_entry,
+ &stage_waiting_for_prior_transaction_to_commit,
+ &old_stage);
+ did_enter_cond= true;
+ DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior");
+ while (wait_start_sub_id > entry->last_committed_sub_id &&
+ !thd->check_killed())
mysql_cond_wait(&entry->COND_parallel_entry,
&entry->LOCK_parallel_entry);
+ if (wait_start_sub_id > entry->last_committed_sub_id)
+ {
+ /* The thread got a kill signal. */
+ DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed");
+ thd->send_kill_message();
+ slave_output_error_info(rgi->rli, thd);
+ signal_error_to_sql_driver_thread(thd, rgi);
+ }
+ rgi->wait_start_sub_id= 0; /* No need to check again. */
}
- rgi->wait_start_sub_id= 0; /* No need to check again. */
if (wait_for_sub_id > entry->last_committed_sub_id)
{
wait_for_commit *waitee=
&rgi->wait_commit_group_info->commit_orderer;
rgi->commit_orderer.register_wait_for_prior_commit(waitee);
}
- mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+ if (did_enter_cond)
+ thd->EXIT_COND(&old_stage);
+ else
+ mysql_mutex_unlock(&entry->LOCK_parallel_entry);
}
if(thd->wait_for_commit_ptr)
@@ -341,10 +376,8 @@ handle_rpl_parallel_thread(void *arg)
if (err)
{
- rgi->is_error= true;
slave_output_error_info(rgi->rli, thd);
- rgi->cleanup_context(thd, true);
- rgi->rli->abort_slave= true;
+ signal_error_to_sql_driver_thread(thd, rgi);
}
if (end_of_group)
{
@@ -353,6 +386,7 @@ handle_rpl_parallel_thread(void *arg)
&rgi->commit_orderer);
delete rgi;
group_rgi= rgi= NULL;
+ DEBUG_SYNC(thd, "rpl_parallel_end_of_group");
}
events= next;
@@ -383,11 +417,9 @@ handle_rpl_parallel_thread(void *arg)
half-processed event group.
*/
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
- group_rgi->is_error= true;
finish_event_group(thd, 1, group_rgi->gtid_sub_id,
group_rgi->parallel_entry, &group_rgi->commit_orderer);
- group_rgi->cleanup_context(thd, true);
- group_rgi->rli->abort_slave= true;
+ signal_error_to_sql_driver_thread(thd, group_rgi);
in_event_group= false;
delete group_rgi;
group_rgi= NULL;
@@ -752,6 +784,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
Relay_log_info *rli= serial_rgi->rli;
enum Log_event_type typ;
bool is_group_event;
+ bool did_enter_cond= false;
+ PSI_stage_info old_stage;
/* ToDo: what to do with this lock?!? */
mysql_mutex_unlock(&rli->data_lock);
@@ -766,6 +800,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
sql_thread_stopping= true;
if (sql_thread_stopping)
{
+ delete ev;
/* QQ: Need a better comment why we return false here */
return false;
}
@@ -774,6 +809,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
MYF(0))))
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ delete ev;
return true;
}
qev->ev= ev;
@@ -796,6 +832,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
{
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
delete rgi;
+ my_free(qev);
+ delete ev;
return true;
}
rgi->is_parallel_exec = true;
@@ -813,6 +851,10 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
However, the commit of this event must wait for the commit of the prior
event, to preserve binlog commit order and visibility across all
servers in the replication hierarchy.
+
+ In addition, we must not start executing this event until we have
+ finished the previous collection of event groups that group-committed
+ together; we use rgi->wait_start_sub_id to control this.
*/
rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e);
rgi->wait_commit_sub_id= e->current_sub_id;
@@ -859,6 +901,21 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
}
else if (cur_thread->queued_size <= opt_slave_parallel_max_queued)
break; // The thread is ready to queue into
+ else if (rli->sql_driver_thd->check_killed())
+ {
+ mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
+ my_error(ER_CONNECTION_KILLED, MYF(0));
+ delete rgi;
+ my_free(qev);
+ delete ev;
+ DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
+ {
+ debug_sync_set_action(rli->sql_driver_thd,
+ STRING_WITH_LEN("now SIGNAL wait_queue_killed"));
+ };);
+ slave_output_error_info(rli, rli->sql_driver_thd);
+ return true;
+ }
else
{
/*
@@ -866,6 +923,18 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
use for queuing events, so wait for the thread to consume some
of its queue.
*/
+ if (!did_enter_cond)
+ {
+ rli->sql_driver_thd->ENTER_COND(&cur_thread->COND_rpl_thread,
+ &cur_thread->LOCK_rpl_thread,
+ &stage_waiting_for_room_in_worker_thread, &old_stage);
+ did_enter_cond= true;
+ DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
+ {
+ debug_sync_set_action(rli->sql_driver_thd,
+ STRING_WITH_LEN("now SIGNAL wait_queue_ready"));
+ };);
+ }
mysql_cond_wait(&cur_thread->COND_rpl_thread,
&cur_thread->LOCK_rpl_thread);
}
@@ -1015,7 +1084,10 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
*/
rli->event_relay_log_pos= rli->future_event_relay_log_pos;
cur_thread->enqueue(qev);
- mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
+ if (did_enter_cond)
+ rli->sql_driver_thd->EXIT_COND(&old_stage);
+ else
+ mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
mysql_cond_signal(&cur_thread->COND_rpl_thread);
return false;