summaryrefslogtreecommitdiff
path: root/sql/rpl_parallel.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r--sql/rpl_parallel.cc41
1 files changed, 36 insertions, 5 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 3eb25219ed3..35901cb5263 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -5,6 +5,10 @@
#include "sql_parse.h"
#include "debug_sync.h"
#include "sql_repl.h"
+#include "wsrep_mysqld.h"
+#ifdef WITH_WSREP
+#include "wsrep_trans_observer.h"
+#endif
/*
Code for optional parallel execution of replicated events on the slave.
@@ -36,6 +40,13 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_EVENT);
ev= qev->ev;
+#ifdef WITH_WSREP
+ if (wsrep_before_statement(thd))
+ {
+ WSREP_WARN("Parallel slave failed at wsrep_before_statement() hook");
+ return(1);
+ }
+#endif /* WITH_WSREP */
thd->system_thread_info.rpl_sql_info->rpl_filter = rli->mi->rpl_filter;
ev->thd= thd;
@@ -50,7 +61,14 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
rgi->last_master_timestamp= ev->when + (time_t)ev->exec_time;
err= apply_event_and_update_pos_for_parallel(ev, thd, rgi);
- thread_safe_increment64(&rli->executed_entries);
+ rli->executed_entries++;
+#ifdef WITH_WSREP
+ if (wsrep_after_statement(thd))
+ {
+ WSREP_WARN("Parallel slave failed at wsrep_after_statement() hook");
+ err= 1;
+ }
+#endif /* WITH_WSREP */
/* ToDo: error handling. */
return err;
}
@@ -1050,13 +1068,13 @@ handle_rpl_parallel_thread(void *arg)
my_thread_init();
thd = new THD(next_thread_id());
thd->thread_stack = (char*)&thd;
- add_to_active_threads(thd);
+ server_threads.insert(thd);
set_current_thd(thd);
pthread_detach_this_thread();
+ thd->store_globals();
thd->init_for_queries();
thd->variables.binlog_annotate_row_events= 0;
init_thr_lock();
- thd->store_globals();
thd->system_thread= SYSTEM_THREAD_SLAVE_SQL;
thd->security_ctx->skip_grants();
thd->variables.max_allowed_packet= slave_max_allowed_packet;
@@ -1087,6 +1105,14 @@ handle_rpl_parallel_thread(void *arg)
mysql_cond_signal(&rpt->COND_rpl_thread);
thd->set_command(COM_SLAVE_WORKER);
+#ifdef WITH_WSREP
+ wsrep_open(thd);
+ if (wsrep_before_command(thd))
+ {
+ WSREP_WARN("Parallel slave failed at wsrep_before_command() hook");
+ rpt->stop = true;
+ }
+#endif /* WITH_WSREP */
while (!rpt->stop)
{
uint wait_count= 0;
@@ -1457,6 +1483,11 @@ handle_rpl_parallel_thread(void *arg)
rpt->pool->release_thread(rpt);
}
}
+#ifdef WITH_WSREP
+ wsrep_after_command_before_result(thd);
+ wsrep_after_command_after_result(thd);
+ wsrep_close(thd);
+#endif /* WITH_WSREP */
rpt->thd= NULL;
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
@@ -1469,7 +1500,7 @@ handle_rpl_parallel_thread(void *arg)
thd->temporary_tables= 0;
THD_CHECK_SENTRY(thd);
- unlink_not_visible_thd(thd);
+ server_threads.erase(thd);
delete thd;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
@@ -1765,7 +1796,7 @@ rpl_parallel_thread::inuse_relaylog_refcount_update()
inuse_relaylog *ir= accumulated_ir_last;
if (ir)
{
- my_atomic_add64(&ir->dequeued_count, accumulated_ir_count);
+ ir->dequeued_count+= accumulated_ir_count;
accumulated_ir_count= 0;
accumulated_ir_last= NULL;
}