summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorSergei Golubchik <serg@mariadb.org>2018-01-17 00:45:02 +0100
committerSergei Golubchik <serg@mariadb.org>2018-01-17 00:45:02 +0100
commit8f102b584d4e8f02da924f9be094014136eb453f (patch)
tree80e98303b2960d477e1e2223b0a986f5fd6edf0a /sql
parent715a507e3368451b824f211dea34a55c5d4dac1d (diff)
parentd87531a6a053fdf8bc828857d9cdc11a97026ad6 (diff)
downloadmariadb-git-8f102b584d4e8f02da924f9be094014136eb453f.tar.gz
Merge branch 'github/10.3' into bb-10.3-temporalmariadb-10.3.4
Diffstat (limited to 'sql')
-rw-r--r--sql/field.cc2
-rw-r--r--sql/item_func.cc2
-rw-r--r--sql/item_sum.cc4
-rw-r--r--sql/key.cc20
-rw-r--r--sql/log.cc2
-rw-r--r--sql/log_event.cc6
-rw-r--r--sql/mdl.h5
-rw-r--r--sql/mysqld.cc7
-rw-r--r--sql/opt_subselect.cc64
-rw-r--r--sql/rpl_gtid.cc2
-rw-r--r--sql/rpl_mi.h21
-rw-r--r--sql/set_var.cc2
-rw-r--r--sql/slave.cc200
-rw-r--r--sql/sql_class.h2
-rw-r--r--sql/sql_cte.cc7
-rw-r--r--sql/sql_parse.cc22
-rw-r--r--sql/sql_prepare.cc11
-rw-r--r--sql/sql_select.cc6
-rw-r--r--sql/sql_select.h4
-rw-r--r--sql/sql_union.cc1
20 files changed, 304 insertions, 86 deletions
diff --git a/sql/field.cc b/sql/field.cc
index 642ad6a65e9..8de823cee7c 100644
--- a/sql/field.cc
+++ b/sql/field.cc
@@ -8533,6 +8533,7 @@ uchar *Field_blob::pack(uchar *to, const uchar *from, uint max_length)
@return New pointer into memory based on from + length of the data
*/
+
const uchar *Field_blob::unpack(uchar *to, const uchar *from,
const uchar *from_end, uint param_data)
{
@@ -8548,7 +8549,6 @@ const uchar *Field_blob::unpack(uchar *to, const uchar *from,
if (from + master_packlength + length > from_end)
DBUG_RETURN(0);
set_ptr(length, const_cast<uchar*> (from) + master_packlength);
- DBUG_DUMP("record", to, table->s->reclength);
DBUG_RETURN(from + master_packlength + length);
}
diff --git a/sql/item_func.cc b/sql/item_func.cc
index ac7ed75e7f9..93ddddfde61 100644
--- a/sql/item_func.cc
+++ b/sql/item_func.cc
@@ -2724,7 +2724,7 @@ bool Item_func_min_max::get_date_native(MYSQL_TIME *ltime, ulonglong fuzzy_date)
ltime->hour+= (ltime->month * 32 + ltime->day) * 24;
ltime->year= ltime->month= ltime->day= 0;
if (adjust_time_range_with_warn(ltime,
- std::min<uint>(decimals, TIME_SECOND_PART_DIGITS)))
+ MY_MIN(decimals, TIME_SECOND_PART_DIGITS)))
return (null_value= true);
}
diff --git a/sql/item_sum.cc b/sql/item_sum.cc
index 945224c2623..be9aefa7338 100644
--- a/sql/item_sum.cc
+++ b/sql/item_sum.cc
@@ -2531,7 +2531,7 @@ bool Item_sum_bit::remove_as_window(ulonglong value)
}
// Prevent overflow;
- num_values_added = std::min(num_values_added, num_values_added - 1);
+ num_values_added = MY_MIN(num_values_added, num_values_added - 1);
set_bits_from_counters();
return 0;
}
@@ -2544,7 +2544,7 @@ bool Item_sum_bit::add_as_window(ulonglong value)
bit_counters[i]+= (value & (1ULL << i)) ? 1 : 0;
}
// Prevent overflow;
- num_values_added = std::max(num_values_added, num_values_added + 1);
+ num_values_added = MY_MAX(num_values_added, num_values_added + 1);
set_bits_from_counters();
return 0;
}
diff --git a/sql/key.cc b/sql/key.cc
index 3ee083e560f..ab93e8a0437 100644
--- a/sql/key.cc
+++ b/sql/key.cc
@@ -1,4 +1,5 @@
/* Copyright (c) 2000, 2010, Oracle and/or its affiliates. All rights reserved.
+ Copyright (c) 2018, MariaDB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -21,9 +22,6 @@
#include "key.h" // key_rec_cmp
#include "field.h" // Field
-using std::min;
-using std::max;
-
/*
Search after a key that starts with 'field'
@@ -135,7 +133,7 @@ void key_copy(uchar *to_key, const uchar *from_record, KEY *key_info,
Don't copy data for null values
The -1 below is to subtract the null byte which is already handled
*/
- length= min<uint>(key_length, key_part->store_length-1);
+ length= MY_MIN(key_length, uint(key_part->store_length)-1);
if (with_zerofill)
bzero((char*) to_key, length);
continue;
@@ -145,7 +143,7 @@ void key_copy(uchar *to_key, const uchar *from_record, KEY *key_info,
key_part->key_part_flag & HA_VAR_LENGTH_PART)
{
key_length-= HA_KEY_BLOB_LENGTH;
- length= min<uint>(key_length, key_part->length);
+ length= MY_MIN(key_length, key_part->length);
uint bytes= key_part->field->get_key_image(to_key, length, Field::itRAW);
if (with_zerofill && bytes < length)
bzero((char*) to_key + bytes, length - bytes);
@@ -153,7 +151,7 @@ void key_copy(uchar *to_key, const uchar *from_record, KEY *key_info,
}
else
{
- length= min<uint>(key_length, key_part->length);
+ length= MY_MIN(key_length, key_part->length);
Field *field= key_part->field;
CHARSET_INFO *cs= field->charset();
uint bytes= field->get_key_image(to_key, length, Field::itRAW);
@@ -205,7 +203,7 @@ void key_restore(uchar *to_record, const uchar *from_key, KEY *key_info,
Don't copy data for null bytes
The -1 below is to subtract the null byte which is already handled
*/
- length= min<uint>(key_length, key_part->store_length-1);
+ length= MY_MIN(key_length, uint(key_part->store_length)-1);
continue;
}
}
@@ -247,7 +245,7 @@ void key_restore(uchar *to_record, const uchar *from_key, KEY *key_info,
my_ptrdiff_t ptrdiff= to_record - field->table->record[0];
field->move_field_offset(ptrdiff);
key_length-= HA_KEY_BLOB_LENGTH;
- length= min<uint>(key_length, key_part->length);
+ length= MY_MIN(key_length, key_part->length);
old_map= dbug_tmp_use_all_columns(field->table, field->table->write_set);
field->set_key_image(from_key, length);
dbug_tmp_restore_column_map(field->table->write_set, old_map);
@@ -256,7 +254,7 @@ void key_restore(uchar *to_record, const uchar *from_key, KEY *key_info,
}
else
{
- length= min<uint>(key_length, key_part->length);
+ length= MY_MIN(key_length, key_part->length);
/* skip the byte with 'uneven' bits, if used */
memcpy(to_record + key_part->offset, from_key + used_uneven_bits
, (size_t) length - used_uneven_bits);
@@ -314,7 +312,7 @@ bool key_cmp_if_same(TABLE *table,const uchar *key,uint idx,uint key_length)
return 1;
continue;
}
- length= min((uint) (key_end-key), store_length);
+ length= MY_MIN((uint) (key_end-key), store_length);
if (!(key_part->key_type & (FIELDFLAG_NUMBER+FIELDFLAG_BINARY+
FIELDFLAG_PACK)))
{
@@ -392,7 +390,7 @@ void field_unpack(String *to, Field *field, const uchar *rec, uint max_length,
tmp.length(charpos);
}
if (max_length < field->pack_length())
- tmp.length(min(tmp.length(),max_length));
+ tmp.length(MY_MIN(tmp.length(),max_length));
ErrConvString err(&tmp);
to->append(err.ptr());
}
diff --git a/sql/log.cc b/sql/log.cc
index 6923a6241cd..742b0a03e90 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -7108,7 +7108,7 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
int4store(ev + EVENT_LEN_OFFSET, ev_len + writer.checksum_len);
writer.remains= ev_len;
- if (writer.write(ev, std::min<uint>(ev_len, length - hdr_offs)))
+ if (writer.write(ev, MY_MIN(ev_len, length - hdr_offs)))
DBUG_RETURN(ER_ERROR_ON_WRITE);
/* next event header at ... */
diff --git a/sql/log_event.cc b/sql/log_event.cc
index ca659085228..c40e3be94c8 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -56,8 +56,6 @@
#define my_b_write_string(A, B) my_b_write((A), (uchar*)(B), (uint) (sizeof(B) - 1))
-using std::max;
-
/**
BINLOG_CHECKSUM variable.
*/
@@ -1799,8 +1797,8 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
if (data_len < LOG_EVENT_MINIMAL_HEADER_LEN)
DBUG_RETURN(LOG_READ_BOGUS);
- if (data_len > max(max_allowed_packet,
- opt_binlog_rows_event_max_size + MAX_LOG_EVENT_HEADER))
+ if (data_len > MY_MAX(max_allowed_packet,
+ opt_binlog_rows_event_max_size + MAX_LOG_EVENT_HEADER))
DBUG_RETURN(LOG_READ_TOO_LARGE);
if (likely(data_len > LOG_EVENT_MINIMAL_HEADER_LEN))
diff --git a/sql/mdl.h b/sql/mdl.h
index f30c976ac94..be9cc806ec2 100644
--- a/sql/mdl.h
+++ b/sql/mdl.h
@@ -21,8 +21,6 @@
#include <mysql_com.h>
#include <lf.h>
-#include <algorithm>
-
class THD;
class MDL_context;
@@ -373,8 +371,7 @@ public:
character set is utf-8, we can safely assume that no
character starts with a zero byte.
*/
- using std::min;
- return memcmp(m_ptr, rhs->m_ptr, min(m_length, rhs->m_length));
+ return memcmp(m_ptr, rhs->m_ptr, MY_MIN(m_length, rhs->m_length));
}
MDL_key(const MDL_key *rhs)
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 067b2e70ba6..29b12934550 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -2928,16 +2928,17 @@ void unlink_thd(THD *thd)
DBUG_ENTER("unlink_thd");
DBUG_PRINT("enter", ("thd: %p", thd));
+ thd->cleanup();
+ thd->add_status_to_global();
+ unlink_not_visible_thd(thd);
+
/*
Do not decrement when its wsrep system thread. wsrep_applier is set for
applier as well as rollbacker threads.
*/
if (IF_WSREP(!thd->wsrep_applier, 1))
dec_connection_count(thd->scheduler);
- thd->cleanup();
- thd->add_status_to_global();
- unlink_not_visible_thd(thd);
thd->free_connection();
DBUG_VOID_RETURN;
diff --git a/sql/opt_subselect.cc b/sql/opt_subselect.cc
index dada23508b5..9d6b67e845e 100644
--- a/sql/opt_subselect.cc
+++ b/sql/opt_subselect.cc
@@ -2662,7 +2662,8 @@ void advance_sj_state(JOIN *join, table_map remaining_tables, uint idx,
LooseScan detector in best_access_path)
*/
remaining_tables &= ~new_join_tab->table->map;
- table_map dups_producing_tables;
+ table_map dups_producing_tables, prev_dups_producing_tables,
+ prev_sjm_lookup_tables;
if (idx == join->const_tables)
dups_producing_tables= 0;
@@ -2673,7 +2674,7 @@ void advance_sj_state(JOIN *join, table_map remaining_tables, uint idx,
if ((emb_sj_nest= new_join_tab->emb_sj_nest))
dups_producing_tables |= emb_sj_nest->sj_inner_tables;
- Semi_join_strategy_picker **strategy;
+ Semi_join_strategy_picker **strategy, **prev_strategy;
if (idx == join->const_tables)
{
/* First table, initialize pickers */
@@ -2725,23 +2726,54 @@ void advance_sj_state(JOIN *join, table_map remaining_tables, uint idx,
3. We have no clue what to do about fanount of semi-join Y.
*/
if ((dups_producing_tables & handled_fanout) ||
- (read_time < *current_read_time &&
+ (read_time < *current_read_time &&
!(handled_fanout & pos->inner_tables_handled_with_other_sjs)))
{
- /* Mark strategy as used */
- (*strategy)->mark_used();
- pos->sj_strategy= sj_strategy;
- if (sj_strategy == SJ_OPT_MATERIALIZE)
- join->sjm_lookup_tables |= handled_fanout;
+ DBUG_ASSERT(pos->sj_strategy != sj_strategy);
+ /*
+ If the strategy choosen first time or
+ the strategy replace strategy which was used to exectly the same
+ tables
+ */
+ if (pos->sj_strategy == SJ_OPT_NONE ||
+ handled_fanout ==
+ (prev_dups_producing_tables ^ dups_producing_tables))
+ {
+ prev_strategy= strategy;
+ if (pos->sj_strategy == SJ_OPT_NONE)
+ {
+ prev_dups_producing_tables= dups_producing_tables;
+ prev_sjm_lookup_tables= join->sjm_lookup_tables;
+ }
+ /* Mark strategy as used */
+ (*strategy)->mark_used();
+ pos->sj_strategy= sj_strategy;
+ if (sj_strategy == SJ_OPT_MATERIALIZE)
+ join->sjm_lookup_tables |= handled_fanout;
+ else
+ join->sjm_lookup_tables &= ~handled_fanout;
+ *current_read_time= read_time;
+ *current_record_count= rec_count;
+ dups_producing_tables &= ~handled_fanout;
+ //TODO: update bitmap of semi-joins that were handled together with
+ // others.
+ if (is_multiple_semi_joins(join, join->positions, idx,
+ handled_fanout))
+ pos->inner_tables_handled_with_other_sjs |= handled_fanout;
+ }
else
- join->sjm_lookup_tables &= ~handled_fanout;
- *current_read_time= read_time;
- *current_record_count= rec_count;
- dups_producing_tables &= ~handled_fanout;
- //TODO: update bitmap of semi-joins that were handled together with
- // others.
- if (is_multiple_semi_joins(join, join->positions, idx, handled_fanout))
- pos->inner_tables_handled_with_other_sjs |= handled_fanout;
+ {
+ /* Conflict fall to most general variant */
+ (*prev_strategy)->set_empty();
+ dups_producing_tables= prev_dups_producing_tables;
+ join->sjm_lookup_tables= prev_sjm_lookup_tables;
+ // mark it 'none' to avpoid loops
+ pos->sj_strategy= SJ_OPT_NONE;
+ // next skip to last;
+ strategy= pickers +
+ (sizeof(pickers)/sizeof(Semi_join_strategy_picker*) - 3);
+ continue;
+ }
}
else
{
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index 9f48f908102..78e6165a551 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -582,7 +582,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
bool table_opened= false;
TABLE *table;
list_element *delete_list= 0, *next, *cur, **next_ptr_ptr, **best_ptr_ptr;
- uint64_t best_sub_id;
+ uint64 best_sub_id;
element *elem;
ulonglong thd_saved_option= thd->variables.option_bits;
Query_tables_list lex_backup;
diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h
index 14d74dc4bb7..260c35e8c04 100644
--- a/sql/rpl_mi.h
+++ b/sql/rpl_mi.h
@@ -133,6 +133,19 @@ public:
extern TYPELIB slave_parallel_mode_typelib;
+typedef struct st_rows_event_tracker
+{
+ char binlog_file_name[FN_REFLEN];
+ my_off_t first_seen;
+ my_off_t last_seen;
+ bool stmt_end_seen;
+ void update(const char* file_name, my_off_t pos,
+ const char* buf,
+ const Format_description_log_event *fdle);
+ void reset();
+ bool check_and_report(const char* file_name, my_off_t pos);
+} Rows_event_tracker;
+
/*****************************************************************************
Replication IO Thread
@@ -301,6 +314,14 @@ class Master_info : public Slave_reporting_capability
uint64 gtid_reconnect_event_skip_count;
/* gtid_event_seen is false until we receive first GTID event from master. */
bool gtid_event_seen;
+ /**
+ The struct holds some history of Rows- log-event reading/queuing
+ by the receiver thread. Its fields are updated per each such event
+ at time of queue_event(), and they are checked to detect
+ the Rows- event group integrity violation at time of first non-Rows-
+ event gets handled.
+ */
+ Rows_event_tracker rows_event_tracker;
bool in_start_all_slaves, in_stop_all_slaves;
bool in_flush_all_relay_logs;
uint users; /* Active user for object */
diff --git a/sql/set_var.cc b/sql/set_var.cc
index 311b33bc0dd..097124a0b3a 100644
--- a/sql/set_var.cc
+++ b/sql/set_var.cc
@@ -1323,7 +1323,7 @@ resolve_engine_list_item(THD *thd, plugin_ref *list, uint32 *idx,
{
LEX_CSTRING item_str;
plugin_ref ref;
- uint32_t i;
+ uint32 i;
THD *thd_or_null = (temp_copy ? thd : NULL);
item_str.str= pos;
diff --git a/sql/slave.cc b/sql/slave.cc
index 90ad8bbdef1..b69e92a8363 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -3659,7 +3659,8 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings,
we suppress prints to .err file as long as the reconnect
happens without problems
*/
- *suppress_warnings= TRUE;
+ *suppress_warnings=
+ global_system_variables.log_warnings < 2 ? TRUE : FALSE;
}
else
{
@@ -4589,6 +4590,7 @@ pthread_handler_t handle_slave_io(void *arg)
mi->abort_slave = 0;
mysql_mutex_unlock(&mi->run_lock);
mysql_cond_broadcast(&mi->start_cond);
+ mi->rows_event_tracker.reset();
DBUG_PRINT("master_info",("log_file_name: '%s' position: %llu",
mi->master_log_name, mi->master_log_pos));
@@ -4672,6 +4674,10 @@ connected:
*/
mi->gtid_reconnect_event_skip_count= mi->events_queued_since_last_gtid;
mi->gtid_event_seen= false;
+ /*
+ Reset stale state of the rows-event group tracker at reconnect.
+ */
+ mi->rows_event_tracker.reset();
}
#ifdef ENABLED_DEBUG_SYNC
@@ -6077,7 +6083,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
char* new_buf = NULL;
char new_buf_arr[4096];
bool is_malloc = false;
-
+ bool is_rows_event= false;
/*
FD_q must have been prepared for the first R_a event
inside get_master_version_and_clock()
@@ -6511,11 +6517,11 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
got_gtid_event= true;
if (mi->using_gtid == Master_info::USE_GTID_NO)
goto default_action;
- if (unlikely(!mi->gtid_event_seen))
+ if (unlikely(mi->gtid_reconnect_event_skip_count))
{
- mi->gtid_event_seen= true;
- if (mi->gtid_reconnect_event_skip_count)
+ if (likely(!mi->gtid_event_seen))
{
+ mi->gtid_event_seen= true;
/*
If we are reconnecting, and we need to skip a partial event group
already queued to the relay log before the reconnect, then we check
@@ -6544,13 +6550,45 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
rpl_slave_state_tostring_helper(&error_msg, &event_gtid, &first);
goto err;
}
+ if (global_system_variables.log_warnings > 1)
+ {
+ bool first= true;
+ StringBuffer<1024> gtid_text;
+ rpl_slave_state_tostring_helper(&gtid_text, &mi->last_queued_gtid,
+ &first);
+ sql_print_information("Slave IO thread is reconnected to "
+ "receive Gtid_log_event %s. It is to skip %llu "
+ "already received events including the gtid one",
+ gtid_text.ptr(),
+ mi->events_queued_since_last_gtid);
+ }
+ goto default_action;
}
- }
+ else
+ {
+ bool first;
+ StringBuffer<1024> gtid_text;
- if (unlikely(mi->gtid_reconnect_event_skip_count))
- {
- goto default_action;
+ gtid_text.append(STRING_WITH_LEN("Last received gtid: "));
+ first= true;
+ rpl_slave_state_tostring_helper(&gtid_text, &mi->last_queued_gtid,
+ &first);
+ gtid_text.append(STRING_WITH_LEN(", currently received: "));
+ first= true;
+ rpl_slave_state_tostring_helper(&gtid_text, &event_gtid, &first);
+
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ sql_print_error("Slave IO thread has received a new Gtid_log_event "
+ "while skipping already logged events "
+ "after reconnect. %s. %llu remains to be skipped. "
+ "The number of originally read events was %llu",
+ gtid_text.ptr(),
+ mi->gtid_reconnect_event_skip_count,
+ mi->events_queued_since_last_gtid);
+ goto err;
+ }
}
+ mi->gtid_event_seen= true;
/*
We have successfully queued to relay log everything before this GTID, so
@@ -6617,8 +6655,34 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
goto err;
}
}
- buf = new_buf;
is_compress_event = true;
+ buf = new_buf;
+ /*
+ As we are uncertain about compressed V2 rows events, we don't track
+ them
+ */
+ if (LOG_EVENT_IS_ROW_V2((Log_event_type) buf[EVENT_TYPE_OFFSET]))
+ goto default_action;
+ /* fall through */
+ case WRITE_ROWS_EVENT_V1:
+ case UPDATE_ROWS_EVENT_V1:
+ case DELETE_ROWS_EVENT_V1:
+ case WRITE_ROWS_EVENT:
+ case UPDATE_ROWS_EVENT:
+ case DELETE_ROWS_EVENT:
+ {
+ is_rows_event= true;
+ mi->rows_event_tracker.update(mi->master_log_name,
+ mi->master_log_pos,
+ buf,
+ mi->rli.relay_log.
+ description_event_for_queue);
+
+ DBUG_EXECUTE_IF("simulate_stmt_end_rows_event_loss",
+ {
+ mi->rows_event_tracker.stmt_end_seen= false;
+ });
+ }
goto default_action;
#ifndef DBUG_OFF
@@ -6677,6 +6741,21 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
}
/*
+ Integrity of Rows- event group check.
+ A sequence of Rows- events must end with STMT_END_F flagged one.
+ Even when Heartbeat event interrupts Rows- events flow this must indicate a
+ malfunction e.g logging on the master.
+ */
+ if (((uchar) buf[EVENT_TYPE_OFFSET] != HEARTBEAT_LOG_EVENT) &&
+ !is_rows_event &&
+ mi->rows_event_tracker.check_and_report(mi->master_log_name,
+ mi->master_log_pos))
+ {
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ goto err;
+ }
+
+ /*
If we filter events master-side (eg. @@skip_replication), we will see holes
in the event positions from the master. If we see such a hole, adjust
mi->master_log_pos accordingly so we maintain the correct position (for
@@ -6845,6 +6924,21 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
The whole of the current event group is queued. So in case of
reconnect we can start from after the current GTID.
*/
+ if (mi->gtid_reconnect_event_skip_count)
+ {
+ bool first= true;
+ StringBuffer<1024> gtid_text;
+
+ rpl_slave_state_tostring_helper(&gtid_text, &mi->last_queued_gtid,
+ &first);
+ sql_print_error("Slave IO thread received a terminal event from "
+ "group %s whose retrieval was interrupted "
+ "with reconnect. We still had %llu events to read. "
+ "The number of originally read events was %llu",
+ gtid_text.ptr(),
+ mi->gtid_reconnect_event_skip_count,
+ mi->events_queued_since_last_gtid);
+ }
mi->gtid_current_pos.update(&mi->last_queued_gtid);
mi->events_queued_since_last_gtid= 0;
@@ -7844,6 +7938,92 @@ bool rpl_master_erroneous_autoinc(THD *thd)
return FALSE;
}
+
+static bool get_row_event_stmt_end(const char* buf,
+ const Format_description_log_event *fdle)
+{
+ uint8 const common_header_len= fdle->common_header_len;
+ Log_event_type event_type= (Log_event_type)(uchar)buf[EVENT_TYPE_OFFSET];
+
+ uint8 const post_header_len= fdle->post_header_len[event_type-1];
+ const char *flag_start= buf + common_header_len;
+ /*
+ The term 4 below signifies that master is of 'an intermediate source', see
+ Rows_log_event::Rows_log_event.
+ */
+ flag_start += RW_MAPID_OFFSET + (post_header_len == 6) ? 4 : RW_FLAGS_OFFSET;
+
+ return (uint2korr(flag_start) & Rows_log_event::STMT_END_F) != 0;
+}
+
+
+/*
+ Reset log event tracking data.
+*/
+
+void Rows_event_tracker::reset()
+{
+ binlog_file_name[0]= 0;
+ first_seen= last_seen= 0;
+ stmt_end_seen= false;
+}
+
+
+/*
+ Update log event tracking data.
+
+ The first- and last- seen event binlog position get memorized, as
+ well as the end-of-statement status of the last one.
+*/
+
+void Rows_event_tracker::update(const char* file_name, my_off_t pos,
+ const char* buf,
+ const Format_description_log_event *fdle)
+{
+ if (!first_seen)
+ {
+ first_seen= pos;
+ strmake(binlog_file_name, file_name, sizeof(binlog_file_name) - 1);
+ }
+ last_seen= pos;
+ DBUG_ASSERT(stmt_end_seen == 0); // We can only have one
+ stmt_end_seen= get_row_event_stmt_end(buf, fdle);
+};
+
+
+/**
+ The function is called at next event reading
+ after a sequence of Rows- log-events. It checks the end-of-statement status
+ of the past sequence to report on any isssue.
+ In the positive case the tracker gets reset.
+
+ @return true when the Rows- event group integrity found compromised,
+ false otherwise.
+*/
+bool Rows_event_tracker::check_and_report(const char* file_name,
+ my_off_t pos)
+{
+ if (last_seen)
+ {
+ // there was at least one "block" event previously
+ if (!stmt_end_seen)
+ {
+ sql_print_error("Slave IO thread did not receive an expected "
+ "Rows-log end-of-statement for event starting "
+ "at log '%s' position %llu "
+ "whose last block was seen at log '%s' position %llu. "
+ "The end-of-statement should have been delivered "
+ "before the current one at log '%s' position %llu",
+ binlog_file_name, first_seen,
+ binlog_file_name, last_seen, file_name, pos);
+ return true;
+ }
+ reset();
+ }
+
+ return false;
+}
+
/**
@} (end of group Replication)
*/
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 5469171f0e3..28c603dd6b3 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -837,7 +837,7 @@ typedef struct system_status_var
ulonglong table_open_cache_overflows;
double last_query_cost;
double cpu_time, busy_time;
- uint32_t threads_running;
+ uint32 threads_running;
/* Don't initialize */
/* Memory used for thread local storage */
int64 max_local_memory_used;
diff --git a/sql/sql_cte.cc b/sql/sql_cte.cc
index 9608436226c..7da088cdfd5 100644
--- a/sql/sql_cte.cc
+++ b/sql/sql_cte.cc
@@ -833,12 +833,19 @@ st_select_lex_unit *With_element::clone_parsed_spec(THD *thd,
parse_status= parse_sql(thd, &parser_state, 0);
if (parse_status)
goto err;
+
+ if (check_dependencies_in_with_clauses(lex->with_clauses_list))
+ goto err;
+
spec_tables= lex->query_tables;
spec_tables_tail= 0;
for (TABLE_LIST *tbl= spec_tables;
tbl;
tbl= tbl->next_global)
{
+ if (!tbl->derived && !tbl->schema_table &&
+ thd->open_temporary_table(tbl))
+ goto err;
spec_tables_tail= tbl;
}
if (check_table_access(thd, SELECT_ACL, spec_tables, FALSE, UINT_MAX, FALSE))
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index b058af531c6..44c8236023c 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -3274,6 +3274,9 @@ mysql_execute_command(THD *thd)
thd->get_stmt_da()->opt_clear_warning_info(thd->query_id);
}
+ if (check_dependencies_in_with_clauses(thd->lex->with_clauses_list))
+ DBUG_RETURN(1);
+
#ifdef HAVE_REPLICATION
if (unlikely(thd->slave_thread))
{
@@ -3731,14 +3734,6 @@ mysql_execute_command(THD *thd)
ulong privileges_requested= lex->exchange ? SELECT_ACL | FILE_ACL :
SELECT_ACL;
- /*
- The same function must be called for DML commands
- when CTEs are supported in DML statements
- */
- res= check_dependencies_in_with_clauses(thd->lex->with_clauses_list);
- if (res)
- break;
-
if (all_tables)
res= check_table_access(thd,
privileges_requested,
@@ -4165,8 +4160,7 @@ mysql_execute_command(THD *thd)
/* Copy temporarily the statement flags to thd for lock_table_names() */
uint save_thd_create_info_options= thd->lex->create_info.options;
thd->lex->create_info.options|= create_info.options;
- if (!(res= check_dependencies_in_with_clauses(lex->with_clauses_list)))
- res= open_and_lock_tables(thd, create_info, lex->query_tables, TRUE, 0);
+ res= open_and_lock_tables(thd, create_info, lex->query_tables, TRUE, 0);
thd->lex->create_info.options= save_thd_create_info_options;
if (res)
{
@@ -4783,8 +4777,7 @@ end_with_restore_list:
unit->set_limit(select_lex);
- if (!(res= check_dependencies_in_with_clauses(lex->with_clauses_list)) &&
- !(res=open_and_lock_tables(thd, all_tables, TRUE, 0)))
+ if (!(res=open_and_lock_tables(thd, all_tables, TRUE, 0)))
{
MYSQL_INSERT_SELECT_START(thd->query());
/*
@@ -5115,9 +5108,6 @@ end_with_restore_list:
{
List<set_var_base> *lex_var_list= &lex->var_list;
- if (check_dependencies_in_with_clauses(thd->lex->with_clauses_list))
- goto error;
-
if ((check_table_access(thd, SELECT_ACL, all_tables, FALSE, UINT_MAX, FALSE)
|| open_and_lock_tables(thd, all_tables, TRUE, 0)))
goto error;
@@ -6436,8 +6426,6 @@ static bool execute_sqlcom_select(THD *thd, TABLE_LIST *all_tables)
new (thd->mem_root) Item_int(thd,
(ulonglong) thd->variables.select_limit);
}
- if (check_dependencies_in_with_clauses(lex->with_clauses_list))
- return 1;
if (thd->variables.vers_alter_history == VERS_ALTER_HISTORY_SURVIVE)
{
diff --git a/sql/sql_prepare.cc b/sql/sql_prepare.cc
index b31c2f36db1..80ac2b22a86 100644
--- a/sql/sql_prepare.cc
+++ b/sql/sql_prepare.cc
@@ -1516,8 +1516,6 @@ static int mysql_test_select(Prepared_statement *stmt,
lex->select_lex.context.resolve_in_select_list= TRUE;
ulong privilege= lex->exchange ? SELECT_ACL | FILE_ACL : SELECT_ACL;
- if (check_dependencies_in_with_clauses(lex->with_clauses_list))
- goto error;
if (tables)
{
if (check_table_access(thd, privilege, tables, FALSE, UINT_MAX, FALSE))
@@ -1784,9 +1782,6 @@ static bool mysql_test_create_table(Prepared_statement *stmt)
if (create_table_precheck(thd, tables, create_table))
DBUG_RETURN(TRUE);
- if (check_dependencies_in_with_clauses(lex->with_clauses_list))
- DBUG_RETURN(TRUE);
-
if (select_lex->item_list.elements)
{
/* Base table and temporary table are not in the same name space. */
@@ -2177,9 +2172,6 @@ static bool mysql_test_insert_select(Prepared_statement *stmt,
if (insert_precheck(stmt->thd, tables))
return 1;
- if (check_dependencies_in_with_clauses(lex->with_clauses_list))
- return 1;
-
/* store it, because mysql_insert_select_prepare_tester change it */
first_local_table= lex->select_lex.table_list.first;
DBUG_ASSERT(first_local_table != 0);
@@ -2282,6 +2274,9 @@ static bool check_prepared_statement(Prepared_statement *stmt)
if (tables)
thd->get_stmt_da()->opt_clear_warning_info(thd->query_id);
+ if (check_dependencies_in_with_clauses(thd->lex->with_clauses_list))
+ goto error;
+
if (sql_command_flags[sql_command] & CF_HA_CLOSE)
mysql_ha_rm_tables(thd, tables);
diff --git a/sql/sql_select.cc b/sql/sql_select.cc
index 0527eef4fde..d6fe06173a2 100644
--- a/sql/sql_select.cc
+++ b/sql/sql_select.cc
@@ -18358,7 +18358,7 @@ bool create_internal_tmp_table(TABLE *table, KEY *keyinfo,
table->in_use->inc_status_created_tmp_disk_tables();
table->in_use->inc_status_created_tmp_tables();
share->db_record_offset= 1;
- table->created= TRUE;
+ table->set_created();
DBUG_RETURN(0);
err:
DBUG_RETURN(1);
@@ -18869,8 +18869,8 @@ int rr_sequential_and_unpack(READ_RECORD *info)
*/
bool instantiate_tmp_table(TABLE *table, KEY *keyinfo,
- MARIA_COLUMNDEF *start_recinfo,
- MARIA_COLUMNDEF **recinfo,
+ TMP_ENGINE_COLUMNDEF *start_recinfo,
+ TMP_ENGINE_COLUMNDEF **recinfo,
ulonglong options)
{
if (table->s->db_type() == TMP_ENGINE_HTON)
diff --git a/sql/sql_select.h b/sql/sql_select.h
index fbadca2255d..298097f1b98 100644
--- a/sql/sql_select.h
+++ b/sql/sql_select.h
@@ -2348,8 +2348,8 @@ bool create_internal_tmp_table(TABLE *table, KEY *keyinfo,
TMP_ENGINE_COLUMNDEF **recinfo,
ulonglong options);
bool instantiate_tmp_table(TABLE *table, KEY *keyinfo,
- MARIA_COLUMNDEF *start_recinfo,
- MARIA_COLUMNDEF **recinfo,
+ TMP_ENGINE_COLUMNDEF *start_recinfo,
+ TMP_ENGINE_COLUMNDEF **recinfo,
ulonglong options);
bool open_tmp_table(TABLE *table);
void setup_tmp_table_column_bitmaps(TABLE *table, uchar *bitmaps);
diff --git a/sql/sql_union.cc b/sql/sql_union.cc
index dda57cc5b4d..f18d30a45c0 100644
--- a/sql/sql_union.cc
+++ b/sql/sql_union.cc
@@ -1710,6 +1710,7 @@ bool st_select_lex_unit::exec_recursive()
sq;
sq= sq->next_with_rec_ref)
{
+ sq->reset();
sq->engine->force_reexecution();
}