diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/rpl_parallel.cc | 26 | ||||
-rw-r--r-- | sql/sql_binlog.cc | 17 | ||||
-rw-r--r-- | sql/sys_vars.cc | 11 |
3 files changed, 33 insertions, 21 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 63066d8d7c0..2bb5083a4f3 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -248,7 +248,7 @@ handle_rpl_parallel_thread(void *arg) if (!in_event_group) { rpt->current_entry= NULL; - if (!rpt->free) + if (!rpt->stop && !rpt->free) { mysql_mutex_lock(&rpt->pool->LOCK_rpl_thread_pool); list= rpt->pool->free_list; @@ -262,9 +262,27 @@ handle_rpl_parallel_thread(void *arg) } } + rpt->thd= NULL; + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + + thd->clear_error(); + thd->catalog= 0; + thd->reset_query(); + thd->reset_db(NULL, 0); + thd_proc_info(thd, "Slave worker thread exiting"); + thd->temporary_tables= 0; + mysql_mutex_lock(&LOCK_thread_count); + THD_CHECK_SENTRY(thd); + delete thd; + mysql_mutex_unlock(&LOCK_thread_count); + + mysql_mutex_lock(&rpt->LOCK_rpl_thread); rpt->running= false; + mysql_cond_signal(&rpt->COND_rpl_thread); mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + my_thread_end(); + return NULL; } @@ -344,6 +362,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, { rpl_parallel_thread *rpt= pool->get_thread(NULL); rpt->stop= true; + mysql_cond_signal(&rpt->COND_rpl_thread); mysql_mutex_unlock(&rpt->LOCK_rpl_thread); } @@ -354,7 +373,9 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, while (rpt->running) mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread); mysql_mutex_unlock(&rpt->LOCK_rpl_thread); - delete rpt; + mysql_mutex_destroy(&rpt->LOCK_rpl_thread); + mysql_cond_destroy(&rpt->COND_rpl_thread); + my_free(rpt); } my_free(pool->threads); @@ -386,6 +407,7 @@ err: mysql_mutex_lock(&new_free_list->LOCK_rpl_thread); new_free_list->delay_start= false; new_free_list->stop= true; + mysql_cond_signal(&new_free_list->COND_rpl_thread); while (!new_free_list->running) mysql_cond_wait(&new_free_list->COND_rpl_thread, &new_free_list->LOCK_rpl_thread); diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc index bef9a4c3475..df6aab88200 100644 --- a/sql/sql_binlog.cc +++ b/sql/sql_binlog.cc @@ -44,7 +44,6 @@ void mysql_client_binlog_statement(THD* thd) { - struct rpl_group_info *rgi; DBUG_ENTER("mysql_client_binlog_statement"); DBUG_PRINT("info",("binlog base64: '%*s'", (int) (thd->lex->comment.length < 2048 ? @@ -100,6 +99,7 @@ void mysql_client_binlog_statement(THD* thd) const char *error= 0; char *buf= (char *) my_malloc(decoded_len, MYF(MY_WME)); Log_event *ev = 0; + struct rpl_group_info rgi(rli); /* Out of memory check @@ -197,17 +197,8 @@ void mysql_client_binlog_statement(THD* thd) } } - if (!(rgi= rli->group_info)) - { - if (!(rgi= rli->group_info= (struct rpl_group_info *) - my_malloc(sizeof(*rgi), MYF(0)))) - { - my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*rgi)); - goto end; - } - bzero(rgi, sizeof(*rgi)); - } - rgi->rli= rli; + rgi.rli= rli; + rgi.thd= thd; ev= Log_event::read_log_event(bufptr, event_len, &error, rli->relay_log.description_event_for_exec, 0); @@ -244,7 +235,7 @@ void mysql_client_binlog_statement(THD* thd) (ev->flags & LOG_EVENT_SKIP_REPLICATION_F ? OPTION_SKIP_REPLICATION : 0); - err= ev->apply_event(rgi); + err= ev->apply_event(&rgi); thd->variables.option_bits= (thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) | diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 1273bff1750..91f13bebd12 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -1442,11 +1442,9 @@ check_slave_parallel_threads(sys_var *self, THD *thd, set_var *var) { bool running; - mysql_mutex_unlock(&LOCK_global_system_variables); mysql_mutex_lock(&LOCK_active_mi); running= master_info_index->give_error_if_slave_running(); mysql_mutex_unlock(&LOCK_active_mi); - mysql_mutex_lock(&LOCK_global_system_variables); if (running) return true; @@ -1457,17 +1455,18 @@ static bool fix_slave_parallel_threads(sys_var *self, THD *thd, enum_var_type type) { bool running; + bool err= false; mysql_mutex_unlock(&LOCK_global_system_variables); mysql_mutex_lock(&LOCK_active_mi); running= master_info_index->give_error_if_slave_running(); mysql_mutex_unlock(&LOCK_active_mi); - mysql_mutex_lock(&LOCK_global_system_variables); if (running || rpl_parallel_change_thread_count(&global_rpl_thread_pool, opt_slave_parallel_threads)) - return true; + err= true; + mysql_mutex_lock(&LOCK_global_system_variables); - return false; + return err; } @@ -1497,7 +1496,7 @@ static Sys_var_ulong Sys_binlog_commit_wait_count( static Sys_var_ulong Sys_binlog_commit_wait_usec( "binlog_commit_wait_usec", "Maximum time, in microseconds, to wait for more commits to queue up " - " for binlog group commit. Only takes effect if the value of " + "for binlog group commit. Only takes effect if the value of " "binlog_commit_wait_count is non-zero.", GLOBAL_VAR(opt_binlog_commit_wait_usec), CMD_LINE(REQUIRED_ARG), VALID_RANGE(0, ULONG_MAX), DEFAULT(100000), BLOCK_SIZE(1)); |