summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/item_cmpfunc.cc2
-rw-r--r--sql/lex.h1
-rw-r--r--sql/log.cc217
-rw-r--r--sql/log.h7
-rw-r--r--sql/mysqld.cc3
-rw-r--r--sql/partition_info.cc3
-rw-r--r--sql/rpl_gtid.cc151
-rw-r--r--sql/rpl_gtid.h9
-rw-r--r--sql/share/errmsg-utf8.txt6
-rw-r--r--sql/sp_head.cc42
-rw-r--r--sql/sql_class.cc7
-rw-r--r--sql/sql_lex.cc5
-rw-r--r--sql/sql_lex.h7
-rw-r--r--sql/sql_parse.cc18
-rw-r--r--sql/sql_reload.cc5
-rw-r--r--sql/sql_repl.cc68
-rw-r--r--sql/sql_repl.h1
-rw-r--r--sql/sql_show.cc5
-rw-r--r--sql/sql_yacc.yy22
-rw-r--r--sql/sql_yacc_ora.yy1
-rw-r--r--sql/threadpool_generic.cc129
-rw-r--r--sql/wsrep_hton.cc3
-rw-r--r--sql/wsrep_mysqld.cc2
-rw-r--r--sql/wsrep_var.cc3
24 files changed, 551 insertions, 166 deletions
diff --git a/sql/item_cmpfunc.cc b/sql/item_cmpfunc.cc
index e4b8153507a..587dc14c529 100644
--- a/sql/item_cmpfunc.cc
+++ b/sql/item_cmpfunc.cc
@@ -5465,7 +5465,7 @@ void Regexp_processor_pcre::pcre_exec_warn(int rc) const
switch (rc)
{
case PCRE_ERROR_NULL:
- errmsg= "pcre_exec: null arguement passed";
+ errmsg= "pcre_exec: null argument passed";
break;
case PCRE_ERROR_BADOPTION:
errmsg= "pcre_exec: bad option";
diff --git a/sql/lex.h b/sql/lex.h
index 67c3bc8620d..ec2ca7de564 100644
--- a/sql/lex.h
+++ b/sql/lex.h
@@ -181,6 +181,7 @@ static SYMBOL symbols[] = {
{ "DELAYED", SYM(DELAYED_SYM)},
{ "DELAY_KEY_WRITE", SYM(DELAY_KEY_WRITE_SYM)},
{ "DELETE", SYM(DELETE_SYM)},
+ { "DELETE_DOMAIN_ID", SYM(DELETE_DOMAIN_ID_SYM)},
{ "DESC", SYM(DESC)},
{ "DESCRIBE", SYM(DESCRIBE)},
{ "DES_KEY_FILE", SYM(DES_KEY_FILE)},
diff --git a/sql/log.cc b/sql/log.cc
index 84094c4b730..34533b23ac5 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -6656,6 +6656,120 @@ void MYSQL_BIN_LOG::checkpoint_and_purge(ulong binlog_id)
purge();
}
+
+/**
+ Searches for the first (oldest) binlog file name in in the binlog index.
+
+ @param[in,out] buf_arg pointer to a buffer to hold found
+ the first binary log file name
+ @return NULL on success, otherwise error message
+*/
+static const char* get_first_binlog(char* buf_arg)
+{
+ IO_CACHE *index_file;
+ size_t length;
+ char fname[FN_REFLEN];
+ const char* errmsg= NULL;
+
+ DBUG_ENTER("get_first_binlog");
+
+ DBUG_ASSERT(mysql_bin_log.is_open());
+
+ mysql_bin_log.lock_index();
+
+ index_file=mysql_bin_log.get_index_file();
+ if (reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0))
+ {
+ errmsg= "failed to create a cache on binlog index";
+ goto end;
+ }
+ /* The file ends with EOF or empty line */
+ if ((length=my_b_gets(index_file, fname, sizeof(fname))) <= 1)
+ {
+ errmsg= "empty binlog index";
+ goto end;
+ }
+ else
+ {
+ fname[length-1]= 0; // Remove end \n
+ }
+ if (normalize_binlog_name(buf_arg, fname, false))
+ {
+ errmsg= "cound not normalize the first file name in the binlog index";
+ goto end;
+ }
+end:
+ mysql_bin_log.unlock_index();
+
+ DBUG_RETURN(errmsg);
+}
+
+/**
+ Check weather the gtid binlog state can safely remove gtid
+ domains passed as the argument. A safety condition is satisfied when
+ there are no events from the being deleted domains in the currently existing
+ binlog files. Upon successful check the supplied domains are removed
+ from @@gtid_binlog_state. The caller is supposed to rotate binlog so that
+ the active latest file won't have the deleted domains in its Gtid_list header.
+
+ @param domain_drop_lex gtid domain id sequence from lex.
+ Passed as a pointer to dynamic array must be not empty
+ unless pointer value NULL.
+ @retval zero on success
+ @retval > 0 ineffective call none from the *non* empty
+ gtid domain sequence is deleted
+ @retval < 0 on error
+*/
+static int do_delete_gtid_domain(DYNAMIC_ARRAY *domain_drop_lex)
+{
+ int rc= 0;
+ Gtid_list_log_event *glev= NULL;
+ char buf[FN_REFLEN];
+ File file;
+ IO_CACHE cache;
+ const char* errmsg= NULL;
+ char errbuf[MYSQL_ERRMSG_SIZE]= {0};
+
+ if (!domain_drop_lex)
+ return 0; // still "effective" having empty domain sequence to delete
+
+ DBUG_ASSERT(domain_drop_lex->elements > 0);
+ mysql_mutex_assert_owner(mysql_bin_log.get_log_lock());
+
+ if ((errmsg= get_first_binlog(buf)) != NULL)
+ goto end;
+ bzero((char*) &cache, sizeof(cache));
+ if ((file= open_binlog(&cache, buf, &errmsg)) == (File) -1)
+ goto end;
+ errmsg= get_gtid_list_event(&cache, &glev);
+ end_io_cache(&cache);
+ mysql_file_close(file, MYF(MY_WME));
+
+ DBUG_EXECUTE_IF("inject_binlog_delete_domain_init_error",
+ errmsg= "injected error";);
+ if (errmsg)
+ goto end;
+ errmsg= rpl_global_gtid_binlog_state.drop_domain(domain_drop_lex,
+ glev, errbuf);
+
+end:
+ if (errmsg)
+ {
+ if (strlen(errmsg) > 0)
+ {
+ my_error(ER_BINLOG_CANT_DELETE_GTID_DOMAIN, MYF(0), errmsg);
+ rc= -1;
+ }
+ else
+ {
+ rc= 1;
+ }
+ }
+ delete glev;
+
+ return rc;
+}
+
/**
The method is a shortcut of @c rotate() and @c purge().
LOCK_log is acquired prior to rotate and is released after it.
@@ -6665,16 +6779,24 @@ void MYSQL_BIN_LOG::checkpoint_and_purge(ulong binlog_id)
@retval
nonzero - error in rotating routine.
*/
-int MYSQL_BIN_LOG::rotate_and_purge(bool force_rotate)
+int MYSQL_BIN_LOG::rotate_and_purge(bool force_rotate,
+ DYNAMIC_ARRAY *domain_drop_lex)
{
- int error= 0;
+ int err_gtid=0, error= 0;
ulong prev_binlog_id;
DBUG_ENTER("MYSQL_BIN_LOG::rotate_and_purge");
bool check_purge= false;
mysql_mutex_lock(&LOCK_log);
prev_binlog_id= current_binlog_id;
- if ((error= rotate(force_rotate, &check_purge)))
+
+ if ((err_gtid= do_delete_gtid_domain(domain_drop_lex)))
+ {
+ // inffective attempt to delete merely skips rotate and purge
+ if (err_gtid < 0)
+ error= 1; // otherwise error is propagated the user
+ }
+ else if ((error= rotate(force_rotate, &check_purge)))
check_purge= false;
/*
NOTE: Run purge_logs wo/ holding LOCK_log because it does not need
@@ -7078,8 +7200,15 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
mode. Also, do not write the cached updates to binlog if binary logging is
disabled (log-bin/sql_log_bin).
*/
- if (wsrep_emulate_bin_log || !(thd->variables.option_bits & OPTION_BIN_LOG))
+ if (wsrep_emulate_bin_log)
+ {
DBUG_RETURN(0);
+ }
+ else if (!(thd->variables.option_bits & OPTION_BIN_LOG))
+ {
+ cache_mngr->need_unlog= false;
+ DBUG_RETURN(0);
+ }
entry.thd= thd;
entry.cache_mngr= cache_mngr;
@@ -9402,11 +9531,19 @@ TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all,
if (err)
DBUG_RETURN(0);
+
+ bool need_unlog= cache_mngr->need_unlog;
+ /*
+ The transaction won't need the flag anymore.
+ Todo/fixme: consider to move the statement into cache_mngr->reset()
+ relocated to the current or later point.
+ */
+ cache_mngr->need_unlog= false;
/*
If using explicit user XA, we will not have XID. We must still return a
non-zero cookie (as zero cookie signals error).
*/
- if (!xid || !cache_mngr->need_unlog)
+ if (!xid || !need_unlog)
DBUG_RETURN(BINLOG_COOKIE_DUMMY(cache_mngr->delayed_error));
else
DBUG_RETURN(BINLOG_COOKIE_MAKE(cache_mngr->binlog_id,
@@ -9479,6 +9616,9 @@ TC_LOG_BINLOG::mark_xid_done(ulong binlog_id, bool write_checkpoint)
if (b->binlog_id == binlog_id)
{
--b->xid_count;
+
+ DBUG_ASSERT(b->xid_count >= 0); // catch unmatched (++) decrement
+
break;
}
first= false;
@@ -10252,6 +10392,73 @@ TC_LOG_BINLOG::set_status_variables(THD *thd)
}
}
+
+/*
+ Find the Gtid_list_log_event at the start of a binlog.
+
+ NULL for ok, non-NULL error message for error.
+
+ If ok, then the event is returned in *out_gtid_list. This can be NULL if we
+ get back to binlogs written by old server version without GTID support. If
+ so, it means we have reached the point to start from, as no GTID events can
+ exist in earlier binlogs.
+*/
+const char *
+get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list)
+{
+ Format_description_log_event init_fdle(BINLOG_VERSION);
+ Format_description_log_event *fdle;
+ Log_event *ev;
+ const char *errormsg = NULL;
+
+ *out_gtid_list= NULL;
+
+ if (!(ev= Log_event::read_log_event(cache, 0, &init_fdle,
+ opt_master_verify_checksum)) ||
+ ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
+ {
+ if (ev)
+ delete ev;
+ return "Could not read format description log event while looking for "
+ "GTID position in binlog";
+ }
+
+ fdle= static_cast<Format_description_log_event *>(ev);
+
+ for (;;)
+ {
+ Log_event_type typ;
+
+ ev= Log_event::read_log_event(cache, 0, fdle, opt_master_verify_checksum);
+ if (!ev)
+ {
+ errormsg= "Could not read GTID list event while looking for GTID "
+ "position in binlog";
+ break;
+ }
+ typ= ev->get_type_code();
+ if (typ == GTID_LIST_EVENT)
+ break; /* Done, found it */
+ if (typ == START_ENCRYPTION_EVENT)
+ {
+ if (fdle->start_decryption((Start_encryption_log_event*) ev))
+ errormsg= "Could not set up decryption for binlog.";
+ }
+ delete ev;
+ if (typ == ROTATE_EVENT || typ == STOP_EVENT ||
+ typ == FORMAT_DESCRIPTION_EVENT || typ == START_ENCRYPTION_EVENT)
+ continue; /* Continue looking */
+
+ /* We did not find any Gtid_list_log_event, must be old binlog. */
+ ev= NULL;
+ break;
+ }
+
+ delete fdle;
+ *out_gtid_list= static_cast<Gtid_list_log_event *>(ev);
+ return errormsg;
+}
+
struct st_mysql_storage_engine binlog_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
diff --git a/sql/log.h b/sql/log.h
index eaa63d4072d..e2abcea9fdf 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -760,7 +760,7 @@ public:
int update_log_index(LOG_INFO* linfo, bool need_update_threads);
int rotate(bool force_rotate, bool* check_purge);
void checkpoint_and_purge(ulong binlog_id);
- int rotate_and_purge(bool force_rotate);
+ int rotate_and_purge(bool force_rotate, DYNAMIC_ARRAY* drop_gtid_domain= NULL);
/**
Flush binlog cache and synchronize to disk.
@@ -1169,4 +1169,9 @@ static inline TC_LOG *get_tc_log_implementation()
return &tc_log_mmap;
}
+
+class Gtid_list_log_event;
+const char *
+get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list);
+
#endif /* LOG_H */
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index c91529efb92..374a988537f 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -5885,9 +5885,6 @@ int mysqld_main(int argc, char **argv)
#ifdef __WIN__
if (!opt_console)
{
- if (reopen_fstreams(log_error_file, stdout, stderr))
- unireg_abort(1);
- setbuf(stderr, NULL);
FreeConsole(); // Remove window
}
diff --git a/sql/partition_info.cc b/sql/partition_info.cc
index 47fb60ea12a..1224c1bf9f3 100644
--- a/sql/partition_info.cc
+++ b/sql/partition_info.cc
@@ -42,13 +42,12 @@ partition_info *partition_info::get_clone(THD *thd)
List_iterator<partition_element> part_it(partitions);
partition_element *part;
- partition_info *clone= new (mem_root) partition_info();
+ partition_info *clone= new (mem_root) partition_info(*this);
if (!clone)
{
mem_alloc_error(sizeof(partition_info));
DBUG_RETURN(NULL);
}
- memcpy(clone, this, sizeof(partition_info));
memset(&(clone->read_partitions), 0, sizeof(clone->read_partitions));
memset(&(clone->lock_partitions), 0, sizeof(clone->lock_partitions));
clone->bitmaps_are_initialized= FALSE;
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index c385434e41e..7f7e53c79e4 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -26,7 +26,7 @@
#include "key.h"
#include "rpl_gtid.h"
#include "rpl_rli.h"
-
+#include "log_event.h"
const LEX_STRING rpl_gtid_slave_state_table_name=
{ C_STRING_WITH_LEN("gtid_slave_pos") };
@@ -1727,6 +1727,155 @@ end:
return res;
}
+/**
+ Remove domains supplied by the first argument from binlog state.
+ Removal is done for any domain whose last gtids (from all its servers) match
+ ones in Gtid list event of the 2nd argument.
+
+ @param ids gtid domain id sequence, may contain dups
+ @param glev pointer to Gtid list event describing
+ the match condition
+ @param errbuf [out] pointer to possible error message array
+
+ @retval NULL as success when at least one domain is removed
+ @retval "" empty string to indicate ineffective call
+ when no domains removed
+ @retval NOT EMPTY string otherwise an error message
+*/
+const char*
+rpl_binlog_state::drop_domain(DYNAMIC_ARRAY *ids,
+ Gtid_list_log_event *glev,
+ char* errbuf)
+{
+ DYNAMIC_ARRAY domain_unique; // sequece (unsorted) of unique element*:s
+ rpl_binlog_state::element* domain_unique_buffer[16];
+ ulong k, l;
+ const char* errmsg= NULL;
+
+ DBUG_ENTER("rpl_binlog_state::drop_domain");
+
+ my_init_dynamic_array2(&domain_unique,
+ sizeof(element*), domain_unique_buffer,
+ sizeof(domain_unique_buffer) / sizeof(element*), 4, 0);
+
+ mysql_mutex_lock(&LOCK_binlog_state);
+
+ /*
+ Gtid list is supposed to come from a binlog's Gtid_list event and
+ therefore should be a subset of the current binlog state. That is
+ for every domain in the list the binlog state contains a gtid with
+ sequence number not less than that of the list.
+ Exceptions of this inclusion rule are:
+ A. the list may still refer to gtids from already deleted domains.
+ Files containing them must have been purged whereas the file
+ with the list is not yet.
+ B. out of order groups were injected
+ C. manually build list of binlog files violating the inclusion
+ constraint.
+ While A is a normal case (not necessarily distinguishable from C though),
+ B and C may require the user's attention so any (incl the A's suspected)
+ inconsistency is diagnosed and *warned*.
+ */
+ for (l= 0, errbuf[0]= 0; l < glev->count; l++, errbuf[0]= 0)
+ {
+ rpl_gtid* rb_state_gtid= find_nolock(glev->list[l].domain_id,
+ glev->list[l].server_id);
+ if (!rb_state_gtid)
+ sprintf(errbuf,
+ "missing gtids from the '%u-%u' domain-server pair which is "
+ "referred to in the gtid list describing an earlier state. Ignore "
+ "if the domain ('%u') was already explicitly deleted",
+ glev->list[l].domain_id, glev->list[l].server_id,
+ glev->list[l].domain_id);
+ else if (rb_state_gtid->seq_no < glev->list[l].seq_no)
+ sprintf(errbuf,
+ "having a gtid '%u-%u-%llu' which is less than "
+ "the '%u-%u-%llu' of the gtid list describing an earlier state. "
+ "The state may have been affected by manually injecting "
+ "a lower sequence number gtid or via replication",
+ rb_state_gtid->domain_id, rb_state_gtid->server_id,
+ rb_state_gtid->seq_no, glev->list[l].domain_id,
+ glev->list[l].server_id, glev->list[l].seq_no);
+ if (strlen(errbuf)) // use strlen() as cheap flag
+ push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
+ ER_BINLOG_CANT_DELETE_GTID_DOMAIN,
+ "The current gtid binlog state is incompatible with "
+ "a former one %s.", errbuf);
+ }
+
+ /*
+ For each domain_id from ids
+ when no such domain in binlog state
+ warn && continue
+ For each domain.server's last gtid
+ when not locate the last gtid in glev.list
+ error out binlog state can't change
+ otherwise continue
+ */
+ for (ulong i= 0; i < ids->elements; i++)
+ {
+ rpl_binlog_state::element *elem= NULL;
+ ulong *ptr_domain_id;
+ bool not_match;
+
+ ptr_domain_id= (ulong*) dynamic_array_ptr(ids, i);
+ elem= (rpl_binlog_state::element *)
+ my_hash_search(&hash, (const uchar *) ptr_domain_id, 0);
+ if (!elem)
+ {
+ push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
+ ER_BINLOG_CANT_DELETE_GTID_DOMAIN,
+ "The gtid domain being deleted ('%lu') is not in "
+ "the current binlog state", *ptr_domain_id);
+ continue;
+ }
+
+ for (not_match= true, k= 0; k < elem->hash.records; k++)
+ {
+ rpl_gtid *d_gtid= (rpl_gtid *)my_hash_element(&elem->hash, k);
+ for (ulong l= 0; l < glev->count && not_match; l++)
+ not_match= !(*d_gtid == glev->list[l]);
+ }
+
+ if (not_match)
+ {
+ sprintf(errbuf, "binlog files may contain gtids from the domain ('%lu') "
+ "being deleted. Make sure to first purge those files",
+ *ptr_domain_id);
+ errmsg= errbuf;
+ goto end;
+ }
+ // compose a sequence of unique pointers to domain object
+ for (k= 0; k < domain_unique.elements; k++)
+ {
+ if ((rpl_binlog_state::element*) dynamic_array_ptr(&domain_unique, k)
+ == elem)
+ break; // domain_id's elem has been already in
+ }
+ if (k == domain_unique.elements) // proven not to have duplicates
+ insert_dynamic(&domain_unique, (uchar*) &elem);
+ }
+
+ // Domain removal from binlog state
+ for (k= 0; k < domain_unique.elements; k++)
+ {
+ rpl_binlog_state::element *elem= *(rpl_binlog_state::element**)
+ dynamic_array_ptr(&domain_unique, k);
+ my_hash_free(&elem->hash);
+ my_hash_delete(&hash, (uchar*) elem);
+ }
+
+ DBUG_ASSERT(strlen(errbuf) == 0);
+
+ if (domain_unique.elements == 0)
+ errmsg= "";
+
+end:
+ mysql_mutex_unlock(&LOCK_binlog_state);
+ delete_dynamic(&domain_unique);
+
+ DBUG_RETURN(errmsg);
+}
slave_connection_state::slave_connection_state()
{
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
index 5dfac7a3c6f..19ff0f3f977 100644
--- a/sql/rpl_gtid.h
+++ b/sql/rpl_gtid.h
@@ -34,6 +34,13 @@ struct rpl_gtid
uint64 seq_no;
};
+inline bool operator==(const rpl_gtid& lhs, const rpl_gtid& rhs)
+{
+ return
+ lhs.domain_id == rhs.domain_id &&
+ lhs.server_id == rhs.server_id &&
+ lhs.seq_no == rhs.seq_no;
+};
enum enum_gtid_skip_type {
GTID_SKIP_NOT, GTID_SKIP_STANDALONE, GTID_SKIP_TRANSACTION
@@ -93,6 +100,7 @@ struct gtid_waiting {
class Relay_log_info;
struct rpl_group_info;
+class Gtid_list_log_event;
/*
Replication slave state.
@@ -257,6 +265,7 @@ struct rpl_binlog_state
rpl_gtid *find_nolock(uint32 domain_id, uint32 server_id);
rpl_gtid *find(uint32 domain_id, uint32 server_id);
rpl_gtid *find_most_recent(uint32 domain_id);
+ const char* drop_domain(DYNAMIC_ARRAY *ids, Gtid_list_log_event *glev, char*);
};
diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt
index 63441ade8b7..01f348e86ab 100644
--- a/sql/share/errmsg-utf8.txt
+++ b/sql/share/errmsg-utf8.txt
@@ -1801,8 +1801,8 @@ ER_WRONG_AUTO_KEY 42000 S1009
spa "Puede ser solamente un campo automatico y este debe ser definido como una clave"
swe "Det får finnas endast ett AUTO_INCREMENT-fält och detta måste vara en nyckel"
ukr "Невірне визначення таблиці; Може бути лише один автоматичний стовбець, що повинен бути визначений як ключ"
-ER_UNUSED_9
- eng "You should never see it"
+ER_BINLOG_CANT_DELETE_GTID_DOMAIN
+ eng "Could not delete gtid domain. Reason: %s."
ER_NORMAL_SHUTDOWN
cze "%s (%s): normální ukončení\n"
dan "%s (%s): Normal nedlukning\n"
@@ -7330,7 +7330,7 @@ ER_SUBQUERIES_NOT_SUPPORTED 42000
eng "%s does not support subqueries or stored functions"
ER_SET_STATEMENT_NOT_SUPPORTED 42000
eng "The system variable %.200s cannot be set in SET STATEMENT."
-ER_UNUSED_17
+ER_UNUSED_9
eng "You should never see it"
ER_USER_CREATE_EXISTS
eng "Can't create user '%-.64s'@'%-.64s'; it already exists"
diff --git a/sql/sp_head.cc b/sql/sp_head.cc
index 7e7ad300e7c..4e247859d83 100644
--- a/sql/sp_head.cc
+++ b/sql/sp_head.cc
@@ -1022,6 +1022,19 @@ sp_head::execute(THD *thd, bool merge_da_on_success)
if (check_stack_overrun(thd, 7 * STACK_MIN_SIZE, (uchar*)&old_packet))
DBUG_RETURN(TRUE);
+ /*
+ Normally the counter is not reset between parsing and first execution,
+ but it is possible in case of error to have parsing on one CALL and
+ first execution (where VIEW will be parsed and added). So we store the
+ counter after parsing and restore it before execution just to avoid
+ repeating SELECT numbers.
+
+ Other problem is that it can be more SELECTs parsed in case of fixing
+ error causes previous interruption of the SP. So it is save not just
+ assign old value but add it.
+ */
+ thd->select_number+= m_select_number;
+
/* init per-instruction memroot */
init_sql_alloc(&execute_mem_root, MEM_ROOT_BLOCK_SIZE, 0, MYF(0));
@@ -1361,6 +1374,16 @@ sp_head::execute(THD *thd, bool merge_da_on_success)
m_recursion_level + 1));
m_first_instance->m_first_free_instance= this;
+ /*
+ This execution of the SP was aborted with an error (e.g. "Table not
+ found"). However it might still have consumed some numbers from the
+ thd->select_number counter. The next sp->exec() call must not use the
+ consumed numbers, so we remember the first free number (We know that
+ nobody will use it as this execution has stopped with an error).
+ */
+ if (err_status)
+ set_select_number(thd->select_number);
+
DBUG_RETURN(err_status);
}
@@ -2046,26 +2069,7 @@ sp_head::execute_procedure(THD *thd, List<Item> *args)
if (!err_status)
{
- /*
- Normally the counter is not reset between parsing and first execution,
- but it is possible in case of error to have parsing on one CALL and
- first execution (where VIEW will be parsed and added). So we store the
- counter after parsing and restore it before execution just to avoid
- repeating SELECT numbers.
- */
- thd->select_number= m_select_number;
-
err_status= execute(thd, TRUE);
- DBUG_PRINT("info", ("execute returned %d", (int) err_status));
- /*
- This execution of the SP was aborted with an error (e.g. "Table not
- found"). However it might still have consumed some numbers from the
- thd->select_number counter. The next sp->exec() call must not use the
- consumed numbers, so we remember the first free number (We know that
- nobody will use it as this execution has stopped with an error).
- */
- if (err_status)
- set_select_number(thd->select_number);
}
if (save_log_general)
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index d6ba7623cf5..7af1170d107 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -5004,17 +5004,14 @@ extern "C" int thd_non_transactional_update(const MYSQL_THD thd)
extern "C" int thd_binlog_format(const MYSQL_THD thd)
{
-#ifdef WITH_WSREP
if (WSREP(thd))
{
/* for wsrep binlog format is meaningful also when binlogging is off */
- return (int) WSREP_BINLOG_FORMAT(thd->variables.binlog_format);
+ return (int) thd->wsrep_binlog_format();
}
-#endif /* WITH_WSREP */
if (mysql_bin_log.is_open() && (thd->variables.option_bits & OPTION_BIN_LOG))
return (int) thd->variables.binlog_format;
- else
- return BINLOG_FORMAT_UNSPEC;
+ return BINLOG_FORMAT_UNSPEC;
}
extern "C" void thd_mark_transaction_to_rollback(MYSQL_THD thd, bool all)
diff --git a/sql/sql_lex.cc b/sql/sql_lex.cc
index fa65589135d..96d61655e90 100644
--- a/sql/sql_lex.cc
+++ b/sql/sql_lex.cc
@@ -828,6 +828,7 @@ void lex_end_stage2(LEX *lex)
/* Reset LEX_MASTER_INFO */
lex->mi.reset(lex->sql_command == SQLCOM_CHANGE_MASTER);
+ delete_dynamic(&lex->delete_gtid_domain);
DBUG_VOID_RETURN;
}
@@ -3034,6 +3035,10 @@ LEX::LEX()
INITIAL_LEX_PLUGIN_LIST_SIZE, 0);
reset_query_tables_list(TRUE);
mi.init();
+ init_dynamic_array2(&delete_gtid_domain, sizeof(ulong*),
+ gtid_domain_static_buffer,
+ initial_gtid_domain_buffer_size,
+ initial_gtid_domain_buffer_size, 0);
}
diff --git a/sql/sql_lex.h b/sql/sql_lex.h
index c360fe4e3b1..9b50d28e58d 100644
--- a/sql/sql_lex.h
+++ b/sql/sql_lex.h
@@ -2948,6 +2948,13 @@ public:
*/
Item *limit_rows_examined;
ulonglong limit_rows_examined_cnt;
+ /**
+ Holds a set of domain_ids for deletion at FLUSH..DELETE_DOMAIN_ID
+ */
+ DYNAMIC_ARRAY delete_gtid_domain;
+ static const ulong initial_gtid_domain_buffer_size= 16;
+ ulong gtid_domain_static_buffer[initial_gtid_domain_buffer_size];
+
inline void set_limit_rows_examined()
{
if (limit_rows_examined)
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index a613ad6c969..8b1b20f5857 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -6257,6 +6257,24 @@ finish:
THD_STAGE_INFO(thd, stage_rollback);
trans_rollback_stmt(thd);
}
+#ifdef WITH_WSREP
+ else if (thd->spcont &&
+ (thd->wsrep_conflict_state == MUST_ABORT ||
+ thd->wsrep_conflict_state == CERT_FAILURE))
+ {
+ /*
+ The error was cleared, but THD was aborted by wsrep and
+ wsrep_conflict_state is still set accordingly. This
+ situation is expected if we are running a stored procedure
+ that declares a handler that catches ER_LOCK_DEADLOCK error.
+ In which case the error may have been cleared in method
+ sp_rcontext::handle_sql_condition().
+ */
+ trans_rollback_stmt(thd);
+ thd->wsrep_conflict_state= NO_CONFLICT;
+ thd->killed= NOT_KILLED;
+ }
+#endif /* WITH_WSREP */
else
{
/* If commit fails, we should be able to reset the OK status. */
diff --git a/sql/sql_reload.cc b/sql/sql_reload.cc
index c01ad90f5d2..79968c029ea 100644
--- a/sql/sql_reload.cc
+++ b/sql/sql_reload.cc
@@ -153,7 +153,10 @@ bool reload_acl_and_cache(THD *thd, unsigned long long options,
tmp_write_to_binlog= 0;
if (mysql_bin_log.is_open())
{
- if (mysql_bin_log.rotate_and_purge(true))
+ DYNAMIC_ARRAY *drop_gtid_domain=
+ (thd && (thd->lex->delete_gtid_domain.elements > 0)) ?
+ &thd->lex->delete_gtid_domain : NULL;
+ if (mysql_bin_log.rotate_and_purge(true, drop_gtid_domain))
*write_to_binlog= -1;
if (WSREP_ON)
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index c7663e26750..7586e8837d0 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -30,7 +30,7 @@
#include <my_dir.h>
#include "rpl_handler.h"
#include "debug_sync.h"
-
+#include "log.h" // get_gtid_list_event
enum enum_gtid_until_state {
GTID_UNTIL_NOT_DONE,
@@ -875,72 +875,6 @@ get_binlog_list(MEM_ROOT *memroot)
DBUG_RETURN(current_list);
}
-/*
- Find the Gtid_list_log_event at the start of a binlog.
-
- NULL for ok, non-NULL error message for error.
-
- If ok, then the event is returned in *out_gtid_list. This can be NULL if we
- get back to binlogs written by old server version without GTID support. If
- so, it means we have reached the point to start from, as no GTID events can
- exist in earlier binlogs.
-*/
-static const char *
-get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list)
-{
- Format_description_log_event init_fdle(BINLOG_VERSION);
- Format_description_log_event *fdle;
- Log_event *ev;
- const char *errormsg = NULL;
-
- *out_gtid_list= NULL;
-
- if (!(ev= Log_event::read_log_event(cache, 0, &init_fdle,
- opt_master_verify_checksum)) ||
- ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
- {
- if (ev)
- delete ev;
- return "Could not read format description log event while looking for "
- "GTID position in binlog";
- }
-
- fdle= static_cast<Format_description_log_event *>(ev);
-
- for (;;)
- {
- Log_event_type typ;
-
- ev= Log_event::read_log_event(cache, 0, fdle, opt_master_verify_checksum);
- if (!ev)
- {
- errormsg= "Could not read GTID list event while looking for GTID "
- "position in binlog";
- break;
- }
- typ= ev->get_type_code();
- if (typ == GTID_LIST_EVENT)
- break; /* Done, found it */
- if (typ == START_ENCRYPTION_EVENT)
- {
- if (fdle->start_decryption((Start_encryption_log_event*) ev))
- errormsg= "Could not set up decryption for binlog.";
- }
- delete ev;
- if (typ == ROTATE_EVENT || typ == STOP_EVENT ||
- typ == FORMAT_DESCRIPTION_EVENT || typ == START_ENCRYPTION_EVENT)
- continue; /* Continue looking */
-
- /* We did not find any Gtid_list_log_event, must be old binlog. */
- ev= NULL;
- break;
- }
-
- delete fdle;
- *out_gtid_list= static_cast<Gtid_list_log_event *>(ev);
- return errormsg;
-}
-
/*
Check if every GTID requested by the slave is contained in this (or a later)
diff --git a/sql/sql_repl.h b/sql/sql_repl.h
index 4105bdddf4e..8d9a127bca7 100644
--- a/sql/sql_repl.h
+++ b/sql/sql_repl.h
@@ -81,7 +81,6 @@ int rpl_append_gtid_state(String *dest, bool use_binlog);
int rpl_load_gtid_state(slave_connection_state *state, bool use_binlog);
bool rpl_gtid_pos_check(THD *thd, char *str, size_t len);
bool rpl_gtid_pos_update(THD *thd, char *str, size_t len);
-
#else
struct LOAD_FILE_IO_CACHE : public IO_CACHE { };
diff --git a/sql/sql_show.cc b/sql/sql_show.cc
index aeb9402face..30cf490c10c 100644
--- a/sql/sql_show.cc
+++ b/sql/sql_show.cc
@@ -5190,12 +5190,13 @@ static int get_schema_tables_record(THD *thd, TABLE_LIST *tables,
if (share->tmp_table == SYSTEM_TMP_TABLE)
table->field[3]->store(STRING_WITH_LEN("SYSTEM VIEW"), cs);
- else if (share->tmp_table)
- table->field[3]->store(STRING_WITH_LEN("LOCAL TEMPORARY"), cs);
else if (share->table_type == TABLE_TYPE_SEQUENCE)
table->field[3]->store(STRING_WITH_LEN("SEQUENCE"), cs);
else
+ {
+ DBUG_ASSERT(share->tmp_table == NO_TMP_TABLE);
table->field[3]->store(STRING_WITH_LEN("BASE TABLE"), cs);
+ }
for (int i= 4; i < 20; i++)
{
diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy
index 6f94afae3de..49a1afbdee6 100644
--- a/sql/sql_yacc.yy
+++ b/sql/sql_yacc.yy
@@ -1027,6 +1027,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize);
%token DEFINER_SYM
%token DELAYED_SYM
%token DELAY_KEY_WRITE_SYM
+%token DELETE_DOMAIN_ID_SYM
%token DELETE_SYM /* SQL-2003-R */
%token DENSE_RANK_SYM
%token DESC /* SQL-2003-N */
@@ -1908,6 +1909,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize);
vcol_opt_attribute_list vcol_attribute
opt_serial_attribute opt_serial_attribute_list serial_attribute
explainable_command
+ opt_delete_gtid_domain
END_OF_INPUT
%type <NONE> call sp_proc_stmts sp_proc_stmts1 sp_proc_stmt
@@ -13567,7 +13569,7 @@ flush_option:
{ Lex->type|= REFRESH_GENERAL_LOG; }
| SLOW LOGS_SYM
{ Lex->type|= REFRESH_SLOW_LOG; }
- | BINARY LOGS_SYM
+ | BINARY LOGS_SYM opt_delete_gtid_domain
{ Lex->type|= REFRESH_BINARY_LOG; }
| RELAY LOGS_SYM optional_connection_name
{
@@ -13624,6 +13626,24 @@ opt_table_list:
| table_list {}
;
+opt_delete_gtid_domain:
+ /* empty */ {}
+ | DELETE_DOMAIN_ID_SYM '=' '(' delete_domain_id_list ')'
+ {}
+ ;
+delete_domain_id_list:
+ /* Empty */
+ | delete_domain_id
+ | delete_domain_id_list ',' delete_domain_id
+ ;
+
+delete_domain_id:
+ ulong_num
+ {
+ insert_dynamic(&Lex->delete_gtid_domain, (uchar*) &($1));
+ }
+ ;
+
optional_flush_tables_arguments:
/* empty */ {$$= 0;}
| AND_SYM DISABLE_SYM CHECKPOINT_SYM {$$= REFRESH_CHECKPOINT; }
diff --git a/sql/sql_yacc_ora.yy b/sql/sql_yacc_ora.yy
index 9382ac60709..961624b2ae1 100644
--- a/sql/sql_yacc_ora.yy
+++ b/sql/sql_yacc_ora.yy
@@ -436,6 +436,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize);
%token DEFINER_SYM
%token DELAYED_SYM
%token DELAY_KEY_WRITE_SYM
+%token DELETE_DOMAIN_ID_SYM
%token DELETE_SYM /* SQL-2003-R */
%token DENSE_RANK_SYM
%token DESC /* SQL-2003-N */
diff --git a/sql/threadpool_generic.cc b/sql/threadpool_generic.cc
index e90f87fffc4..f6fdd97c6df 100644
--- a/sql/threadpool_generic.cc
+++ b/sql/threadpool_generic.cc
@@ -28,11 +28,19 @@
#endif
#ifdef HAVE_IOCP
-#define OPTIONAL_IO_POLL_READ_PARAM &overlapped
+#define OPTIONAL_IO_POLL_READ_PARAM this
#else
#define OPTIONAL_IO_POLL_READ_PARAM 0
#endif
+#ifdef _WIN32
+typedef HANDLE TP_file_handle;
+#else
+typedef int TP_file_handle;
+#define INVALID_HANDLE_VALUE -1
+#endif
+
+
#include <sql_connect.h>
#include <mysqld.h>
#include <debug_sync.h>
@@ -59,10 +67,10 @@ typedef OVERLAPPED_ENTRY native_event;
#pragma warning (disable : 4312)
#endif
-static void io_poll_close(int fd)
+static void io_poll_close(TP_file_handle fd)
{
#ifdef _WIN32
- CloseHandle((HANDLE)fd);
+ CloseHandle(fd);
#else
close(fd);
#endif
@@ -151,14 +159,17 @@ struct TP_connection_generic:public TP_connection
TP_connection_generic **prev_in_queue;
ulonglong abs_wait_timeout;
ulonglong dequeue_time;
+ TP_file_handle fd;
bool bound_to_poll_descriptor;
int waiting;
#ifdef HAVE_IOCP
OVERLAPPED overlapped;
#endif
+#ifdef _WIN32
+ enum_vio_type vio_type;
+#endif
};
-typedef TP_connection_generic TP_connection_generic;
typedef I_P_List<TP_connection_generic,
I_P_List_adapter<TP_connection_generic,
@@ -177,7 +188,7 @@ struct thread_group_t
worker_list_t waiting_threads;
worker_thread_t *listener;
pthread_attr_t *pthread_attr;
- int pollfd;
+ TP_file_handle pollfd;
int thread_count;
int active_thread_count;
int connection_count;
@@ -245,11 +256,11 @@ static void print_pool_blocked_message(bool);
Creates an io_poll descriptor
On Linux: epoll_create()
- - io_poll_associate_fd(int poll_fd, int fd, void *data, void *opt)
+ - io_poll_associate_fd(int poll_fd, TP_file_handle fd, void *data, void *opt)
Associate file descriptor with io poll descriptor
On Linux : epoll_ctl(..EPOLL_CTL_ADD))
- - io_poll_disassociate_fd(int pollfd, int fd)
+ - io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
Associate file descriptor with io poll descriptor
On Linux: epoll_ctl(..EPOLL_CTL_DEL)
@@ -259,7 +270,7 @@ static void print_pool_blocked_message(bool);
io_poll_associate_fd() was called.
On Linux : epoll_ctl(..EPOLL_CTL_MOD)
- - io_poll_wait (int pollfd, native_event *native_events, int maxevents,
+ - io_poll_wait (TP_file_handle pollfd, native_event *native_events, int maxevents,
int timeout_ms)
wait until one or more descriptors added with io_poll_associate_fd()
@@ -276,13 +287,13 @@ static void print_pool_blocked_message(bool);
/* Early 2.6 kernel did not have EPOLLRDHUP */
#define EPOLLRDHUP 0
#endif
-static int io_poll_create()
+static TP_file_handle io_poll_create()
{
return epoll_create(1);
}
-int io_poll_associate_fd(int pollfd, int fd, void *data, void*)
+int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void*)
{
struct epoll_event ev;
ev.data.u64= 0; /* Keep valgrind happy */
@@ -293,7 +304,7 @@ int io_poll_associate_fd(int pollfd, int fd, void *data, void*)
-int io_poll_start_read(int pollfd, int fd, void *data, void *)
+int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data, void *)
{
struct epoll_event ev;
ev.data.u64= 0; /* Keep valgrind happy */
@@ -302,7 +313,7 @@ int io_poll_start_read(int pollfd, int fd, void *data, void *)
return epoll_ctl(pollfd, EPOLL_CTL_MOD, fd, &ev);
}
-int io_poll_disassociate_fd(int pollfd, int fd)
+int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
{
struct epoll_event ev;
return epoll_ctl(pollfd, EPOLL_CTL_DEL, fd, &ev);
@@ -314,7 +325,7 @@ int io_poll_disassociate_fd(int pollfd, int fd)
NOTE - in case of EINTR, it restarts with original timeout. Since we use
either infinite or 0 timeouts, this is not critical
*/
-int io_poll_wait(int pollfd, native_event *native_events, int maxevents,
+int io_poll_wait(TP_file_handle pollfd, native_event *native_events, int maxevents,
int timeout_ms)
{
int ret;
@@ -347,12 +358,12 @@ static void *native_event_get_userdata(native_event *event)
#endif
-int io_poll_create()
+TP_file_handle io_poll_create()
{
return kqueue();
}
-int io_poll_start_read(int pollfd, int fd, void *data,void *)
+int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data,void *)
{
struct kevent ke;
MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT,
@@ -361,7 +372,7 @@ int io_poll_start_read(int pollfd, int fd, void *data,void *)
}
-int io_poll_associate_fd(int pollfd, int fd, void *data,void *)
+int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data,void *)
{
struct kevent ke;
MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT,
@@ -370,7 +381,7 @@ int io_poll_associate_fd(int pollfd, int fd, void *data,void *)
}
-int io_poll_disassociate_fd(int pollfd, int fd)
+int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
{
struct kevent ke;
MY_EV_SET(&ke,fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
@@ -378,7 +389,7 @@ int io_poll_disassociate_fd(int pollfd, int fd)
}
-int io_poll_wait(int pollfd, struct kevent *events, int maxevents, int timeout_ms)
+int io_poll_wait(TP_file_handle pollfd, struct kevent *events, int maxevents, int timeout_ms)
{
struct timespec ts;
int ret;
@@ -403,27 +414,27 @@ static void* native_event_get_userdata(native_event *event)
#elif defined (__sun)
-static int io_poll_create()
+static TP_file_handle io_poll_create()
{
return port_create();
}
-int io_poll_start_read(int pollfd, int fd, void *data, void *)
+int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data, void *)
{
return port_associate(pollfd, PORT_SOURCE_FD, fd, POLLIN, data);
}
-static int io_poll_associate_fd(int pollfd, int fd, void *data, void *)
+static int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void *)
{
return io_poll_start_read(pollfd, fd, data, 0);
}
-int io_poll_disassociate_fd(int pollfd, int fd)
+int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
{
return port_dissociate(pollfd, PORT_SOURCE_FD, fd);
}
-int io_poll_wait(int pollfd, native_event *events, int maxevents, int timeout_ms)
+int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents, int timeout_ms)
{
struct timespec ts;
int ret;
@@ -451,25 +462,32 @@ static void* native_event_get_userdata(native_event *event)
#elif defined(HAVE_IOCP)
-static int io_poll_create()
+static TP_file_handle io_poll_create()
{
- HANDLE h= CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
- return PtrToInt(h);
+ return CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
}
-int io_poll_start_read(int pollfd, int fd, void *, void *opt)
+int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *, void *opt)
{
- DWORD num_bytes = 0;
static char c;
+ TP_connection_generic *con= (TP_connection_generic *)opt;
+ OVERLAPPED *overlapped= &con->overlapped;
+ if (con->vio_type == VIO_TYPE_NAMEDPIPE)
+ {
+ if (ReadFile(fd, &c, 0, NULL, overlapped))
+ return 0;
+ }
+ else
+ {
+ WSABUF buf;
+ buf.buf= &c;
+ buf.len= 0;
+ DWORD flags=0;
- WSABUF buf;
- buf.buf= &c;
- buf.len= 0;
- DWORD flags=0;
-
- if (WSARecv((SOCKET)fd, &buf, 1, &num_bytes, &flags, (OVERLAPPED *)opt, NULL) == 0)
- return 0;
+ if (WSARecv((SOCKET)fd, &buf, 1,NULL, &flags,overlapped, NULL) == 0)
+ return 0;
+ }
if (GetLastError() == ERROR_IO_PENDING)
return 0;
@@ -478,26 +496,26 @@ int io_poll_start_read(int pollfd, int fd, void *, void *opt)
}
-static int io_poll_associate_fd(int pollfd, int fd, void *data, void *opt)
+static int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void *opt)
{
- HANDLE h= CreateIoCompletionPort(IntToPtr(fd), IntToPtr(pollfd), (ULONG_PTR)data, 0);
+ HANDLE h= CreateIoCompletionPort(fd, pollfd, (ULONG_PTR)data, 0);
if (!h)
return -1;
return io_poll_start_read(pollfd,fd, 0, opt);
}
-int io_poll_disassociate_fd(int pollfd, int fd)
+int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
{
/* Not possible to unbind/rebind file descriptor in IOCP. */
return 0;
}
-int io_poll_wait(int pollfd, native_event *events, int maxevents, int timeout_ms)
+int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents, int timeout_ms)
{
ULONG n;
- BOOL ok = GetQueuedCompletionStatusEx((HANDLE)pollfd, events,
+ BOOL ok = GetQueuedCompletionStatusEx(pollfd, events,
maxevents, &n, timeout_ms, FALSE);
return ok ? (int)n : -1;
@@ -1038,7 +1056,7 @@ int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr)
DBUG_ENTER("thread_group_init");
thread_group->pthread_attr = thread_attr;
mysql_mutex_init(key_group_mutex, &thread_group->mutex, NULL);
- thread_group->pollfd= -1;
+ thread_group->pollfd= INVALID_HANDLE_VALUE;
thread_group->shutdown_pipe[0]= -1;
thread_group->shutdown_pipe[1]= -1;
queue_init(thread_group);
@@ -1049,10 +1067,10 @@ int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr)
void thread_group_destroy(thread_group_t *thread_group)
{
mysql_mutex_destroy(&thread_group->mutex);
- if (thread_group->pollfd != -1)
+ if (thread_group->pollfd != INVALID_HANDLE_VALUE)
{
io_poll_close(thread_group->pollfd);
- thread_group->pollfd= -1;
+ thread_group->pollfd= INVALID_HANDLE_VALUE;
}
#ifndef HAVE_IOCP
for(int i=0; i < 2; i++)
@@ -1109,7 +1127,7 @@ static int wake_listener(thread_group_t *thread_group)
if (write(thread_group->shutdown_pipe[1], &c, 1) < 0)
return -1;
#else
- PostQueuedCompletionStatus((HANDLE)thread_group->pollfd, 0, 0, 0);
+ PostQueuedCompletionStatus(thread_group->pollfd, 0, 0, 0);
#endif
return 0;
}
@@ -1432,6 +1450,16 @@ TP_connection_generic::TP_connection_generic(CONNECT *c):
, overlapped()
#endif
{
+ DBUG_ASSERT(c->vio);
+
+#ifdef _WIN32
+ vio_type= c->vio->type;
+ fd= (vio_type == VIO_TYPE_NAMEDPIPE) ?
+ c->vio->hPipe: (TP_file_handle)mysql_socket_getfd(c->vio->mysql_socket);
+#else
+ fd= mysql_socket_getfd(c->vio->mysql_socket);
+#endif
+
/* Assign connection to a group. */
thread_group_t *group=
&all_groups[c->thread_id%group_count];
@@ -1486,7 +1514,6 @@ static int change_group(TP_connection_generic *c,
thread_group_t *new_group)
{
int ret= 0;
- int fd= (int)mysql_socket_getfd(c->thd->net.vio->mysql_socket);
DBUG_ASSERT(c->thread_group == old_group);
@@ -1494,7 +1521,7 @@ static int change_group(TP_connection_generic *c,
mysql_mutex_lock(&old_group->mutex);
if (c->bound_to_poll_descriptor)
{
- io_poll_disassociate_fd(old_group->pollfd,fd);
+ io_poll_disassociate_fd(old_group->pollfd,c->fd);
c->bound_to_poll_descriptor= false;
}
c->thread_group->connection_count--;
@@ -1513,9 +1540,7 @@ static int change_group(TP_connection_generic *c,
int TP_connection_generic::start_io()
-{
- int fd= (int)mysql_socket_getfd(thd->net.vio->mysql_socket);
-
+{
#ifndef HAVE_IOCP
/*
Usually, connection will stay in the same group for the entire
@@ -1666,10 +1691,10 @@ int TP_pool_generic::set_pool_size(uint size)
{
thread_group_t *group= &all_groups[i];
mysql_mutex_lock(&group->mutex);
- if (group->pollfd == -1)
+ if (group->pollfd == INVALID_HANDLE_VALUE)
{
group->pollfd= io_poll_create();
- success= (group->pollfd >= 0);
+ success= (group->pollfd != INVALID_HANDLE_VALUE);
if(!success)
{
sql_print_error("io_poll_create() failed, errno=%d\n", errno);
@@ -1707,7 +1732,7 @@ int TP_pool_generic::set_stall_limit(uint limit)
int TP_pool_generic::get_idle_thread_count()
{
int sum=0;
- for (uint i= 0; i < threadpool_max_size && all_groups[i].pollfd >= 0; i++)
+ for (uint i= 0; i < threadpool_max_size && all_groups[i].pollfd != INVALID_HANDLE_VALUE; i++)
{
sum+= (all_groups[i].thread_count - all_groups[i].active_thread_count);
}
diff --git a/sql/wsrep_hton.cc b/sql/wsrep_hton.cc
index 32544a826e3..42eb92244ff 100644
--- a/sql/wsrep_hton.cc
+++ b/sql/wsrep_hton.cc
@@ -508,6 +508,9 @@ wsrep_run_wsrep_commit(THD *thd, bool all)
}
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+
+ DEBUG_SYNC(thd, "wsrep_after_replication");
+
switch(rcode) {
case 0:
/*
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
index 44f10559fd7..7800ec5e627 100644
--- a/sql/wsrep_mysqld.cc
+++ b/sql/wsrep_mysqld.cc
@@ -2022,7 +2022,7 @@ static bool abort_replicated(THD *thd)
bool ret_code= false;
if (thd->wsrep_query_state== QUERY_COMMITTING)
{
- WSREP_DEBUG("aborting replicated trx: %lu", thd->real_id);
+ WSREP_DEBUG("aborting replicated trx: %llu", (ulonglong)(thd->real_id));
(void)wsrep_abort_thd(thd, thd, TRUE);
ret_code= true;
diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc
index 8107ab12c6b..188fa3e292b 100644
--- a/sql/wsrep_var.cc
+++ b/sql/wsrep_var.cc
@@ -327,8 +327,9 @@ bool wsrep_provider_update (sys_var *self, THD* thd, enum_var_type type)
if (wsrep_inited == 1)
wsrep_deinit(false);
- char* tmp= strdup(wsrep_provider); // wsrep_init() rewrites provider
+ char* tmp= strdup(wsrep_provider); // wsrep_init() rewrites provider
//when fails
+
if (wsrep_init())
{
my_error(ER_CANT_OPEN_LIBRARY, MYF(0), tmp, my_error, "wsrep_init failed");