diff options
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r-- | sql/rpl_parallel.cc | 41 |
1 files changed, 36 insertions, 5 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 3eb25219ed3..35901cb5263 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -5,6 +5,10 @@ #include "sql_parse.h" #include "debug_sync.h" #include "sql_repl.h" +#include "wsrep_mysqld.h" +#ifdef WITH_WSREP +#include "wsrep_trans_observer.h" +#endif /* Code for optional parallel execution of replicated events on the slave. @@ -36,6 +40,13 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_EVENT); ev= qev->ev; +#ifdef WITH_WSREP + if (wsrep_before_statement(thd)) + { + WSREP_WARN("Parallel slave failed at wsrep_before_statement() hook"); + return(1); + } +#endif /* WITH_WSREP */ thd->system_thread_info.rpl_sql_info->rpl_filter = rli->mi->rpl_filter; ev->thd= thd; @@ -50,7 +61,14 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, rgi->last_master_timestamp= ev->when + (time_t)ev->exec_time; err= apply_event_and_update_pos_for_parallel(ev, thd, rgi); - thread_safe_increment64(&rli->executed_entries); + rli->executed_entries++; +#ifdef WITH_WSREP + if (wsrep_after_statement(thd)) + { + WSREP_WARN("Parallel slave failed at wsrep_after_statement() hook"); + err= 1; + } +#endif /* WITH_WSREP */ /* ToDo: error handling. */ return err; } @@ -1050,13 +1068,13 @@ handle_rpl_parallel_thread(void *arg) my_thread_init(); thd = new THD(next_thread_id()); thd->thread_stack = (char*)&thd; - add_to_active_threads(thd); + server_threads.insert(thd); set_current_thd(thd); pthread_detach_this_thread(); + thd->store_globals(); thd->init_for_queries(); thd->variables.binlog_annotate_row_events= 0; init_thr_lock(); - thd->store_globals(); thd->system_thread= SYSTEM_THREAD_SLAVE_SQL; thd->security_ctx->skip_grants(); thd->variables.max_allowed_packet= slave_max_allowed_packet; @@ -1087,6 +1105,14 @@ handle_rpl_parallel_thread(void *arg) mysql_cond_signal(&rpt->COND_rpl_thread); thd->set_command(COM_SLAVE_WORKER); +#ifdef WITH_WSREP + wsrep_open(thd); + if (wsrep_before_command(thd)) + { + WSREP_WARN("Parallel slave failed at wsrep_before_command() hook"); + rpt->stop = true; + } +#endif /* WITH_WSREP */ while (!rpt->stop) { uint wait_count= 0; @@ -1457,6 +1483,11 @@ handle_rpl_parallel_thread(void *arg) rpt->pool->release_thread(rpt); } } +#ifdef WITH_WSREP + wsrep_after_command_before_result(thd); + wsrep_after_command_after_result(thd); + wsrep_close(thd); +#endif /* WITH_WSREP */ rpt->thd= NULL; mysql_mutex_unlock(&rpt->LOCK_rpl_thread); @@ -1469,7 +1500,7 @@ handle_rpl_parallel_thread(void *arg) thd->temporary_tables= 0; THD_CHECK_SENTRY(thd); - unlink_not_visible_thd(thd); + server_threads.erase(thd); delete thd; mysql_mutex_lock(&rpt->LOCK_rpl_thread); @@ -1765,7 +1796,7 @@ rpl_parallel_thread::inuse_relaylog_refcount_update() inuse_relaylog *ir= accumulated_ir_last; if (ir) { - my_atomic_add64(&ir->dequeued_count, accumulated_ir_count); + ir->dequeued_count+= accumulated_ir_count; accumulated_ir_count= 0; accumulated_ir_last= NULL; } |