diff options
author | Sergei Golubchik <serg@mariadb.org> | 2018-01-17 00:45:02 +0100 |
---|---|---|
committer | Sergei Golubchik <serg@mariadb.org> | 2018-01-17 00:45:02 +0100 |
commit | 8f102b584d4e8f02da924f9be094014136eb453f (patch) | |
tree | 80e98303b2960d477e1e2223b0a986f5fd6edf0a /sql | |
parent | 715a507e3368451b824f211dea34a55c5d4dac1d (diff) | |
parent | d87531a6a053fdf8bc828857d9cdc11a97026ad6 (diff) | |
download | mariadb-git-8f102b584d4e8f02da924f9be094014136eb453f.tar.gz |
Merge branch 'github/10.3' into bb-10.3-temporalmariadb-10.3.4
Diffstat (limited to 'sql')
-rw-r--r-- | sql/field.cc | 2 | ||||
-rw-r--r-- | sql/item_func.cc | 2 | ||||
-rw-r--r-- | sql/item_sum.cc | 4 | ||||
-rw-r--r-- | sql/key.cc | 20 | ||||
-rw-r--r-- | sql/log.cc | 2 | ||||
-rw-r--r-- | sql/log_event.cc | 6 | ||||
-rw-r--r-- | sql/mdl.h | 5 | ||||
-rw-r--r-- | sql/mysqld.cc | 7 | ||||
-rw-r--r-- | sql/opt_subselect.cc | 64 | ||||
-rw-r--r-- | sql/rpl_gtid.cc | 2 | ||||
-rw-r--r-- | sql/rpl_mi.h | 21 | ||||
-rw-r--r-- | sql/set_var.cc | 2 | ||||
-rw-r--r-- | sql/slave.cc | 200 | ||||
-rw-r--r-- | sql/sql_class.h | 2 | ||||
-rw-r--r-- | sql/sql_cte.cc | 7 | ||||
-rw-r--r-- | sql/sql_parse.cc | 22 | ||||
-rw-r--r-- | sql/sql_prepare.cc | 11 | ||||
-rw-r--r-- | sql/sql_select.cc | 6 | ||||
-rw-r--r-- | sql/sql_select.h | 4 | ||||
-rw-r--r-- | sql/sql_union.cc | 1 |
20 files changed, 304 insertions, 86 deletions
diff --git a/sql/field.cc b/sql/field.cc index 642ad6a65e9..8de823cee7c 100644 --- a/sql/field.cc +++ b/sql/field.cc @@ -8533,6 +8533,7 @@ uchar *Field_blob::pack(uchar *to, const uchar *from, uint max_length) @return New pointer into memory based on from + length of the data */ + const uchar *Field_blob::unpack(uchar *to, const uchar *from, const uchar *from_end, uint param_data) { @@ -8548,7 +8549,6 @@ const uchar *Field_blob::unpack(uchar *to, const uchar *from, if (from + master_packlength + length > from_end) DBUG_RETURN(0); set_ptr(length, const_cast<uchar*> (from) + master_packlength); - DBUG_DUMP("record", to, table->s->reclength); DBUG_RETURN(from + master_packlength + length); } diff --git a/sql/item_func.cc b/sql/item_func.cc index ac7ed75e7f9..93ddddfde61 100644 --- a/sql/item_func.cc +++ b/sql/item_func.cc @@ -2724,7 +2724,7 @@ bool Item_func_min_max::get_date_native(MYSQL_TIME *ltime, ulonglong fuzzy_date) ltime->hour+= (ltime->month * 32 + ltime->day) * 24; ltime->year= ltime->month= ltime->day= 0; if (adjust_time_range_with_warn(ltime, - std::min<uint>(decimals, TIME_SECOND_PART_DIGITS))) + MY_MIN(decimals, TIME_SECOND_PART_DIGITS))) return (null_value= true); } diff --git a/sql/item_sum.cc b/sql/item_sum.cc index 945224c2623..be9aefa7338 100644 --- a/sql/item_sum.cc +++ b/sql/item_sum.cc @@ -2531,7 +2531,7 @@ bool Item_sum_bit::remove_as_window(ulonglong value) } // Prevent overflow; - num_values_added = std::min(num_values_added, num_values_added - 1); + num_values_added = MY_MIN(num_values_added, num_values_added - 1); set_bits_from_counters(); return 0; } @@ -2544,7 +2544,7 @@ bool Item_sum_bit::add_as_window(ulonglong value) bit_counters[i]+= (value & (1ULL << i)) ? 1 : 0; } // Prevent overflow; - num_values_added = std::max(num_values_added, num_values_added + 1); + num_values_added = MY_MAX(num_values_added, num_values_added + 1); set_bits_from_counters(); return 0; } diff --git a/sql/key.cc b/sql/key.cc index 3ee083e560f..ab93e8a0437 100644 --- a/sql/key.cc +++ b/sql/key.cc @@ -1,4 +1,5 @@ /* Copyright (c) 2000, 2010, Oracle and/or its affiliates. All rights reserved. + Copyright (c) 2018, MariaDB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -21,9 +22,6 @@ #include "key.h" // key_rec_cmp #include "field.h" // Field -using std::min; -using std::max; - /* Search after a key that starts with 'field' @@ -135,7 +133,7 @@ void key_copy(uchar *to_key, const uchar *from_record, KEY *key_info, Don't copy data for null values The -1 below is to subtract the null byte which is already handled */ - length= min<uint>(key_length, key_part->store_length-1); + length= MY_MIN(key_length, uint(key_part->store_length)-1); if (with_zerofill) bzero((char*) to_key, length); continue; @@ -145,7 +143,7 @@ void key_copy(uchar *to_key, const uchar *from_record, KEY *key_info, key_part->key_part_flag & HA_VAR_LENGTH_PART) { key_length-= HA_KEY_BLOB_LENGTH; - length= min<uint>(key_length, key_part->length); + length= MY_MIN(key_length, key_part->length); uint bytes= key_part->field->get_key_image(to_key, length, Field::itRAW); if (with_zerofill && bytes < length) bzero((char*) to_key + bytes, length - bytes); @@ -153,7 +151,7 @@ void key_copy(uchar *to_key, const uchar *from_record, KEY *key_info, } else { - length= min<uint>(key_length, key_part->length); + length= MY_MIN(key_length, key_part->length); Field *field= key_part->field; CHARSET_INFO *cs= field->charset(); uint bytes= field->get_key_image(to_key, length, Field::itRAW); @@ -205,7 +203,7 @@ void key_restore(uchar *to_record, const uchar *from_key, KEY *key_info, Don't copy data for null bytes The -1 below is to subtract the null byte which is already handled */ - length= min<uint>(key_length, key_part->store_length-1); + length= MY_MIN(key_length, uint(key_part->store_length)-1); continue; } } @@ -247,7 +245,7 @@ void key_restore(uchar *to_record, const uchar *from_key, KEY *key_info, my_ptrdiff_t ptrdiff= to_record - field->table->record[0]; field->move_field_offset(ptrdiff); key_length-= HA_KEY_BLOB_LENGTH; - length= min<uint>(key_length, key_part->length); + length= MY_MIN(key_length, key_part->length); old_map= dbug_tmp_use_all_columns(field->table, field->table->write_set); field->set_key_image(from_key, length); dbug_tmp_restore_column_map(field->table->write_set, old_map); @@ -256,7 +254,7 @@ void key_restore(uchar *to_record, const uchar *from_key, KEY *key_info, } else { - length= min<uint>(key_length, key_part->length); + length= MY_MIN(key_length, key_part->length); /* skip the byte with 'uneven' bits, if used */ memcpy(to_record + key_part->offset, from_key + used_uneven_bits , (size_t) length - used_uneven_bits); @@ -314,7 +312,7 @@ bool key_cmp_if_same(TABLE *table,const uchar *key,uint idx,uint key_length) return 1; continue; } - length= min((uint) (key_end-key), store_length); + length= MY_MIN((uint) (key_end-key), store_length); if (!(key_part->key_type & (FIELDFLAG_NUMBER+FIELDFLAG_BINARY+ FIELDFLAG_PACK))) { @@ -392,7 +390,7 @@ void field_unpack(String *to, Field *field, const uchar *rec, uint max_length, tmp.length(charpos); } if (max_length < field->pack_length()) - tmp.length(min(tmp.length(),max_length)); + tmp.length(MY_MIN(tmp.length(),max_length)); ErrConvString err(&tmp); to->append(err.ptr()); } diff --git a/sql/log.cc b/sql/log.cc index 6923a6241cd..742b0a03e90 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -7108,7 +7108,7 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) int4store(ev + EVENT_LEN_OFFSET, ev_len + writer.checksum_len); writer.remains= ev_len; - if (writer.write(ev, std::min<uint>(ev_len, length - hdr_offs))) + if (writer.write(ev, MY_MIN(ev_len, length - hdr_offs))) DBUG_RETURN(ER_ERROR_ON_WRITE); /* next event header at ... */ diff --git a/sql/log_event.cc b/sql/log_event.cc index ca659085228..c40e3be94c8 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -56,8 +56,6 @@ #define my_b_write_string(A, B) my_b_write((A), (uchar*)(B), (uint) (sizeof(B) - 1)) -using std::max; - /** BINLOG_CHECKSUM variable. */ @@ -1799,8 +1797,8 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet, if (data_len < LOG_EVENT_MINIMAL_HEADER_LEN) DBUG_RETURN(LOG_READ_BOGUS); - if (data_len > max(max_allowed_packet, - opt_binlog_rows_event_max_size + MAX_LOG_EVENT_HEADER)) + if (data_len > MY_MAX(max_allowed_packet, + opt_binlog_rows_event_max_size + MAX_LOG_EVENT_HEADER)) DBUG_RETURN(LOG_READ_TOO_LARGE); if (likely(data_len > LOG_EVENT_MINIMAL_HEADER_LEN)) diff --git a/sql/mdl.h b/sql/mdl.h index f30c976ac94..be9cc806ec2 100644 --- a/sql/mdl.h +++ b/sql/mdl.h @@ -21,8 +21,6 @@ #include <mysql_com.h> #include <lf.h> -#include <algorithm> - class THD; class MDL_context; @@ -373,8 +371,7 @@ public: character set is utf-8, we can safely assume that no character starts with a zero byte. */ - using std::min; - return memcmp(m_ptr, rhs->m_ptr, min(m_length, rhs->m_length)); + return memcmp(m_ptr, rhs->m_ptr, MY_MIN(m_length, rhs->m_length)); } MDL_key(const MDL_key *rhs) diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 067b2e70ba6..29b12934550 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -2928,16 +2928,17 @@ void unlink_thd(THD *thd) DBUG_ENTER("unlink_thd"); DBUG_PRINT("enter", ("thd: %p", thd)); + thd->cleanup(); + thd->add_status_to_global(); + unlink_not_visible_thd(thd); + /* Do not decrement when its wsrep system thread. wsrep_applier is set for applier as well as rollbacker threads. */ if (IF_WSREP(!thd->wsrep_applier, 1)) dec_connection_count(thd->scheduler); - thd->cleanup(); - thd->add_status_to_global(); - unlink_not_visible_thd(thd); thd->free_connection(); DBUG_VOID_RETURN; diff --git a/sql/opt_subselect.cc b/sql/opt_subselect.cc index dada23508b5..9d6b67e845e 100644 --- a/sql/opt_subselect.cc +++ b/sql/opt_subselect.cc @@ -2662,7 +2662,8 @@ void advance_sj_state(JOIN *join, table_map remaining_tables, uint idx, LooseScan detector in best_access_path) */ remaining_tables &= ~new_join_tab->table->map; - table_map dups_producing_tables; + table_map dups_producing_tables, prev_dups_producing_tables, + prev_sjm_lookup_tables; if (idx == join->const_tables) dups_producing_tables= 0; @@ -2673,7 +2674,7 @@ void advance_sj_state(JOIN *join, table_map remaining_tables, uint idx, if ((emb_sj_nest= new_join_tab->emb_sj_nest)) dups_producing_tables |= emb_sj_nest->sj_inner_tables; - Semi_join_strategy_picker **strategy; + Semi_join_strategy_picker **strategy, **prev_strategy; if (idx == join->const_tables) { /* First table, initialize pickers */ @@ -2725,23 +2726,54 @@ void advance_sj_state(JOIN *join, table_map remaining_tables, uint idx, 3. We have no clue what to do about fanount of semi-join Y. */ if ((dups_producing_tables & handled_fanout) || - (read_time < *current_read_time && + (read_time < *current_read_time && !(handled_fanout & pos->inner_tables_handled_with_other_sjs))) { - /* Mark strategy as used */ - (*strategy)->mark_used(); - pos->sj_strategy= sj_strategy; - if (sj_strategy == SJ_OPT_MATERIALIZE) - join->sjm_lookup_tables |= handled_fanout; + DBUG_ASSERT(pos->sj_strategy != sj_strategy); + /* + If the strategy choosen first time or + the strategy replace strategy which was used to exectly the same + tables + */ + if (pos->sj_strategy == SJ_OPT_NONE || + handled_fanout == + (prev_dups_producing_tables ^ dups_producing_tables)) + { + prev_strategy= strategy; + if (pos->sj_strategy == SJ_OPT_NONE) + { + prev_dups_producing_tables= dups_producing_tables; + prev_sjm_lookup_tables= join->sjm_lookup_tables; + } + /* Mark strategy as used */ + (*strategy)->mark_used(); + pos->sj_strategy= sj_strategy; + if (sj_strategy == SJ_OPT_MATERIALIZE) + join->sjm_lookup_tables |= handled_fanout; + else + join->sjm_lookup_tables &= ~handled_fanout; + *current_read_time= read_time; + *current_record_count= rec_count; + dups_producing_tables &= ~handled_fanout; + //TODO: update bitmap of semi-joins that were handled together with + // others. + if (is_multiple_semi_joins(join, join->positions, idx, + handled_fanout)) + pos->inner_tables_handled_with_other_sjs |= handled_fanout; + } else - join->sjm_lookup_tables &= ~handled_fanout; - *current_read_time= read_time; - *current_record_count= rec_count; - dups_producing_tables &= ~handled_fanout; - //TODO: update bitmap of semi-joins that were handled together with - // others. - if (is_multiple_semi_joins(join, join->positions, idx, handled_fanout)) - pos->inner_tables_handled_with_other_sjs |= handled_fanout; + { + /* Conflict fall to most general variant */ + (*prev_strategy)->set_empty(); + dups_producing_tables= prev_dups_producing_tables; + join->sjm_lookup_tables= prev_sjm_lookup_tables; + // mark it 'none' to avpoid loops + pos->sj_strategy= SJ_OPT_NONE; + // next skip to last; + strategy= pickers + + (sizeof(pickers)/sizeof(Semi_join_strategy_picker*) - 3); + continue; + } } else { diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index 9f48f908102..78e6165a551 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -582,7 +582,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, bool table_opened= false; TABLE *table; list_element *delete_list= 0, *next, *cur, **next_ptr_ptr, **best_ptr_ptr; - uint64_t best_sub_id; + uint64 best_sub_id; element *elem; ulonglong thd_saved_option= thd->variables.option_bits; Query_tables_list lex_backup; diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h index 14d74dc4bb7..260c35e8c04 100644 --- a/sql/rpl_mi.h +++ b/sql/rpl_mi.h @@ -133,6 +133,19 @@ public: extern TYPELIB slave_parallel_mode_typelib; +typedef struct st_rows_event_tracker +{ + char binlog_file_name[FN_REFLEN]; + my_off_t first_seen; + my_off_t last_seen; + bool stmt_end_seen; + void update(const char* file_name, my_off_t pos, + const char* buf, + const Format_description_log_event *fdle); + void reset(); + bool check_and_report(const char* file_name, my_off_t pos); +} Rows_event_tracker; + /***************************************************************************** Replication IO Thread @@ -301,6 +314,14 @@ class Master_info : public Slave_reporting_capability uint64 gtid_reconnect_event_skip_count; /* gtid_event_seen is false until we receive first GTID event from master. */ bool gtid_event_seen; + /** + The struct holds some history of Rows- log-event reading/queuing + by the receiver thread. Its fields are updated per each such event + at time of queue_event(), and they are checked to detect + the Rows- event group integrity violation at time of first non-Rows- + event gets handled. + */ + Rows_event_tracker rows_event_tracker; bool in_start_all_slaves, in_stop_all_slaves; bool in_flush_all_relay_logs; uint users; /* Active user for object */ diff --git a/sql/set_var.cc b/sql/set_var.cc index 311b33bc0dd..097124a0b3a 100644 --- a/sql/set_var.cc +++ b/sql/set_var.cc @@ -1323,7 +1323,7 @@ resolve_engine_list_item(THD *thd, plugin_ref *list, uint32 *idx, { LEX_CSTRING item_str; plugin_ref ref; - uint32_t i; + uint32 i; THD *thd_or_null = (temp_copy ? thd : NULL); item_str.str= pos; diff --git a/sql/slave.cc b/sql/slave.cc index 90ad8bbdef1..b69e92a8363 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3659,7 +3659,8 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings, we suppress prints to .err file as long as the reconnect happens without problems */ - *suppress_warnings= TRUE; + *suppress_warnings= + global_system_variables.log_warnings < 2 ? TRUE : FALSE; } else { @@ -4589,6 +4590,7 @@ pthread_handler_t handle_slave_io(void *arg) mi->abort_slave = 0; mysql_mutex_unlock(&mi->run_lock); mysql_cond_broadcast(&mi->start_cond); + mi->rows_event_tracker.reset(); DBUG_PRINT("master_info",("log_file_name: '%s' position: %llu", mi->master_log_name, mi->master_log_pos)); @@ -4672,6 +4674,10 @@ connected: */ mi->gtid_reconnect_event_skip_count= mi->events_queued_since_last_gtid; mi->gtid_event_seen= false; + /* + Reset stale state of the rows-event group tracker at reconnect. + */ + mi->rows_event_tracker.reset(); } #ifdef ENABLED_DEBUG_SYNC @@ -6077,7 +6083,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) char* new_buf = NULL; char new_buf_arr[4096]; bool is_malloc = false; - + bool is_rows_event= false; /* FD_q must have been prepared for the first R_a event inside get_master_version_and_clock() @@ -6511,11 +6517,11 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) got_gtid_event= true; if (mi->using_gtid == Master_info::USE_GTID_NO) goto default_action; - if (unlikely(!mi->gtid_event_seen)) + if (unlikely(mi->gtid_reconnect_event_skip_count)) { - mi->gtid_event_seen= true; - if (mi->gtid_reconnect_event_skip_count) + if (likely(!mi->gtid_event_seen)) { + mi->gtid_event_seen= true; /* If we are reconnecting, and we need to skip a partial event group already queued to the relay log before the reconnect, then we check @@ -6544,13 +6550,45 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) rpl_slave_state_tostring_helper(&error_msg, &event_gtid, &first); goto err; } + if (global_system_variables.log_warnings > 1) + { + bool first= true; + StringBuffer<1024> gtid_text; + rpl_slave_state_tostring_helper(>id_text, &mi->last_queued_gtid, + &first); + sql_print_information("Slave IO thread is reconnected to " + "receive Gtid_log_event %s. It is to skip %llu " + "already received events including the gtid one", + gtid_text.ptr(), + mi->events_queued_since_last_gtid); + } + goto default_action; } - } + else + { + bool first; + StringBuffer<1024> gtid_text; - if (unlikely(mi->gtid_reconnect_event_skip_count)) - { - goto default_action; + gtid_text.append(STRING_WITH_LEN("Last received gtid: ")); + first= true; + rpl_slave_state_tostring_helper(>id_text, &mi->last_queued_gtid, + &first); + gtid_text.append(STRING_WITH_LEN(", currently received: ")); + first= true; + rpl_slave_state_tostring_helper(>id_text, &event_gtid, &first); + + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + sql_print_error("Slave IO thread has received a new Gtid_log_event " + "while skipping already logged events " + "after reconnect. %s. %llu remains to be skipped. " + "The number of originally read events was %llu", + gtid_text.ptr(), + mi->gtid_reconnect_event_skip_count, + mi->events_queued_since_last_gtid); + goto err; + } } + mi->gtid_event_seen= true; /* We have successfully queued to relay log everything before this GTID, so @@ -6617,8 +6655,34 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) goto err; } } - buf = new_buf; is_compress_event = true; + buf = new_buf; + /* + As we are uncertain about compressed V2 rows events, we don't track + them + */ + if (LOG_EVENT_IS_ROW_V2((Log_event_type) buf[EVENT_TYPE_OFFSET])) + goto default_action; + /* fall through */ + case WRITE_ROWS_EVENT_V1: + case UPDATE_ROWS_EVENT_V1: + case DELETE_ROWS_EVENT_V1: + case WRITE_ROWS_EVENT: + case UPDATE_ROWS_EVENT: + case DELETE_ROWS_EVENT: + { + is_rows_event= true; + mi->rows_event_tracker.update(mi->master_log_name, + mi->master_log_pos, + buf, + mi->rli.relay_log. + description_event_for_queue); + + DBUG_EXECUTE_IF("simulate_stmt_end_rows_event_loss", + { + mi->rows_event_tracker.stmt_end_seen= false; + }); + } goto default_action; #ifndef DBUG_OFF @@ -6677,6 +6741,21 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) } /* + Integrity of Rows- event group check. + A sequence of Rows- events must end with STMT_END_F flagged one. + Even when Heartbeat event interrupts Rows- events flow this must indicate a + malfunction e.g logging on the master. + */ + if (((uchar) buf[EVENT_TYPE_OFFSET] != HEARTBEAT_LOG_EVENT) && + !is_rows_event && + mi->rows_event_tracker.check_and_report(mi->master_log_name, + mi->master_log_pos)) + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + goto err; + } + + /* If we filter events master-side (eg. @@skip_replication), we will see holes in the event positions from the master. If we see such a hole, adjust mi->master_log_pos accordingly so we maintain the correct position (for @@ -6845,6 +6924,21 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) The whole of the current event group is queued. So in case of reconnect we can start from after the current GTID. */ + if (mi->gtid_reconnect_event_skip_count) + { + bool first= true; + StringBuffer<1024> gtid_text; + + rpl_slave_state_tostring_helper(>id_text, &mi->last_queued_gtid, + &first); + sql_print_error("Slave IO thread received a terminal event from " + "group %s whose retrieval was interrupted " + "with reconnect. We still had %llu events to read. " + "The number of originally read events was %llu", + gtid_text.ptr(), + mi->gtid_reconnect_event_skip_count, + mi->events_queued_since_last_gtid); + } mi->gtid_current_pos.update(&mi->last_queued_gtid); mi->events_queued_since_last_gtid= 0; @@ -7844,6 +7938,92 @@ bool rpl_master_erroneous_autoinc(THD *thd) return FALSE; } + +static bool get_row_event_stmt_end(const char* buf, + const Format_description_log_event *fdle) +{ + uint8 const common_header_len= fdle->common_header_len; + Log_event_type event_type= (Log_event_type)(uchar)buf[EVENT_TYPE_OFFSET]; + + uint8 const post_header_len= fdle->post_header_len[event_type-1]; + const char *flag_start= buf + common_header_len; + /* + The term 4 below signifies that master is of 'an intermediate source', see + Rows_log_event::Rows_log_event. + */ + flag_start += RW_MAPID_OFFSET + (post_header_len == 6) ? 4 : RW_FLAGS_OFFSET; + + return (uint2korr(flag_start) & Rows_log_event::STMT_END_F) != 0; +} + + +/* + Reset log event tracking data. +*/ + +void Rows_event_tracker::reset() +{ + binlog_file_name[0]= 0; + first_seen= last_seen= 0; + stmt_end_seen= false; +} + + +/* + Update log event tracking data. + + The first- and last- seen event binlog position get memorized, as + well as the end-of-statement status of the last one. +*/ + +void Rows_event_tracker::update(const char* file_name, my_off_t pos, + const char* buf, + const Format_description_log_event *fdle) +{ + if (!first_seen) + { + first_seen= pos; + strmake(binlog_file_name, file_name, sizeof(binlog_file_name) - 1); + } + last_seen= pos; + DBUG_ASSERT(stmt_end_seen == 0); // We can only have one + stmt_end_seen= get_row_event_stmt_end(buf, fdle); +}; + + +/** + The function is called at next event reading + after a sequence of Rows- log-events. It checks the end-of-statement status + of the past sequence to report on any isssue. + In the positive case the tracker gets reset. + + @return true when the Rows- event group integrity found compromised, + false otherwise. +*/ +bool Rows_event_tracker::check_and_report(const char* file_name, + my_off_t pos) +{ + if (last_seen) + { + // there was at least one "block" event previously + if (!stmt_end_seen) + { + sql_print_error("Slave IO thread did not receive an expected " + "Rows-log end-of-statement for event starting " + "at log '%s' position %llu " + "whose last block was seen at log '%s' position %llu. " + "The end-of-statement should have been delivered " + "before the current one at log '%s' position %llu", + binlog_file_name, first_seen, + binlog_file_name, last_seen, file_name, pos); + return true; + } + reset(); + } + + return false; +} + /** @} (end of group Replication) */ diff --git a/sql/sql_class.h b/sql/sql_class.h index 5469171f0e3..28c603dd6b3 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -837,7 +837,7 @@ typedef struct system_status_var ulonglong table_open_cache_overflows; double last_query_cost; double cpu_time, busy_time; - uint32_t threads_running; + uint32 threads_running; /* Don't initialize */ /* Memory used for thread local storage */ int64 max_local_memory_used; diff --git a/sql/sql_cte.cc b/sql/sql_cte.cc index 9608436226c..7da088cdfd5 100644 --- a/sql/sql_cte.cc +++ b/sql/sql_cte.cc @@ -833,12 +833,19 @@ st_select_lex_unit *With_element::clone_parsed_spec(THD *thd, parse_status= parse_sql(thd, &parser_state, 0); if (parse_status) goto err; + + if (check_dependencies_in_with_clauses(lex->with_clauses_list)) + goto err; + spec_tables= lex->query_tables; spec_tables_tail= 0; for (TABLE_LIST *tbl= spec_tables; tbl; tbl= tbl->next_global) { + if (!tbl->derived && !tbl->schema_table && + thd->open_temporary_table(tbl)) + goto err; spec_tables_tail= tbl; } if (check_table_access(thd, SELECT_ACL, spec_tables, FALSE, UINT_MAX, FALSE)) diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index b058af531c6..44c8236023c 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -3274,6 +3274,9 @@ mysql_execute_command(THD *thd) thd->get_stmt_da()->opt_clear_warning_info(thd->query_id); } + if (check_dependencies_in_with_clauses(thd->lex->with_clauses_list)) + DBUG_RETURN(1); + #ifdef HAVE_REPLICATION if (unlikely(thd->slave_thread)) { @@ -3731,14 +3734,6 @@ mysql_execute_command(THD *thd) ulong privileges_requested= lex->exchange ? SELECT_ACL | FILE_ACL : SELECT_ACL; - /* - The same function must be called for DML commands - when CTEs are supported in DML statements - */ - res= check_dependencies_in_with_clauses(thd->lex->with_clauses_list); - if (res) - break; - if (all_tables) res= check_table_access(thd, privileges_requested, @@ -4165,8 +4160,7 @@ mysql_execute_command(THD *thd) /* Copy temporarily the statement flags to thd for lock_table_names() */ uint save_thd_create_info_options= thd->lex->create_info.options; thd->lex->create_info.options|= create_info.options; - if (!(res= check_dependencies_in_with_clauses(lex->with_clauses_list))) - res= open_and_lock_tables(thd, create_info, lex->query_tables, TRUE, 0); + res= open_and_lock_tables(thd, create_info, lex->query_tables, TRUE, 0); thd->lex->create_info.options= save_thd_create_info_options; if (res) { @@ -4783,8 +4777,7 @@ end_with_restore_list: unit->set_limit(select_lex); - if (!(res= check_dependencies_in_with_clauses(lex->with_clauses_list)) && - !(res=open_and_lock_tables(thd, all_tables, TRUE, 0))) + if (!(res=open_and_lock_tables(thd, all_tables, TRUE, 0))) { MYSQL_INSERT_SELECT_START(thd->query()); /* @@ -5115,9 +5108,6 @@ end_with_restore_list: { List<set_var_base> *lex_var_list= &lex->var_list; - if (check_dependencies_in_with_clauses(thd->lex->with_clauses_list)) - goto error; - if ((check_table_access(thd, SELECT_ACL, all_tables, FALSE, UINT_MAX, FALSE) || open_and_lock_tables(thd, all_tables, TRUE, 0))) goto error; @@ -6436,8 +6426,6 @@ static bool execute_sqlcom_select(THD *thd, TABLE_LIST *all_tables) new (thd->mem_root) Item_int(thd, (ulonglong) thd->variables.select_limit); } - if (check_dependencies_in_with_clauses(lex->with_clauses_list)) - return 1; if (thd->variables.vers_alter_history == VERS_ALTER_HISTORY_SURVIVE) { diff --git a/sql/sql_prepare.cc b/sql/sql_prepare.cc index b31c2f36db1..80ac2b22a86 100644 --- a/sql/sql_prepare.cc +++ b/sql/sql_prepare.cc @@ -1516,8 +1516,6 @@ static int mysql_test_select(Prepared_statement *stmt, lex->select_lex.context.resolve_in_select_list= TRUE; ulong privilege= lex->exchange ? SELECT_ACL | FILE_ACL : SELECT_ACL; - if (check_dependencies_in_with_clauses(lex->with_clauses_list)) - goto error; if (tables) { if (check_table_access(thd, privilege, tables, FALSE, UINT_MAX, FALSE)) @@ -1784,9 +1782,6 @@ static bool mysql_test_create_table(Prepared_statement *stmt) if (create_table_precheck(thd, tables, create_table)) DBUG_RETURN(TRUE); - if (check_dependencies_in_with_clauses(lex->with_clauses_list)) - DBUG_RETURN(TRUE); - if (select_lex->item_list.elements) { /* Base table and temporary table are not in the same name space. */ @@ -2177,9 +2172,6 @@ static bool mysql_test_insert_select(Prepared_statement *stmt, if (insert_precheck(stmt->thd, tables)) return 1; - if (check_dependencies_in_with_clauses(lex->with_clauses_list)) - return 1; - /* store it, because mysql_insert_select_prepare_tester change it */ first_local_table= lex->select_lex.table_list.first; DBUG_ASSERT(first_local_table != 0); @@ -2282,6 +2274,9 @@ static bool check_prepared_statement(Prepared_statement *stmt) if (tables) thd->get_stmt_da()->opt_clear_warning_info(thd->query_id); + if (check_dependencies_in_with_clauses(thd->lex->with_clauses_list)) + goto error; + if (sql_command_flags[sql_command] & CF_HA_CLOSE) mysql_ha_rm_tables(thd, tables); diff --git a/sql/sql_select.cc b/sql/sql_select.cc index 0527eef4fde..d6fe06173a2 100644 --- a/sql/sql_select.cc +++ b/sql/sql_select.cc @@ -18358,7 +18358,7 @@ bool create_internal_tmp_table(TABLE *table, KEY *keyinfo, table->in_use->inc_status_created_tmp_disk_tables(); table->in_use->inc_status_created_tmp_tables(); share->db_record_offset= 1; - table->created= TRUE; + table->set_created(); DBUG_RETURN(0); err: DBUG_RETURN(1); @@ -18869,8 +18869,8 @@ int rr_sequential_and_unpack(READ_RECORD *info) */ bool instantiate_tmp_table(TABLE *table, KEY *keyinfo, - MARIA_COLUMNDEF *start_recinfo, - MARIA_COLUMNDEF **recinfo, + TMP_ENGINE_COLUMNDEF *start_recinfo, + TMP_ENGINE_COLUMNDEF **recinfo, ulonglong options) { if (table->s->db_type() == TMP_ENGINE_HTON) diff --git a/sql/sql_select.h b/sql/sql_select.h index fbadca2255d..298097f1b98 100644 --- a/sql/sql_select.h +++ b/sql/sql_select.h @@ -2348,8 +2348,8 @@ bool create_internal_tmp_table(TABLE *table, KEY *keyinfo, TMP_ENGINE_COLUMNDEF **recinfo, ulonglong options); bool instantiate_tmp_table(TABLE *table, KEY *keyinfo, - MARIA_COLUMNDEF *start_recinfo, - MARIA_COLUMNDEF **recinfo, + TMP_ENGINE_COLUMNDEF *start_recinfo, + TMP_ENGINE_COLUMNDEF **recinfo, ulonglong options); bool open_tmp_table(TABLE *table); void setup_tmp_table_column_bitmaps(TABLE *table, uchar *bitmaps); diff --git a/sql/sql_union.cc b/sql/sql_union.cc index dda57cc5b4d..f18d30a45c0 100644 --- a/sql/sql_union.cc +++ b/sql/sql_union.cc @@ -1710,6 +1710,7 @@ bool st_select_lex_unit::exec_recursive() sq; sq= sq->next_with_rec_ref) { + sq->reset(); sq->engine->force_reexecution(); } |