diff options
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r-- | sql/rpl_parallel.cc | 31 |
1 files changed, 31 insertions, 0 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 24db5f5fb5e..4313840119e 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -4,6 +4,10 @@ #include "rpl_mi.h" #include "sql_parse.h" #include "debug_sync.h" +#include "wsrep_mysqld.h" +#ifdef WITH_WSREP +#include "wsrep_trans_observer.h" +#endif /* Code for optional parallel execution of replicated events on the slave. @@ -35,6 +39,13 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_EVENT); ev= qev->ev; +#ifdef WITH_WSREP + if (wsrep_before_statement(thd)) + { + WSREP_WARN("Parallel slave failed at wsrep_before_statement() hook"); + return(1); + } +#endif /* WITH_WSREP */ thd->system_thread_info.rpl_sql_info->rpl_filter = rli->mi->rpl_filter; ev->thd= thd; @@ -50,6 +61,13 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, err= apply_event_and_update_pos_for_parallel(ev, thd, rgi); thread_safe_increment64(&rli->executed_entries); +#ifdef WITH_WSREP + if (wsrep_after_statement(thd)) + { + WSREP_WARN("Parallel slave failed at wsrep_after_statement() hook"); + err= 1; + } +#endif /* WITH_WSREP */ /* ToDo: error handling. */ return err; } @@ -1066,6 +1084,14 @@ handle_rpl_parallel_thread(void *arg) mysql_cond_signal(&rpt->COND_rpl_thread); thd->set_command(COM_SLAVE_WORKER); +#ifdef WITH_WSREP + wsrep_open(thd); + if (wsrep_before_command(thd)) + { + WSREP_WARN("Parallel slave failed at wsrep_before_command() hook"); + rpt->stop = true; + } +#endif /* WITH_WSREP */ while (!rpt->stop) { uint wait_count= 0; @@ -1436,6 +1462,11 @@ handle_rpl_parallel_thread(void *arg) rpt->pool->release_thread(rpt); } } +#ifdef WITH_WSREP + wsrep_after_command_before_result(thd); + wsrep_after_command_after_result(thd); + wsrep_close(thd); +#endif /* WITH_WSREP */ rpt->thd= NULL; mysql_mutex_unlock(&rpt->LOCK_rpl_thread); |