summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/rpl_parallel.cc26
-rw-r--r--sql/sql_binlog.cc17
-rw-r--r--sql/sys_vars.cc11
3 files changed, 33 insertions, 21 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 63066d8d7c0..2bb5083a4f3 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -248,7 +248,7 @@ handle_rpl_parallel_thread(void *arg)
if (!in_event_group)
{
rpt->current_entry= NULL;
- if (!rpt->free)
+ if (!rpt->stop && !rpt->free)
{
mysql_mutex_lock(&rpt->pool->LOCK_rpl_thread_pool);
list= rpt->pool->free_list;
@@ -262,9 +262,27 @@ handle_rpl_parallel_thread(void *arg)
}
}
+ rpt->thd= NULL;
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+
+ thd->clear_error();
+ thd->catalog= 0;
+ thd->reset_query();
+ thd->reset_db(NULL, 0);
+ thd_proc_info(thd, "Slave worker thread exiting");
+ thd->temporary_tables= 0;
+ mysql_mutex_lock(&LOCK_thread_count);
+ THD_CHECK_SENTRY(thd);
+ delete thd;
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->running= false;
+ mysql_cond_signal(&rpt->COND_rpl_thread);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ my_thread_end();
+
return NULL;
}
@@ -344,6 +362,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
{
rpl_parallel_thread *rpt= pool->get_thread(NULL);
rpt->stop= true;
+ mysql_cond_signal(&rpt->COND_rpl_thread);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
}
@@ -354,7 +373,9 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
while (rpt->running)
mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
- delete rpt;
+ mysql_mutex_destroy(&rpt->LOCK_rpl_thread);
+ mysql_cond_destroy(&rpt->COND_rpl_thread);
+ my_free(rpt);
}
my_free(pool->threads);
@@ -386,6 +407,7 @@ err:
mysql_mutex_lock(&new_free_list->LOCK_rpl_thread);
new_free_list->delay_start= false;
new_free_list->stop= true;
+ mysql_cond_signal(&new_free_list->COND_rpl_thread);
while (!new_free_list->running)
mysql_cond_wait(&new_free_list->COND_rpl_thread,
&new_free_list->LOCK_rpl_thread);
diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc
index bef9a4c3475..df6aab88200 100644
--- a/sql/sql_binlog.cc
+++ b/sql/sql_binlog.cc
@@ -44,7 +44,6 @@
void mysql_client_binlog_statement(THD* thd)
{
- struct rpl_group_info *rgi;
DBUG_ENTER("mysql_client_binlog_statement");
DBUG_PRINT("info",("binlog base64: '%*s'",
(int) (thd->lex->comment.length < 2048 ?
@@ -100,6 +99,7 @@ void mysql_client_binlog_statement(THD* thd)
const char *error= 0;
char *buf= (char *) my_malloc(decoded_len, MYF(MY_WME));
Log_event *ev = 0;
+ struct rpl_group_info rgi(rli);
/*
Out of memory check
@@ -197,17 +197,8 @@ void mysql_client_binlog_statement(THD* thd)
}
}
- if (!(rgi= rli->group_info))
- {
- if (!(rgi= rli->group_info= (struct rpl_group_info *)
- my_malloc(sizeof(*rgi), MYF(0))))
- {
- my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*rgi));
- goto end;
- }
- bzero(rgi, sizeof(*rgi));
- }
- rgi->rli= rli;
+ rgi.rli= rli;
+ rgi.thd= thd;
ev= Log_event::read_log_event(bufptr, event_len, &error,
rli->relay_log.description_event_for_exec,
0);
@@ -244,7 +235,7 @@ void mysql_client_binlog_statement(THD* thd)
(ev->flags & LOG_EVENT_SKIP_REPLICATION_F ?
OPTION_SKIP_REPLICATION : 0);
- err= ev->apply_event(rgi);
+ err= ev->apply_event(&rgi);
thd->variables.option_bits=
(thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) |
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 1273bff1750..91f13bebd12 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -1442,11 +1442,9 @@ check_slave_parallel_threads(sys_var *self, THD *thd, set_var *var)
{
bool running;
- mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_mutex_lock(&LOCK_active_mi);
running= master_info_index->give_error_if_slave_running();
mysql_mutex_unlock(&LOCK_active_mi);
- mysql_mutex_lock(&LOCK_global_system_variables);
if (running)
return true;
@@ -1457,17 +1455,18 @@ static bool
fix_slave_parallel_threads(sys_var *self, THD *thd, enum_var_type type)
{
bool running;
+ bool err= false;
mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_mutex_lock(&LOCK_active_mi);
running= master_info_index->give_error_if_slave_running();
mysql_mutex_unlock(&LOCK_active_mi);
- mysql_mutex_lock(&LOCK_global_system_variables);
if (running || rpl_parallel_change_thread_count(&global_rpl_thread_pool,
opt_slave_parallel_threads))
- return true;
+ err= true;
+ mysql_mutex_lock(&LOCK_global_system_variables);
- return false;
+ return err;
}
@@ -1497,7 +1496,7 @@ static Sys_var_ulong Sys_binlog_commit_wait_count(
static Sys_var_ulong Sys_binlog_commit_wait_usec(
"binlog_commit_wait_usec",
"Maximum time, in microseconds, to wait for more commits to queue up "
- " for binlog group commit. Only takes effect if the value of "
+ "for binlog group commit. Only takes effect if the value of "
"binlog_commit_wait_count is non-zero.",
GLOBAL_VAR(opt_binlog_commit_wait_usec), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(0, ULONG_MAX), DEFAULT(100000), BLOCK_SIZE(1));