summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorEugene Kosov <eugene.kosov@mariadb.com>2019-07-16 15:42:36 +0300
committerEugene Kosov <eugene.kosov@mariadb.com>2019-07-16 18:39:21 +0300
commit0f83c8878dc1389212c134f65d37a43d9d248250 (patch)
tree6950ea9b6c449a6e5d8a0205b3d06ae275a6234c /sql
parentaa96e56c55c44d2c20c1cd70325ef88ad0af8f98 (diff)
parentd2f094d9e63e97293915b17b30a73b2552647a38 (diff)
downloadmariadb-git-0f83c8878dc1389212c134f65d37a43d9d248250.tar.gz
Merge 10.2 into 10.3
Diffstat (limited to 'sql')
-rw-r--r--sql/item_sum.cc11
-rw-r--r--sql/mysqld.cc2
-rw-r--r--sql/rpl_mi.cc9
-rw-r--r--sql/sp_head.cc2
-rw-r--r--sql/sql_plugin.cc9
-rw-r--r--sql/sql_table.cc1
-rw-r--r--sql/wsrep_mysqld.cc43
-rw-r--r--sql/wsrep_mysqld.h17
-rw-r--r--sql/wsrep_thd.cc36
-rw-r--r--sql/wsrep_var.cc18
10 files changed, 115 insertions, 33 deletions
diff --git a/sql/item_sum.cc b/sql/item_sum.cc
index cb7b9264b3b..111dc88a1e3 100644
--- a/sql/item_sum.cc
+++ b/sql/item_sum.cc
@@ -3089,11 +3089,14 @@ Item_sum_hybrid::min_max_update_str_field()
if (!args[0]->null_value)
{
- result_field->val_str(&cmp->value2);
-
- if (result_field->is_null() ||
- (cmp_sign * sortcmp(res_str,&cmp->value2,collation.collation)) < 0)
+ if (result_field->is_null())
result_field->store(res_str->ptr(),res_str->length(),res_str->charset());
+ else
+ {
+ result_field->val_str(&cmp->value2);
+ if ((cmp_sign * sortcmp(res_str,&cmp->value2,collation.collation)) < 0)
+ result_field->store(res_str->ptr(),res_str->length(),res_str->charset());
+ }
result_field->set_notnull();
}
DBUG_VOID_RETURN;
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index f98f51c73d0..c2475bbb46f 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -6136,6 +6136,8 @@ int mysqld_main(int argc, char **argv)
wsrep_init_startup (false);
}
+ WSREP_DEBUG("Startup creating %ld applier threads running %lu",
+ wsrep_slave_threads - 1, wsrep_running_applier_threads);
wsrep_create_appliers(wsrep_slave_threads - 1);
}
}
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index 50c5c7b1969..3542933318a 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -115,15 +115,6 @@ void Master_info::wait_until_free()
Master_info::~Master_info()
{
wait_until_free();
-#ifdef WITH_WSREP
- /*
- Do not free "wsrep" rpl_filter. It will eventually be freed by
- free_all_rpl_filters() when server terminates.
- */
- if (strncmp(connection_name.str, STRING_WITH_LEN("wsrep")))
-#endif
- rpl_filters.delete_element(connection_name.str, connection_name.length,
- (void (*)(const char*, uchar*)) free_rpl_filter);
my_free(const_cast<char*>(connection_name.str));
delete_dynamic(&ignore_server_ids);
mysql_mutex_destroy(&run_lock);
diff --git a/sql/sp_head.cc b/sql/sp_head.cc
index 1dfea47a108..458955b2d6b 100644
--- a/sql/sp_head.cc
+++ b/sql/sp_head.cc
@@ -169,7 +169,7 @@ sp_get_flags_for_command(LEX *lex)
switch (lex->sql_command) {
case SQLCOM_SELECT:
- if (lex->result)
+ if (lex->result && !lex->analyze_stmt)
{
flags= 0; /* This is a SELECT with INTO clause */
break;
diff --git a/sql/sql_plugin.cc b/sql/sql_plugin.cc
index b568c5cb229..c7515fadbcb 100644
--- a/sql/sql_plugin.cc
+++ b/sql/sql_plugin.cc
@@ -76,7 +76,7 @@ uint plugin_maturity_map[]=
{ 0, 1, 2, 3, 4, 5, 6 };
/*
- When you ad a new plugin type, add both a string and make sure that the
+ When you add a new plugin type, add both a string and make sure that the
init and deinit array are correctly updated.
*/
const LEX_CSTRING plugin_type_names[MYSQL_MAX_PLUGIN_TYPE_NUM]=
@@ -3684,7 +3684,7 @@ static int construct_options(MEM_ROOT *mem_root, struct st_plugin_int *tmp,
const LEX_CSTRING plugin_dash = { STRING_WITH_LEN("plugin-") };
size_t plugin_name_len= strlen(plugin_name);
size_t optnamelen;
- const int max_comment_len= 180;
+ const int max_comment_len= 255;
char *comment= (char *) alloc_root(mem_root, max_comment_len + 1);
char *optname;
@@ -3718,8 +3718,9 @@ static int construct_options(MEM_ROOT *mem_root, struct st_plugin_int *tmp,
options[0].typelib= options[1].typelib= &global_plugin_typelib;
strxnmov(comment, max_comment_len, "Enable or disable ", plugin_name,
- " plugin. One of: ON, OFF, FORCE (don't start "
- "if the plugin fails to load).", NullS);
+ " plugin. One of: ON, OFF, FORCE (don't start if the plugin"
+ " fails to load), FORCE_PLUS_PERMANENT (like FORCE, but the"
+ " plugin can not be uninstalled).", NullS);
options[0].comment= comment;
/*
Allocate temporary space for the value of the tristate.
diff --git a/sql/sql_table.cc b/sql/sql_table.cc
index cecd3c42f41..6aadbe3d397 100644
--- a/sql/sql_table.cc
+++ b/sql/sql_table.cc
@@ -7554,6 +7554,7 @@ static bool mysql_inplace_alter_table(THD *thd,
if (res)
goto rollback;
+ DEBUG_SYNC(thd, "alter_table_inplace_before_lock_upgrade");
// Upgrade to EXCLUSIVE before commit.
if (wait_while_table_is_used(thd, table, HA_EXTRA_PREPARE_FOR_RENAME))
goto rollback;
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
index 99cad458827..21e2e5bd93f 100644
--- a/sql/wsrep_mysqld.cc
+++ b/sql/wsrep_mysqld.cc
@@ -38,6 +38,7 @@
#include <cstdlib>
#include "log_event.h"
#include <slave.h>
+#include "sql_plugin.h" /* wsrep_plugins_pre_init() */
wsrep_t *wsrep = NULL;
/*
@@ -134,7 +135,11 @@ mysql_mutex_t LOCK_wsrep_desync;
mysql_mutex_t LOCK_wsrep_config_state;
int wsrep_replaying= 0;
-ulong wsrep_running_threads = 0; // # of currently running wsrep threads
+ulong wsrep_running_threads = 0; // # of currently running wsrep
+ // # threads
+ulong wsrep_running_applier_threads = 0; // # of running applier threads
+ulong wsrep_running_rollbacker_threads = 0; // # of running
+ // # rollbacker threads
ulong my_bind_addr;
#ifdef HAVE_PSI_INTERFACE
@@ -2020,7 +2025,8 @@ bool wsrep_grant_mdl_exception(MDL_context *requestor_ctx,
pthread_handler_t start_wsrep_THD(void *arg)
{
THD *thd;
- wsrep_thd_processor_fun processor= (wsrep_thd_processor_fun)arg;
+ wsrep_thread_args* args= (wsrep_thread_args*)arg;
+ wsrep_thd_processor_fun processor= args->processor;
if (my_thread_init() || (!(thd= new THD(next_thread_id(), true))))
{
@@ -2096,6 +2102,19 @@ pthread_handler_t start_wsrep_THD(void *arg)
mysql_mutex_lock(&LOCK_thread_count);
wsrep_running_threads++;
+
+ switch (args->thread_type) {
+ case WSREP_APPLIER_THREAD:
+ wsrep_running_applier_threads++;
+ break;
+ case WSREP_ROLLBACKER_THREAD:
+ wsrep_running_rollbacker_threads++;
+ break;
+ default:
+ WSREP_ERROR("Incorrect wsrep thread type: %d", args->thread_type);
+ break;
+ }
+
mysql_cond_broadcast(&COND_thread_count);
mysql_mutex_unlock(&LOCK_thread_count);
@@ -2104,7 +2123,25 @@ pthread_handler_t start_wsrep_THD(void *arg)
close_connection(thd, 0);
mysql_mutex_lock(&LOCK_thread_count);
+ DBUG_ASSERT(wsrep_running_threads > 0);
wsrep_running_threads--;
+
+ switch (args->thread_type) {
+ case WSREP_APPLIER_THREAD:
+ DBUG_ASSERT(wsrep_running_applier_threads > 0);
+ wsrep_running_applier_threads--;
+ break;
+ case WSREP_ROLLBACKER_THREAD:
+ DBUG_ASSERT(wsrep_running_rollbacker_threads > 0);
+ wsrep_running_rollbacker_threads--;
+ break;
+ default:
+ WSREP_ERROR("Incorrect wsrep thread type: %d", args->thread_type);
+ break;
+ }
+
+ my_free(args);
+
WSREP_DEBUG("wsrep running threads now: %lu", wsrep_running_threads);
mysql_cond_broadcast(&COND_thread_count);
mysql_mutex_unlock(&LOCK_thread_count);
@@ -2133,6 +2170,8 @@ pthread_handler_t start_wsrep_THD(void *arg)
error:
WSREP_ERROR("Failed to create/initialize system thread");
+ my_free(args);
+
/* Abort if its the first applier/rollbacker thread. */
if (!mysqld_server_initialized)
unireg_abort(1);
diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h
index c5749ef6da9..592d2f279f4 100644
--- a/sql/wsrep_mysqld.h
+++ b/sql/wsrep_mysqld.h
@@ -31,6 +31,7 @@ typedef struct st_mysql_show_var SHOW_VAR;
#include "mysqld.h"
#include "sql_table.h"
#include "wsrep_mysqld_c.h"
+#include <vector>
#define WSREP_UNDEFINED_TRX_ID ULONGLONG_MAX
@@ -89,6 +90,8 @@ extern my_bool wsrep_restart_slave_activated;
extern my_bool wsrep_slave_FK_checks;
extern my_bool wsrep_slave_UK_checks;
extern ulong wsrep_running_threads;
+extern ulong wsrep_running_applier_threads;
+extern ulong wsrep_running_rollbacker_threads;
extern bool wsrep_new_cluster;
extern bool wsrep_gtid_mode;
extern uint32 wsrep_gtid_domain_id;
@@ -305,7 +308,21 @@ void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end);
void thd_binlog_rollback_stmt(THD * thd);
void thd_binlog_trx_reset(THD * thd);
+enum wsrep_thread_type {
+ WSREP_APPLIER_THREAD=1,
+ WSREP_ROLLBACKER_THREAD=2
+};
+
typedef void (*wsrep_thd_processor_fun)(THD *);
+
+typedef struct {
+ pthread_t thread_id;
+ wsrep_thd_processor_fun processor;
+ enum wsrep_thread_type thread_type;
+} wsrep_thread_args;
+
+extern std::vector<wsrep_thread_args*> wsrep_thread_arg;
+
pthread_handler_t start_wsrep_THD(void *arg);
int wsrep_wait_committing_connections_close(int wait_time);
extern void wsrep_close_client_connections(my_bool wait_to_end,
diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc
index 177124cb0ce..34b39c6d1e3 100644
--- a/sql/wsrep_thd.cc
+++ b/sql/wsrep_thd.cc
@@ -359,6 +359,7 @@ static void wsrep_replication_process(THD *thd)
thd->variables.option_bits|= OPTION_BEGIN;
thd->server_status|= SERVER_STATUS_IN_TRANS;
+ thd_proc_info(thd, "wsrep applier idle");
rcode = wsrep->recv(wsrep, (void *)thd);
DBUG_PRINT("wsrep",("wsrep_repl returned: %d", rcode));
@@ -414,13 +415,12 @@ static void wsrep_replication_process(THD *thd)
DBUG_VOID_RETURN;
}
-static bool create_wsrep_THD(wsrep_thd_processor_fun processor)
+static bool create_wsrep_THD(wsrep_thread_args* args)
{
ulong old_wsrep_running_threads= wsrep_running_threads;
- pthread_t unused;
mysql_mutex_lock(&LOCK_thread_count);
- bool res= pthread_create(&unused, &connection_attrib, start_wsrep_THD,
- (void*)processor);
+ bool res= pthread_create(&args->thread_id, &connection_attrib, start_wsrep_THD,
+ (void*)args);
/*
if starting a thread on server startup, wait until the this thread's THD
is fully initialized (otherwise a THD initialization code might
@@ -450,8 +450,20 @@ void wsrep_create_appliers(long threads)
long wsrep_threads=0;
while (wsrep_threads++ < threads) {
- if (create_wsrep_THD(wsrep_replication_process))
+ wsrep_thread_args* arg;
+ if((arg = (wsrep_thread_args*)my_malloc(sizeof(wsrep_thread_args), MYF(0))) == NULL) {
+ WSREP_ERROR("Can't allocate memory for wsrep replication thread %ld\n", wsrep_threads);
+ assert(0);
+ }
+
+ arg->thread_type = WSREP_APPLIER_THREAD;
+ arg->processor = wsrep_replication_process;
+
+ if (create_wsrep_THD(arg)) {
WSREP_WARN("Can't create thread to manage wsrep replication");
+ my_free(arg);
+ return;
+ }
}
}
@@ -538,9 +550,21 @@ void wsrep_create_rollbacker()
{
if (wsrep_provider && strcasecmp(wsrep_provider, "none"))
{
+ wsrep_thread_args* arg;
+ if((arg = (wsrep_thread_args*)my_malloc(sizeof(wsrep_thread_args), MYF(0))) == NULL) {
+ WSREP_ERROR("Can't allocate memory for wsrep rollbacker thread\n");
+ assert(0);
+ }
+
+ arg->thread_type = WSREP_ROLLBACKER_THREAD;
+ arg->processor = wsrep_rollback_process;
+
/* create rollbacker */
- if (create_wsrep_THD(wsrep_rollback_process))
+ if (create_wsrep_THD(arg)) {
WSREP_WARN("Can't create thread to manage wsrep rollback");
+ my_free(arg);
+ return;
+ }
}
}
diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc
index ab20499360e..258c00b9f88 100644
--- a/sql/wsrep_var.cc
+++ b/sql/wsrep_var.cc
@@ -28,8 +28,6 @@
ulong wsrep_reject_queries;
-static long wsrep_prev_slave_threads = wsrep_slave_threads;
-
int wsrep_init_vars()
{
wsrep_provider = my_strdup(WSREP_NONE, MYF(MY_WME));
@@ -502,6 +500,8 @@ bool wsrep_cluster_address_update (sys_var *self, THD* thd, enum_var_type type)
if (wsrep_start_replication())
{
wsrep_create_rollbacker();
+ WSREP_DEBUG("Cluster address update creating %ld applier threads running %lu",
+ wsrep_slave_threads, wsrep_running_applier_threads);
wsrep_create_appliers(wsrep_slave_threads);
}
@@ -595,18 +595,20 @@ void wsrep_node_address_init (const char* value)
static void wsrep_slave_count_change_update ()
{
- wsrep_slave_count_change = (wsrep_slave_threads - wsrep_prev_slave_threads);
- WSREP_DEBUG("Change on slave threads: New %lu old %lu difference %d",
- wsrep_slave_threads, wsrep_prev_slave_threads, wsrep_slave_count_change);
- wsrep_prev_slave_threads = wsrep_slave_threads;
+ wsrep_slave_count_change = (wsrep_slave_threads - wsrep_running_applier_threads);
+ WSREP_DEBUG("Change on slave threads: New %ld old %lu difference %d",
+ wsrep_slave_threads, wsrep_running_applier_threads, wsrep_slave_count_change);
}
bool wsrep_slave_threads_update (sys_var *self, THD* thd, enum_var_type type)
{
wsrep_slave_count_change_update();
+
if (wsrep_slave_count_change > 0)
{
+ WSREP_DEBUG("Creating %d applier threads, total %ld", wsrep_slave_count_change, wsrep_slave_threads);
wsrep_create_appliers(wsrep_slave_count_change);
+ WSREP_DEBUG("Running %lu applier threads", wsrep_running_applier_threads);
wsrep_slave_count_change = 0;
}
return false;
@@ -706,7 +708,9 @@ static SHOW_VAR wsrep_status_vars[]=
{"provider_name", (char*) &wsrep_provider_name, SHOW_CHAR_PTR},
{"provider_version", (char*) &wsrep_provider_version, SHOW_CHAR_PTR},
{"provider_vendor", (char*) &wsrep_provider_vendor, SHOW_CHAR_PTR},
- {"thread_count", (char*) &wsrep_running_threads, SHOW_LONG_NOFLUSH}
+ {"thread_count", (char*) &wsrep_running_threads, SHOW_LONG_NOFLUSH},
+ {"applier_thread_count", (char*)&wsrep_running_applier_threads, SHOW_LONG_NOFLUSH},
+ {"rollbacker_thread_count", (char *)&wsrep_running_rollbacker_threads, SHOW_LONG_NOFLUSH},
};
static int show_var_cmp(const void *var1, const void *var2)