diff options
Diffstat (limited to 'sql')
47 files changed, 366 insertions, 200 deletions
diff --git a/sql/field.h b/sql/field.h index cf115cceeae..27f7095c1d0 100644 --- a/sql/field.h +++ b/sql/field.h @@ -1414,7 +1414,7 @@ public: virtual uint max_packed_col_length(uint max_length) { return max_length;} - uint offset(uchar *record) const + uint offset(const uchar *record) const { return (uint) (ptr - record); } diff --git a/sql/ha_partition.cc b/sql/ha_partition.cc index 00be71e55eb..a08928d964b 100644 --- a/sql/ha_partition.cc +++ b/sql/ha_partition.cc @@ -4247,7 +4247,7 @@ void ha_partition::try_semi_consistent_read(bool yes) determining which partition the row should be written to. */ -int ha_partition::write_row(uchar * buf) +int ha_partition::write_row(const uchar * buf) { uint32 part_id; int error; diff --git a/sql/ha_partition.h b/sql/ha_partition.h index 3ad00e853d1..2148926c1ed 100644 --- a/sql/ha_partition.h +++ b/sql/ha_partition.h @@ -628,7 +628,7 @@ public: start_bulk_insert and end_bulk_insert is called before and after a number of calls to write_row. */ - virtual int write_row(uchar * buf); + virtual int write_row(const uchar * buf); virtual bool start_bulk_update(); virtual int exec_bulk_update(ha_rows *dup_key_found); virtual int end_bulk_update(); diff --git a/sql/ha_sequence.cc b/sql/ha_sequence.cc index d676e4d41f6..87d8339881d 100644 --- a/sql/ha_sequence.cc +++ b/sql/ha_sequence.cc @@ -194,7 +194,7 @@ int ha_sequence::create(const char *name, TABLE *form, the sequence with 'buf' as the sequence object is already up to date. */ -int ha_sequence::write_row(uchar *buf) +int ha_sequence::write_row(const uchar *buf) { int error; sequence_definition tmp_seq; diff --git a/sql/ha_sequence.h b/sql/ha_sequence.h index fd9da05b591..4360e9faa3d 100644 --- a/sql/ha_sequence.h +++ b/sql/ha_sequence.h @@ -71,7 +71,7 @@ public: int create(const char *name, TABLE *form, HA_CREATE_INFO *create_info); handler *clone(const char *name, MEM_ROOT *mem_root); - int write_row(uchar *buf); + int write_row(const uchar *buf); Table_flags table_flags() const; /* One can't update or delete from sequence engine */ int update_row(const uchar *old_data, const uchar *new_data) diff --git a/sql/handler.cc b/sql/handler.cc index 93d52c970ae..3e54d4a19d0 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -6452,7 +6452,7 @@ static int wsrep_after_row(THD *thd) #endif /* WITH_WSREP */ static int check_duplicate_long_entry_key(TABLE *table, handler *h, - uchar *new_rec, uint key_no) + const uchar *new_rec, uint key_no) { Field *hash_field; int result, error= 0; @@ -6541,7 +6541,8 @@ exit: unique constraint on long columns. @returns 0 if no duplicate else returns error */ -static int check_duplicate_long_entries(TABLE *table, handler *h, uchar *new_rec) +static int check_duplicate_long_entries(TABLE *table, handler *h, + const uchar *new_rec) { table->file->errkey= -1; int result; @@ -6610,7 +6611,7 @@ static int check_duplicate_long_entries_update(TABLE *table, handler *h, uchar * return error; } -int handler::ha_write_row(uchar *buf) +int handler::ha_write_row(const uchar *buf) { int error; Log_func *log_func= Write_rows_log_event::binlog_row_logging_function; @@ -6699,7 +6700,7 @@ int handler::ha_update_row(const uchar *old_data, const uchar *new_data) Update first row. Only used by sequence tables */ -int handler::update_first_row(uchar *new_data) +int handler::update_first_row(const uchar *new_data) { int error; if (likely(!(error= ha_rnd_init(1)))) diff --git a/sql/handler.h b/sql/handler.h index d7061ff9504..cbde570483f 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -646,6 +646,7 @@ typedef ulonglong alter_table_operations; #define ALTER_ADD_FOREIGN_KEY (1ULL << 21) // Set for DROP FOREIGN KEY #define ALTER_DROP_FOREIGN_KEY (1ULL << 22) +#define ALTER_CHANGE_INDEX_COMMENT (1ULL << 23) // Set for ADD [COLUMN] FIRST | AFTER #define ALTER_COLUMN_ORDER (1ULL << 25) #define ALTER_ADD_CHECK_CONSTRAINT (1ULL << 27) @@ -3242,7 +3243,7 @@ public: and delete_row() below. */ int ha_external_lock(THD *thd, int lock_type); - int ha_write_row(uchar * buf); + int ha_write_row(const uchar * buf); int ha_update_row(const uchar * old_data, const uchar * new_data); int ha_delete_row(const uchar * buf); void ha_release_auto_increment(); @@ -4533,7 +4534,7 @@ private: */ virtual int rnd_init(bool scan)= 0; virtual int rnd_end() { return 0; } - virtual int write_row(uchar *buf __attribute__((unused))) + virtual int write_row(const uchar *buf __attribute__((unused))) { return HA_ERR_WRONG_COMMAND; } @@ -4556,7 +4557,7 @@ private: Optimized function for updating the first row. Only used by sequence tables */ - virtual int update_first_row(uchar *new_data); + virtual int update_first_row(const uchar *new_data); virtual int delete_row(const uchar *buf __attribute__((unused))) { diff --git a/sql/item_cmpfunc.h b/sql/item_cmpfunc.h index aa20ce0a5a4..a302b205132 100644 --- a/sql/item_cmpfunc.h +++ b/sql/item_cmpfunc.h @@ -291,6 +291,7 @@ public: virtual const char* func_name() const { return "isnottrue"; } Item *get_copy(THD *thd) { return get_item_copy<Item_func_isnottrue>(thd, this); } + bool eval_not_null_tables(void *) { not_null_tables_cache= 0; return false; } }; @@ -322,6 +323,7 @@ public: virtual const char* func_name() const { return "isnotfalse"; } Item *get_copy(THD *thd) { return get_item_copy<Item_func_isnotfalse>(thd, this); } + bool eval_not_null_tables(void *) { not_null_tables_cache= 0; return false; } }; diff --git a/sql/item_func.cc b/sql/item_func.cc index eb569aced9b..882337870fc 100644 --- a/sql/item_func.cc +++ b/sql/item_func.cc @@ -6759,9 +6759,9 @@ longlong Item_func_lastval::val_int() /* Sets next value to be returned from sequences - SELECT setval('foo', 42, 0); Next nextval will return 43 - SELECT setval('foo', 42, 0, true); Same as above - SELECT setval('foo', 42, 0, false); Next nextval will return 42 + SELECT setval(foo, 42, 0); Next nextval will return 43 + SELECT setval(foo, 42, 0, true); Same as above + SELECT setval(foo, 42, 0, false); Next nextval will return 42 */ longlong Item_func_setval::val_int() diff --git a/sql/item_sum.cc b/sql/item_sum.cc index 0fbb0fd5ec3..3e915363c12 100644 --- a/sql/item_sum.cc +++ b/sql/item_sum.cc @@ -3058,11 +3058,14 @@ Item_sum_hybrid::min_max_update_str_field() if (!args[0]->null_value) { - result_field->val_str(&cmp->value2); - - if (result_field->is_null() || - (cmp_sign * sortcmp(res_str,&cmp->value2,collation.collation)) < 0) + if (result_field->is_null()) result_field->store(res_str->ptr(),res_str->length(),res_str->charset()); + else + { + result_field->val_str(&cmp->value2); + if ((cmp_sign * sortcmp(res_str,&cmp->value2,collation.collation)) < 0) + result_field->store(res_str->ptr(),res_str->length(),res_str->charset()); + } result_field->set_notnull(); } DBUG_VOID_RETURN; diff --git a/sql/lex_string.h b/sql/lex_string.h index 8579c00fe36..a62609c6b60 100644 --- a/sql/lex_string.h +++ b/sql/lex_string.h @@ -75,6 +75,10 @@ static inline bool cmp(const LEX_CSTRING *a, const LEX_CSTRING *b) return (a->length != b->length || memcmp(a->str, b->str, a->length)); } +static inline bool cmp(const LEX_CSTRING a, const LEX_CSTRING b) +{ + return a.length != b.length || memcmp(a.str, b.str, a.length); +} /* Compare if two LEX_CSTRING are equal. Assumption is that diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc index 70b1c1538e9..ba7fb9b8267 100644 --- a/sql/log_event_server.cc +++ b/sql/log_event_server.cc @@ -1455,41 +1455,44 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, size_t que bool trx_cache= FALSE; cache_type= Log_event::EVENT_INVALID_CACHE; - switch (lex->sql_command) + if (!direct) { - case SQLCOM_DROP_TABLE: - case SQLCOM_DROP_SEQUENCE: - use_cache= (lex->tmp_table() && thd->in_multi_stmt_transaction_mode()); - break; + switch (lex->sql_command) + { + case SQLCOM_DROP_TABLE: + case SQLCOM_DROP_SEQUENCE: + use_cache= (lex->tmp_table() && thd->in_multi_stmt_transaction_mode()); + break; - case SQLCOM_CREATE_TABLE: - case SQLCOM_CREATE_SEQUENCE: - /* - If we are using CREATE ... SELECT or if we are a slave - executing BEGIN...COMMIT (generated by CREATE...SELECT) we - have to use the transactional cache to ensure we don't - calculate any checksum for the CREATE part. - */ - trx_cache= (lex->first_select_lex()->item_list.elements && - thd->is_current_stmt_binlog_format_row()) || - (thd->variables.option_bits & OPTION_GTID_BEGIN); - use_cache= (lex->tmp_table() && - thd->in_multi_stmt_transaction_mode()) || trx_cache; - break; - case SQLCOM_SET_OPTION: - if (lex->autocommit) - use_cache= trx_cache= FALSE; - else - use_cache= TRUE; - break; - case SQLCOM_RELEASE_SAVEPOINT: - case SQLCOM_ROLLBACK_TO_SAVEPOINT: - case SQLCOM_SAVEPOINT: - use_cache= trx_cache= TRUE; - break; - default: - use_cache= sqlcom_can_generate_row_events(thd); - break; + case SQLCOM_CREATE_TABLE: + case SQLCOM_CREATE_SEQUENCE: + /* + If we are using CREATE ... SELECT or if we are a slave + executing BEGIN...COMMIT (generated by CREATE...SELECT) we + have to use the transactional cache to ensure we don't + calculate any checksum for the CREATE part. + */ + trx_cache= (lex->first_select_lex()->item_list.elements && + thd->is_current_stmt_binlog_format_row()) || + (thd->variables.option_bits & OPTION_GTID_BEGIN); + use_cache= (lex->tmp_table() && + thd->in_multi_stmt_transaction_mode()) || trx_cache; + break; + case SQLCOM_SET_OPTION: + if (lex->autocommit) + use_cache= trx_cache= FALSE; + else + use_cache= TRUE; + break; + case SQLCOM_RELEASE_SAVEPOINT: + case SQLCOM_ROLLBACK_TO_SAVEPOINT: + case SQLCOM_SAVEPOINT: + use_cache= trx_cache= TRUE; + break; + default: + use_cache= sqlcom_can_generate_row_events(thd); + break; + } } if (!use_cache || direct) @@ -1966,7 +1969,7 @@ compare_errors: thd->get_db(), query_arg); thd->is_slave_error= 1; #ifdef WITH_WSREP - if (thd->wsrep_apply_toi && wsrep_must_ignore_error(thd)) + if (wsrep_thd_is_toi(thd) && wsrep_must_ignore_error(thd)) { thd->clear_error(1); thd->killed= NOT_KILLED; diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 506f624f974..78287c8e04c 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -4795,6 +4795,7 @@ static int init_server_components() We need to call each of these following functions to ensure that all things are initialized so that unireg_abort() doesn't fail */ + my_cpu_init(); mdl_init(); if (tdc_init() || hostname_cache_init()) unireg_abort(1); @@ -5704,6 +5705,9 @@ int mysqld_main(int argc, char **argv) { wsrep_init_startup (false); } + + WSREP_DEBUG("Startup creating %ld applier threads running %lu", + wsrep_slave_threads - 1, wsrep_running_applier_threads); wsrep_create_appliers(wsrep_slave_threads - 1); } } @@ -7695,6 +7699,8 @@ SHOW_VAR status_vars[]= { {"wsrep_provider_vendor", (char*) &wsrep_provider_vendor, SHOW_CHAR_PTR}, {"wsrep_provider_capabilities", (char*) &wsrep_provider_capabilities, SHOW_CHAR_PTR}, {"wsrep_thread_count", (char*) &wsrep_running_threads, SHOW_LONG_NOFLUSH}, + {"wsrep_applier_thread_count", (char*) &wsrep_running_applier_threads, SHOW_LONG_NOFLUSH}, + {"wsrep_rollbacker_thread_count", (char *) &wsrep_running_rollbacker_threads, SHOW_LONG_NOFLUSH}, {"wsrep_cluster_capabilities", (char*) &wsrep_cluster_capabilities, SHOW_CHAR_PTR}, {"wsrep", (char*) &wsrep_show_status, SHOW_FUNC}, #endif diff --git a/sql/protocol.cc b/sql/protocol.cc index deaf932a6d2..7e63fcf81a4 100644 --- a/sql/protocol.cc +++ b/sql/protocol.cc @@ -461,6 +461,17 @@ bool net_send_error_packet(THD *thd, uint sql_errno, const char *err, */ if ((save_compress= net->compress)) net->compress= 2; + + /* + Sometimes, we send errors "out-of-band", e.g ER_CONNECTION_KILLED + on an idle connection. The current protocol "sequence number" is 0, + however some client drivers would however always expect packets + coming from server to have seq_no > 0, due to missing awareness + of "out-of-band" operations. Make these clients happy. + */ + if (!net->pkt_nr) + net->pkt_nr= 1; + ret= net_write_command(net,(uchar) 255, (uchar*) "", 0, (uchar*) buff, length); net->compress= save_compress; diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc index 509320e7a97..732edcd5bc6 100644 --- a/sql/rpl_mi.cc +++ b/sql/rpl_mi.cc @@ -115,15 +115,6 @@ void Master_info::wait_until_free() Master_info::~Master_info() { wait_until_free(); -#ifdef WITH_WSREP - /* - Do not free "wsrep" rpl_filter. It will eventually be freed by - free_all_rpl_filters() when server terminates. - */ - if (strncmp(connection_name.str, STRING_WITH_LEN("wsrep"))) -#endif - rpl_filters.delete_element(connection_name.str, connection_name.length, - (void (*)(const char*, uchar*)) free_rpl_filter); my_free(const_cast<char*>(connection_name.str)); delete_dynamic(&ignore_server_ids); mysql_mutex_destroy(&run_lock); diff --git a/sql/semisync_master_ack_receiver.h b/sql/semisync_master_ack_receiver.h index feb3a51ccea..138f7b5aeed 100644 --- a/sql/semisync_master_ack_receiver.h +++ b/sql/semisync_master_ack_receiver.h @@ -211,7 +211,7 @@ public: { my_socket socket_id= slave->sock_fd(); m_max_fd= (socket_id > m_max_fd ? socket_id : m_max_fd); -#ifndef WINDOWS +#ifndef _WIN32 if (socket_id > FD_SETSIZE) { sql_print_error("Semisync slave socket fd is %u. " @@ -219,7 +219,7 @@ public: "greater than %u (FD_SETSIZE).", socket_id, FD_SETSIZE); return 0; } -#endif //WINDOWS +#endif //_WIN32 FD_SET(socket_id, &m_init_fds); fds_index++; } diff --git a/sql/slave.cc b/sql/slave.cc index 03777dc8ce8..6a0eee1d205 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -1046,7 +1046,7 @@ bool init_slave_transaction_retry_errors(const char* arg) { if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code))) break; - if (err_code > 0 && err_code < ER_ERROR_LAST) + if (err_code > 0) slave_transaction_retry_errors[i++]= (uint) err_code; while (!my_isdigit(system_charset_info,*p) && *p) p++; diff --git a/sql/sp_head.cc b/sql/sp_head.cc index 889e0b42698..90fb288cc3c 100644 --- a/sql/sp_head.cc +++ b/sql/sp_head.cc @@ -147,7 +147,7 @@ sp_get_flags_for_command(LEX *lex) switch (lex->sql_command) { case SQLCOM_SELECT: - if (lex->result) + if (lex->result && !lex->analyze_stmt) { flags= 0; /* This is a SELECT with INTO clause */ break; diff --git a/sql/sql_acl.cc b/sql/sql_acl.cc index 11639666c10..e934183c1d6 100644 --- a/sql/sql_acl.cc +++ b/sql/sql_acl.cc @@ -2072,7 +2072,7 @@ static int set_user_salt(ACL_USER::AUTH *auth, plugin_ref plugin) auth->salt.length= len; } else - auth->salt= auth->auth_string; + auth->salt= safe_lexcstrdup_root(&acl_memroot, auth->auth_string); return 0; } diff --git a/sql/sql_audit.cc b/sql/sql_audit.cc index e8a00abf30b..ed175ae4865 100644 --- a/sql/sql_audit.cc +++ b/sql/sql_audit.cc @@ -119,12 +119,27 @@ void mysql_audit_acquire_plugins(THD *thd, ulong *event_class_mask) { plugin_foreach(thd, acquire_plugins, MYSQL_AUDIT_PLUGIN, event_class_mask); add_audit_mask(thd->audit_class_mask, event_class_mask); + thd->audit_plugin_version= global_plugin_version; } DBUG_VOID_RETURN; } /** + Check if there were changes in the state of plugins + so we need to do the mysql_audit_release asap. + + @param[in] thd + +*/ + +my_bool mysql_audit_release_required(THD *thd) +{ + return thd && (thd->audit_plugin_version != global_plugin_version); +} + + +/** Release any resources associated with the current thd. @param[in] thd @@ -159,6 +174,7 @@ void mysql_audit_release(THD *thd) /* Reset the state of thread values */ reset_dynamic(&thd->audit_class_plugins); bzero(thd->audit_class_mask, sizeof(thd->audit_class_mask)); + thd->audit_plugin_version= -1; } diff --git a/sql/sql_audit.h b/sql/sql_audit.h index 327fe6052ab..59cced13b0a 100644 --- a/sql/sql_audit.h +++ b/sql/sql_audit.h @@ -58,6 +58,7 @@ static inline void mysql_audit_notify(THD *thd, uint event_class, #define mysql_audit_connection_enabled() 0 #define mysql_audit_table_enabled() 0 #endif +extern my_bool mysql_audit_release_required(THD *thd); extern void mysql_audit_release(THD *thd); static inline unsigned int strlen_uint(const char *s) diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 0f24604aa5e..9a646ebb822 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -659,6 +659,9 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) waiting_on_group_commit(FALSE), has_waiter(FALSE), spcont(NULL), m_parser_state(NULL), +#ifndef EMBEDDED_LIBRARY + audit_plugin_version(-1), +#endif #if defined(ENABLED_DEBUG_SYNC) debug_sync_control(0), #endif /* defined(ENABLED_DEBUG_SYNC) */ @@ -691,7 +694,6 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) wsrep_po_handle(WSREP_PO_INITIALIZER), wsrep_po_cnt(0), wsrep_apply_format(0), - wsrep_apply_toi(false), wsrep_rbr_buf(NULL), wsrep_sync_wait_gtid(WSREP_GTID_UNDEFINED), wsrep_affected_rows(0), diff --git a/sql/sql_class.h b/sql/sql_class.h index bad9a5796f8..e8abcb3af34 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -3241,6 +3241,7 @@ public: added to the list of audit plugins which are currently in use. */ unsigned long audit_class_mask[MYSQL_AUDIT_CLASS_MASK_SIZE]; + int audit_plugin_version; #endif #if defined(ENABLED_DEBUG_SYNC) @@ -4838,7 +4839,6 @@ public: rpl_sid wsrep_po_sid; #endif /* GTID_SUPPORT */ void *wsrep_apply_format; - bool wsrep_apply_toi; /* applier processing in TOI */ uchar* wsrep_rbr_buf; wsrep_gtid_t wsrep_sync_wait_gtid; // wsrep_gtid_t wsrep_last_written_gtid; diff --git a/sql/sql_connect.cc b/sql/sql_connect.cc index c119a111eed..37c4c27c08b 100644 --- a/sql/sql_connect.cc +++ b/sql/sql_connect.cc @@ -1409,7 +1409,8 @@ void do_handle_one_connection(CONNECT *connect, bool put_in_cache) while (thd_is_connection_alive(thd)) { - mysql_audit_release(thd); + if (mysql_audit_release_required(thd)) + mysql_audit_release(thd); if (do_command(thd)) break; } diff --git a/sql/sql_const.h b/sql/sql_const.h index 7aa4249f5ad..f7c820c727b 100644 --- a/sql/sql_const.h +++ b/sql/sql_const.h @@ -64,7 +64,7 @@ CREATE TABLE t1 (c VARBINARY(65534)); CREATE TABLE t1 (c VARBINARY(65535)); Like VARCHAR(65536), they will be converted to BLOB automatically - in non-sctict mode. + in non-strict mode. */ #define MAX_FIELD_VARCHARLENGTH (65535-2-1) #define MAX_FIELD_BLOBLENGTH UINT_MAX32 /* cf field_blob::get_length() */ diff --git a/sql/sql_cte.cc b/sql/sql_cte.cc index 57ed922c2b8..eb39cb94cef 100644 --- a/sql/sql_cte.cc +++ b/sql/sql_cte.cc @@ -767,7 +767,9 @@ bool With_clause::prepare_unreferenced_elements(THD *thd) true on failure */ -bool With_element::set_unparsed_spec(THD *thd, char *spec_start, char *spec_end, +bool With_element::set_unparsed_spec(THD *thd, + const char *spec_start, + const char *spec_end, my_ptrdiff_t spec_offset) { stmt_prepare_mode= thd->m_parser_state->m_lip.stmt_prepare_mode; diff --git a/sql/sql_cte.h b/sql/sql_cte.h index 03c697bf746..80d56644d7e 100644 --- a/sql/sql_cte.h +++ b/sql/sql_cte.h @@ -197,7 +197,7 @@ public: TABLE_LIST *find_first_sq_rec_ref_in_select(st_select_lex *sel); - bool set_unparsed_spec(THD *thd, char *spec_start, char *spec_end, + bool set_unparsed_spec(THD *thd, const char *spec_start, const char *spec_end, my_ptrdiff_t spec_offset); st_select_lex_unit *clone_parsed_spec(THD *thd, TABLE_LIST *with_table); diff --git a/sql/sql_lex.cc b/sql/sql_lex.cc index feda4cd23fb..0ff4d5f7684 100644 --- a/sql/sql_lex.cc +++ b/sql/sql_lex.cc @@ -1514,6 +1514,7 @@ int Lex_input_stream::lex_one_token(YYSTYPE *yylval, THD *thd) case MY_LEX_SKIP: // This should not happen if (c != ')') next_state= MY_LEX_START; // Allow signed numbers + yylval->kwd.set_keyword(m_tok_start, 1); return((int) c); case MY_LEX_MINUS_OR_COMMENT: diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 16465c47a2f..869ece6a398 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -1660,8 +1660,16 @@ bool dispatch_command(enum enum_server_command command, THD *thd, case COM_RESET_CONNECTION: { thd->status_var.com_other++; +#ifdef WITH_WSREP + wsrep_after_command_ignore_result(thd); + wsrep_close(thd); +#endif /* WITH_WSREP */ thd->change_user(); thd->clear_error(); // if errors from rollback +#ifdef WITH_WSREP + wsrep_open(thd); + wsrep_before_command(thd); +#endif /* WITH_WSREP */ /* Restore original charset from client authentication packet.*/ if(thd->org_charset) thd->update_charset(thd->org_charset,thd->org_charset,thd->org_charset); @@ -1673,7 +1681,15 @@ bool dispatch_command(enum enum_server_command command, THD *thd, int auth_rc; status_var_increment(thd->status_var.com_other); +#ifdef WITH_WSREP + wsrep_after_command_ignore_result(thd); + wsrep_close(thd); +#endif /* WITH_WSREP */ thd->change_user(); +#ifdef WITH_WSREP + wsrep_open(thd); + wsrep_before_command(thd); +#endif /* WITH_WSREP */ thd->clear_error(); // if errors from rollback /* acl_authenticate() takes the data from net->read_pos */ diff --git a/sql/sql_plugin.cc b/sql/sql_plugin.cc index a6672e2be2c..4279cc25010 100644 --- a/sql/sql_plugin.cc +++ b/sql/sql_plugin.cc @@ -76,7 +76,7 @@ uint plugin_maturity_map[]= { 0, 1, 2, 3, 4, 5, 6 }; /* - When you ad a new plugin type, add both a string and make sure that the + When you add a new plugin type, add both a string and make sure that the init and deinit array are correctly updated. */ const LEX_CSTRING plugin_type_names[MYSQL_MAX_PLUGIN_TYPE_NUM]= @@ -227,6 +227,7 @@ static DYNAMIC_ARRAY plugin_array; static HASH plugin_hash[MYSQL_MAX_PLUGIN_TYPE_NUM]; static MEM_ROOT plugin_mem_root; static bool reap_needed= false; +volatile int global_plugin_version= 1; static bool initialized= 0; ulong dlopen_count; @@ -2225,6 +2226,7 @@ bool mysql_install_plugin(THD *thd, const LEX_CSTRING *name, reap_plugins(); } err: + global_plugin_version++; mysql_mutex_unlock(&LOCK_plugin); if (argv) free_defaults(argv); @@ -2375,6 +2377,7 @@ bool mysql_uninstall_plugin(THD *thd, const LEX_CSTRING *name, } reap_plugins(); + global_plugin_version++; mysql_mutex_unlock(&LOCK_plugin); DBUG_RETURN(error); #ifdef WITH_WSREP @@ -3693,7 +3696,7 @@ static int construct_options(MEM_ROOT *mem_root, struct st_plugin_int *tmp, const LEX_CSTRING plugin_dash = { STRING_WITH_LEN("plugin-") }; size_t plugin_name_len= strlen(plugin_name); size_t optnamelen; - const int max_comment_len= 180; + const int max_comment_len= 255; char *comment= (char *) alloc_root(mem_root, max_comment_len + 1); char *optname; @@ -3727,8 +3730,9 @@ static int construct_options(MEM_ROOT *mem_root, struct st_plugin_int *tmp, options[0].typelib= options[1].typelib= &global_plugin_typelib; strxnmov(comment, max_comment_len, "Enable or disable ", plugin_name, - " plugin. One of: ON, OFF, FORCE (don't start " - "if the plugin fails to load).", NullS); + " plugin. One of: ON, OFF, FORCE (don't start if the plugin" + " fails to load), FORCE_PLUS_PERMANENT (like FORCE, but the" + " plugin can not be uninstalled).", NullS); options[0].comment= comment; /* Allocate temporary space for the value of the tristate. diff --git a/sql/sql_plugin.h b/sql/sql_plugin.h index b60fe0f13eb..352d1e22ac4 100644 --- a/sql/sql_plugin.h +++ b/sql/sql_plugin.h @@ -38,6 +38,7 @@ enum enum_plugin_load_option { PLUGIN_OFF, PLUGIN_ON, PLUGIN_FORCE, PLUGIN_FORCE_PLUS_PERMANENT }; extern const char *global_plugin_typelib_names[]; +extern volatile int global_plugin_version; extern ulong dlopen_count; #include <my_sys.h> diff --git a/sql/sql_select.cc b/sql/sql_select.cc index 8163f5b4cf0..b09f8256e9c 100644 --- a/sql/sql_select.cc +++ b/sql/sql_select.cc @@ -13157,7 +13157,10 @@ void JOIN_TAB::cleanup() if (table) { table->file->ha_end_keyread(); - table->file->ha_index_or_rnd_end(); + if (type == JT_FT) + table->file->ha_ft_end(); + else + table->file->ha_index_or_rnd_end(); preread_init_done= FALSE; if (table->pos_in_table_list && table->pos_in_table_list->jtbm_subselect) diff --git a/sql/sql_sequence.cc b/sql/sql_sequence.cc index 9f17590a315..4e8624d6360 100644 --- a/sql/sql_sequence.cc +++ b/sql/sql_sequence.cc @@ -176,7 +176,7 @@ void sequence_definition::store_fields(TABLE *table) /* - Check the sequence fields through seq_fields when createing a sequence. + Check the sequence fields through seq_fields when creating a sequence. RETURN VALUES false Success diff --git a/sql/sql_table.cc b/sql/sql_table.cc index e147c5247fb..7d73ed83beb 100644 --- a/sql/sql_table.cc +++ b/sql/sql_table.cc @@ -2300,7 +2300,7 @@ int mysql_rm_table_no_locks(THD *thd, TABLE_LIST *tables, bool if_exists, for (table= tables; table; table= table->next_local) { bool is_trans= 0; - bool table_creation_was_logged= 1; + bool table_creation_was_logged= 0; LEX_CSTRING db= table->db; handlerton *table_type= 0; @@ -6536,10 +6536,11 @@ static int compare_uint(const uint *s, const uint *t) return (*s < *t) ? -1 : ((*s > *t) ? 1 : 0); } -enum class Compare_keys +enum class Compare_keys : uint32_t { Equal, EqualButKeyPartLength, + EqualButComment, NotEqual }; @@ -6623,11 +6624,12 @@ Compare_keys compare_keys_but_name(const KEY *table_key, const KEY *new_key, return Compare_keys::NotEqual; /* Check that key comment is not changed. */ - if (table_key->comment.length != new_key->comment.length || - (table_key->comment.length && - memcmp(table_key->comment.str, new_key->comment.str, - table_key->comment.length) != 0)) - return Compare_keys::NotEqual; + if (cmp(table_key->comment, new_key->comment) != 0) + { + if (result != Compare_keys::Equal) + return Compare_keys::NotEqual; + result= Compare_keys::EqualButComment; + } return result; } @@ -7017,6 +7019,9 @@ static bool fill_alter_inplace_info(THD *thd, TABLE *table, bool varchar, case Compare_keys::EqualButKeyPartLength: ha_alter_info->handler_flags|= ALTER_COLUMN_INDEX_LENGTH; continue; + case Compare_keys::EqualButComment: + ha_alter_info->handler_flags|= ALTER_CHANGE_INDEX_COMMENT; + continue; case Compare_keys::NotEqual: break; } @@ -7678,6 +7683,7 @@ static bool mysql_inplace_alter_table(THD *thd, if (res) goto rollback; + DEBUG_SYNC(thd, "alter_table_inplace_before_lock_upgrade"); // Upgrade to EXCLUSIVE before commit. if (wait_while_table_is_used(thd, table, HA_EXTRA_PREPARE_FOR_RENAME)) goto rollback; diff --git a/sql/sql_type.cc b/sql/sql_type.cc index 462aebe0962..289a3d4d98d 100644 --- a/sql/sql_type.cc +++ b/sql/sql_type.cc @@ -1,5 +1,5 @@ /* - Copyright (c) 2015 MariaDB Foundation. + Copyright (c) 2015,2019 MariaDB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -1254,6 +1254,8 @@ Type_handler::string_type_handler(uint max_octet_length) return &type_handler_long_blob; else if (max_octet_length >= 65536) return &type_handler_medium_blob; + else if (max_octet_length >= MAX_FIELD_VARCHARLENGTH) + return &type_handler_blob; return &type_handler_varchar; } @@ -2347,6 +2349,7 @@ Field *Type_handler_varchar::make_conversion_table_field(MEM_ROOT *root, const Field *target) const { + DBUG_ASSERT(HA_VARCHAR_PACKLENGTH(metadata) <= MAX_FIELD_VARCHARLENGTH); return new(root) Field_varstring(NULL, metadata, HA_VARCHAR_PACKLENGTH(metadata), (uchar *) "", 1, Field::NONE, &empty_clex_str, @@ -3471,6 +3474,8 @@ Field *Type_handler_varchar::make_table_field(MEM_ROOT *root, TABLE *table) const { + DBUG_ASSERT(HA_VARCHAR_PACKLENGTH(attr.max_length) <= + MAX_FIELD_VARCHARLENGTH); return new (root) Field_varstring(addr.ptr(), attr.max_length, HA_VARCHAR_PACKLENGTH(attr.max_length), diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy index f490760a2ad..d70a124aa23 100644 --- a/sql/sql_yacc.yy +++ b/sql/sql_yacc.yy @@ -1799,7 +1799,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, size_t *yystacksize); %type <simple_string> remember_name remember_end - remember_tok_start remember_tok_end + remember_tok_start wild_and_where %type <const_simple_string> @@ -2174,9 +2174,12 @@ END_OF_INPUT %type <frame_exclusion> opt_window_frame_exclusion; %type <window_frame_bound> window_frame_start window_frame_bound; -%type <NONE> +%type <kwd> '-' '+' '*' '/' '%' '(' ')' - ',' '!' '{' '}' '&' '|' AND_SYM OR_SYM BETWEEN_SYM CASE_SYM + ',' '!' '{' '}' '&' '|' + +%type <NONE> + AND_SYM OR_SYM BETWEEN_SYM CASE_SYM THEN_SYM WHEN_SYM DIV_SYM MOD_SYM OR2_SYM AND_AND_SYM DELETE_SYM MYSQL_CONCAT_SYM ORACLE_CONCAT_SYM @@ -8829,6 +8832,7 @@ persistent_column_stat_spec: } table_column_list ')' + { } ; persistent_index_stat_spec: @@ -8842,6 +8846,7 @@ persistent_index_stat_spec: } table_index_list ')' + { } ; table_column_list: @@ -9608,12 +9613,6 @@ remember_tok_start: } ; -remember_tok_end: - { - $$= (char*) YYLIP->get_tok_end(); - } - ; - remember_name: { $$= (char*) YYLIP->get_cpp_tok_start(); @@ -12442,6 +12441,7 @@ window_spec: opt_window_ref opt_window_partition_clause opt_window_order_clause opt_window_frame_clause ')' + { } ; opt_window_ref: @@ -15238,16 +15238,16 @@ with_list_element: MYSQL_YYABORT; Lex->with_column_list.empty(); } - AS '(' remember_tok_start query_expression remember_tok_end ')' + AS '(' query_expression ')' { LEX *lex= thd->lex; const char *query_start= lex->sphead ? lex->sphead->m_tmp_query : thd->query(); - char *spec_start= $6 + 1; - With_element *elem= new With_element($1, *$2, $7); + const char *spec_start= $5.pos() + 1; + With_element *elem= new With_element($1, *$2, $6); if (elem == NULL || Lex->curr_with_clause->add_with_element(elem)) MYSQL_YYABORT; - if (elem->set_unparsed_spec(thd, spec_start, $8, + if (elem->set_unparsed_spec(thd, spec_start, $7.pos(), spec_start - query_start)) MYSQL_YYABORT; } @@ -16765,7 +16765,7 @@ table_lock: ? MDL_SHARED_WRITE : MDL_SHARED_NO_READ_WRITE; - if (unlikely(!Select-> + if (unlikely(!Lex->current_select_or_default()-> add_table_to_list(thd, $1, $2, table_options, lock_type, mdl_type))) MYSQL_YYABORT; @@ -17342,7 +17342,7 @@ opt_column_list: LEX *lex=Lex; lex->grant |= lex->which_columns; } - | '(' column_list ')' + | '(' column_list ')' { } ; column_list: @@ -17651,7 +17651,7 @@ view_suid: view_list_opt: /* empty */ {} - | '(' view_list ')' + | '(' view_list ')' { } ; view_list: diff --git a/sql/sql_yacc_ora.yy b/sql/sql_yacc_ora.yy index 52c223f891f..12dd6febb3a 100644 --- a/sql/sql_yacc_ora.yy +++ b/sql/sql_yacc_ora.yy @@ -1269,7 +1269,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, size_t *yystacksize); %type <simple_string> remember_name remember_end remember_end_opt - remember_tok_start remember_tok_end + remember_tok_start wild_and_where %type <const_simple_string> @@ -1665,9 +1665,12 @@ END_OF_INPUT %type <frame_exclusion> opt_window_frame_exclusion; %type <window_frame_bound> window_frame_start window_frame_bound; -%type <NONE> +%type <kwd> '-' '+' '*' '/' '%' '(' ')' - ',' '!' '{' '}' '&' '|' AND_SYM OR_SYM BETWEEN_SYM CASE_SYM + ',' '!' '{' '}' '&' '|' + +%type <NONE> + AND_SYM OR_SYM BETWEEN_SYM CASE_SYM THEN_SYM WHEN_SYM DIV_SYM MOD_SYM OR2_SYM AND_AND_SYM DELETE_SYM MYSQL_CONCAT_SYM ORACLE_CONCAT_SYM @@ -8920,6 +8923,7 @@ persistent_column_stat_spec: } table_column_list ')' + { } ; persistent_index_stat_spec: @@ -8933,6 +8937,7 @@ persistent_index_stat_spec: } table_index_list ')' + { } ; table_column_list: @@ -9699,12 +9704,6 @@ remember_tok_start: } ; -remember_tok_end: - { - $$= (char*) YYLIP->get_tok_end(); - } - ; - remember_name: { $$= (char*) YYLIP->get_cpp_tok_start(); @@ -12542,6 +12541,7 @@ window_spec: opt_window_ref opt_window_partition_clause opt_window_order_clause opt_window_frame_clause ')' + { } ; opt_window_ref: @@ -15360,16 +15360,16 @@ with_list_element: MYSQL_YYABORT; Lex->with_column_list.empty(); } - AS '(' remember_tok_start query_expression remember_tok_end ')' + AS '(' query_expression ')' { LEX *lex= thd->lex; const char *query_start= lex->sphead ? lex->sphead->m_tmp_query : thd->query(); - char *spec_start= $6 + 1; - With_element *elem= new With_element($1, *$2, $7); + const char *spec_start= $5.pos() + 1; + With_element *elem= new With_element($1, *$2, $6); if (elem == NULL || Lex->curr_with_clause->add_with_element(elem)) MYSQL_YYABORT; - if (elem->set_unparsed_spec(thd, spec_start, $8, + if (elem->set_unparsed_spec(thd, spec_start, $7.pos(), spec_start - query_start)) MYSQL_YYABORT; } @@ -17562,7 +17562,7 @@ opt_column_list: LEX *lex=Lex; lex->grant |= lex->which_columns; } - | '(' column_list ')' + | '(' column_list ')' { } ; column_list: @@ -17872,7 +17872,7 @@ view_suid: view_list_opt: /* empty */ {} - | '(' view_list ')' + | '(' view_list ')' { } ; view_list: diff --git a/sql/table.cc b/sql/table.cc index ea333cb2ecd..e24404ab401 100644 --- a/sql/table.cc +++ b/sql/table.cc @@ -587,14 +587,14 @@ inline bool is_system_table_name(const char *name, size_t length) SYNOPSIS open_table_def() - thd Thread handler + thd Thread handler share Fill this with table definition - db_flags Bit mask of the following flags: OPEN_VIEW + flags Bit mask of the following flags: OPEN_VIEW NOTES This function is called when the table definition is not cached in table definition cache - The data is returned in 'share', which is alloced by + The data is returned in 'share', which is allocated by alloc_table_share().. The code assumes that share is initialized. */ diff --git a/sql/temporary_tables.cc b/sql/temporary_tables.cc index c42b6138c28..b39423e9131 100644 --- a/sql/temporary_tables.cc +++ b/sql/temporary_tables.cc @@ -1339,6 +1339,7 @@ bool THD::log_events_and_free_tmp_shares() my_thread_id save_pseudo_thread_id= variables.pseudo_thread_id; char db_buf[FN_REFLEN]; String db(db_buf, sizeof(db_buf), system_charset_info); + bool at_least_one_create_logged; /* Set pseudo_thread_id to be that of the processed table. @@ -1356,7 +1357,7 @@ bool THD::log_events_and_free_tmp_shares() within the sublist of common pseudo_thread_id to create single DROP query. */ - for (; + for (at_least_one_create_logged= false; share && IS_USER_TABLE(share) && tmpkeyval(share) == variables.pseudo_thread_id && share->db.length == db.length() && @@ -1364,51 +1365,58 @@ bool THD::log_events_and_free_tmp_shares() /* Get the next TABLE_SHARE in the list. */ share= temporary_tables->pop_front()) { - /* - We are going to add ` around the table names and possible more - due to special characters. - */ - append_identifier(this, &s_query, &share->table_name); - s_query.append(','); + if (share->table_creation_was_logged) + { + at_least_one_create_logged= true; + /* + We are going to add ` around the table names and possible more + due to special characters. + */ + append_identifier(this, &s_query, &share->table_name); + s_query.append(','); + } rm_temporary_table(share->db_type(), share->path.str); free_table_share(share); my_free(share); } - clear_error(); - CHARSET_INFO *cs_save= variables.character_set_client; - variables.character_set_client= system_charset_info; - thread_specific_used= true; - - Query_log_event qinfo(this, s_query.ptr(), - s_query.length() - 1 /* to remove trailing ',' */, - false, true, false, 0); - qinfo.db= db.ptr(); - qinfo.db_len= db.length(); - variables.character_set_client= cs_save; - - get_stmt_da()->set_overwrite_status(true); - transaction.stmt.mark_dropped_temp_table(); - bool error2= mysql_bin_log.write(&qinfo); - if (unlikely(error|= error2)) + if (at_least_one_create_logged) { - /* - If we're here following THD::cleanup, thence the connection - has been closed already. So lets print a message to the - error log instead of pushing yet another error into the - stmt_da. - - Also, we keep the error flag so that we propagate the error - up in the stack. This way, if we're the SQL thread we notice - that THD::close_tables failed. (Actually, the SQL - thread only calls THD::close_tables while applying - old Start_log_event_v3 events.) - */ - sql_print_error("Failed to write the DROP statement for " - "temporary tables to binary log"); - } + clear_error(); + CHARSET_INFO *cs_save= variables.character_set_client; + variables.character_set_client= system_charset_info; + thread_specific_used= true; + + Query_log_event qinfo(this, s_query.ptr(), + s_query.length() - 1 /* to remove trailing ',' */, + false, true, false, 0); + qinfo.db= db.ptr(); + qinfo.db_len= db.length(); + variables.character_set_client= cs_save; + + get_stmt_da()->set_overwrite_status(true); + transaction.stmt.mark_dropped_temp_table(); + bool error2= mysql_bin_log.write(&qinfo); + if (unlikely(error|= error2)) + { + /* + If we're here following THD::cleanup, thence the connection + has been closed already. So lets print a message to the + error log instead of pushing yet another error into the + stmt_da. + + Also, we keep the error flag so that we propagate the error + up in the stack. This way, if we're the SQL thread we notice + that THD::close_tables failed. (Actually, the SQL + thread only calls THD::close_tables while applying + old Start_log_event_v3 events.) + */ + sql_print_error("Failed to write the DROP statement for " + "temporary tables to binary log"); + } - get_stmt_da()->set_overwrite_status(false); + get_stmt_da()->set_overwrite_status(false); + } variables.pseudo_thread_id= save_pseudo_thread_id; thread_specific_used= save_thread_specific_used; } diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc index 13e3886fb97..ec889851b78 100644 --- a/sql/threadpool_common.cc +++ b/sql/threadpool_common.cc @@ -354,7 +354,8 @@ static int threadpool_process_request(THD *thd) { Vio *vio; thd->net.reading_or_writing= 0; - mysql_audit_release(thd); + if (mysql_audit_release_required(thd)) + mysql_audit_release(thd); if ((retval= do_command(thd)) != 0) goto end; diff --git a/sql/wsrep_high_priority_service.cc b/sql/wsrep_high_priority_service.cc index afb4ca3d3b7..68cf0d1877b 100644 --- a/sql/wsrep_high_priority_service.cc +++ b/sql/wsrep_high_priority_service.cc @@ -45,13 +45,13 @@ public: { m_thd->variables.option_bits&= ~OPTION_BEGIN; m_thd->server_status&= ~SERVER_STATUS_IN_TRANS; - m_thd->wsrep_cs().enter_toi(ws_meta); + m_thd->wsrep_cs().enter_toi_mode(ws_meta); } ~Wsrep_non_trans_mode() { m_thd->variables.option_bits= m_option_bits; m_thd->server_status= m_server_status; - m_thd->wsrep_cs().leave_toi(); + m_thd->wsrep_cs().leave_toi_mode(); } private: Wsrep_non_trans_mode(const Wsrep_non_trans_mode&); @@ -343,7 +343,8 @@ int Wsrep_high_priority_service::rollback(const wsrep::ws_handle& ws_handle, } int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta, - const wsrep::const_buffer& data) + const wsrep::const_buffer& data, + wsrep::mutable_buffer&) { DBUG_ENTER("Wsrep_high_priority_service::apply_toi"); THD* thd= m_thd; @@ -404,21 +405,33 @@ void Wsrep_high_priority_service::switch_execution_context(wsrep::high_priority_ } int Wsrep_high_priority_service::log_dummy_write_set(const wsrep::ws_handle& ws_handle, - const wsrep::ws_meta& ws_meta) + const wsrep::ws_meta& ws_meta, + wsrep::mutable_buffer& err) { DBUG_ENTER("Wsrep_high_priority_service::log_dummy_write_set"); int ret= 0; DBUG_PRINT("info", ("Wsrep_high_priority_service::log_dummy_write_set: seqno=%lld", ws_meta.seqno().get())); - m_thd->wsrep_cs().start_transaction(ws_handle, ws_meta); - WSREP_DEBUG("Log dummy write set %lld", ws_meta.seqno().get()); - if (!(opt_log_slave_updates && wsrep_gtid_mode && m_thd->variables.gtid_seq_no)) + if (ws_meta.ordered()) { - m_thd->wsrep_cs().before_rollback(); - m_thd->wsrep_cs().after_rollback(); + wsrep::client_state& cs(m_thd->wsrep_cs()); + if (!cs.transaction().active()) + { + cs.start_transaction(ws_handle, ws_meta); + } + adopt_apply_error(err); + WSREP_DEBUG("Log dummy write set %lld", ws_meta.seqno().get()); + ret= cs.provider().commit_order_enter(ws_handle, ws_meta); + if (!(ret && opt_log_slave_updates && wsrep_gtid_mode && + m_thd->variables.gtid_seq_no)) + { + cs.before_rollback(); + cs.after_rollback(); + } + ret= ret || cs.provider().commit_order_leave(ws_handle, ws_meta, err); + cs.after_applying(); } - m_thd->wsrep_cs().after_applying(); DBUG_RETURN(ret); } @@ -452,7 +465,8 @@ Wsrep_applier_service::~Wsrep_applier_service() } int Wsrep_applier_service::apply_write_set(const wsrep::ws_meta& ws_meta, - const wsrep::const_buffer& data) + const wsrep::const_buffer& data, + wsrep::mutable_buffer&) { DBUG_ENTER("Wsrep_applier_service::apply_write_set"); THD* thd= m_thd; @@ -606,7 +620,8 @@ Wsrep_replayer_service::~Wsrep_replayer_service() } int Wsrep_replayer_service::apply_write_set(const wsrep::ws_meta& ws_meta, - const wsrep::const_buffer& data) + const wsrep::const_buffer& data, + wsrep::mutable_buffer&) { DBUG_ENTER("Wsrep_replayer_service::apply_write_set"); THD* thd= m_thd; diff --git a/sql/wsrep_high_priority_service.h b/sql/wsrep_high_priority_service.h index 34fa1669b71..c8c5eb87f44 100644 --- a/sql/wsrep_high_priority_service.h +++ b/sql/wsrep_high_priority_service.h @@ -37,19 +37,23 @@ public: const wsrep::ws_meta&); const wsrep::transaction& transaction() const; int adopt_transaction(const wsrep::transaction&); - int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&) = 0; + int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&, + wsrep::mutable_buffer&) = 0; int append_fragment_and_commit(const wsrep::ws_handle&, const wsrep::ws_meta&, const wsrep::const_buffer&); int remove_fragments(const wsrep::ws_meta&); int commit(const wsrep::ws_handle&, const wsrep::ws_meta&); int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&); - int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&); + int apply_toi(const wsrep::ws_meta&, const wsrep::const_buffer&, + wsrep::mutable_buffer&); void store_globals(); void reset_globals(); void switch_execution_context(wsrep::high_priority_service&); int log_dummy_write_set(const wsrep::ws_handle&, - const wsrep::ws_meta&); + const wsrep::ws_meta&, + wsrep::mutable_buffer&); + void adopt_apply_error(wsrep::mutable_buffer& err) {} virtual bool check_exit_status() const = 0; void debug_crash(const char*); @@ -78,7 +82,8 @@ class Wsrep_applier_service : public Wsrep_high_priority_service public: Wsrep_applier_service(THD*); ~Wsrep_applier_service(); - int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&); + int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&, + wsrep::mutable_buffer&); void after_apply(); bool is_replaying() const { return false; } bool check_exit_status() const; @@ -89,7 +94,8 @@ class Wsrep_replayer_service : public Wsrep_high_priority_service public: Wsrep_replayer_service(THD* replayer_thd, THD* orig_thd); ~Wsrep_replayer_service(); - int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&); + int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&, + wsrep::mutable_buffer&); void after_apply() { } bool is_replaying() const { return true; } void replay_status(enum wsrep::provider::status status) diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index 4d424d3fd35..6ea9eab96aa 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -47,6 +47,7 @@ #include <string> #include "log_event.h" #include <slave.h> +#include "sql_plugin.h" /* wsrep_plugins_pre_init() */ #include <sstream> @@ -153,7 +154,11 @@ mysql_mutex_t LOCK_wsrep_SR_pool; mysql_mutex_t LOCK_wsrep_SR_store; int wsrep_replaying= 0; -ulong wsrep_running_threads= 0; // # of currently running wsrep threads +ulong wsrep_running_threads = 0; // # of currently running wsrep + // # threads +ulong wsrep_running_applier_threads = 0; // # of running applier threads +ulong wsrep_running_rollbacker_threads = 0; // # of running + // # rollbacker threads ulong my_bind_addr; #ifdef HAVE_PSI_INTERFACE @@ -1782,7 +1787,7 @@ static void wsrep_TOI_begin_failed(THD* thd, const wsrep_buf_t* /* const err */) if (wsrep_emulate_bin_log) wsrep_thd_binlog_trx_reset(thd); if (wsrep_write_dummy_event(thd, "TOI begin failed")) { goto fail; } wsrep::client_state& cs(thd->wsrep_cs()); - int const ret= cs.leave_toi(); + int const ret= cs.leave_toi_local(wsrep::mutable_buffer()); if (ret) { WSREP_ERROR("Leaving critical section for failed TOI failed: thd: %lld, " @@ -1850,10 +1855,10 @@ static int wsrep_TOI_begin(THD *thd, const char *db, const char *table, thd_proc_info(thd, "acquiring total order isolation"); wsrep::client_state& cs(thd->wsrep_cs()); - int ret= cs.enter_toi(key_array, - wsrep::const_buffer(buff.ptr, buff.len), - wsrep::provider::flag::start_transaction | - wsrep::provider::flag::commit); + int ret= cs.enter_toi_local(key_array, + wsrep::const_buffer(buff.ptr, buff.len), + wsrep::provider::flag::start_transaction | + wsrep::provider::flag::commit); if (ret) { @@ -1909,7 +1914,7 @@ static void wsrep_TOI_end(THD *thd) { if (wsrep_thd_is_local_toi(thd)) { wsrep_set_SE_checkpoint(client_state.toi_meta().gtid()); - int ret= client_state.leave_toi(); + int ret= client_state.leave_toi_local(wsrep::mutable_buffer()); if (!ret) { WSREP_DEBUG("TO END: %lld", client_state.toi_meta().seqno().get()); @@ -2400,8 +2405,7 @@ int wsrep_must_ignore_error(THD* thd) const uint flags= sql_command_flags[thd->lex->sql_command]; DBUG_ASSERT(error); - DBUG_ASSERT((wsrep_thd_is_toi(thd)) || - (wsrep_thd_is_applying(thd) && thd->wsrep_apply_toi)); + DBUG_ASSERT(wsrep_thd_is_toi(thd) || wsrep_thd_is_applying(thd)); if ((wsrep_ignore_apply_errors & WSREP_IGNORE_ERRORS_ON_DDL)) goto ignore_error; @@ -2652,7 +2656,21 @@ void* start_wsrep_THD(void *arg) thd->set_command(COM_SLEEP); thd->init_for_queries(); mysql_mutex_lock(&LOCK_wsrep_slave_threads); + wsrep_running_threads++; + + switch (thd_args->thread_type()) { + case WSREP_APPLIER_THREAD: + wsrep_running_applier_threads++; + break; + case WSREP_ROLLBACKER_THREAD: + wsrep_running_rollbacker_threads++; + break; + default: + WSREP_ERROR("Incorrect wsrep thread type: %d", thd_args->thread_type()); + break; + } + mysql_cond_broadcast(&COND_wsrep_slave_threads); mysql_mutex_unlock(&LOCK_wsrep_slave_threads); @@ -2672,7 +2690,23 @@ void* start_wsrep_THD(void *arg) delete thd_args; mysql_mutex_lock(&LOCK_wsrep_slave_threads); + DBUG_ASSERT(wsrep_running_threads > 0); wsrep_running_threads--; + + switch (thd_args->thread_type()) { + case WSREP_APPLIER_THREAD: + DBUG_ASSERT(wsrep_running_applier_threads > 0); + wsrep_running_applier_threads--; + break; + case WSREP_ROLLBACKER_THREAD: + DBUG_ASSERT(wsrep_running_rollbacker_threads > 0); + wsrep_running_rollbacker_threads--; + break; + default: + WSREP_ERROR("Incorrect wsrep thread type: %d", thd_args->thread_type()); + break; + } + WSREP_DEBUG("wsrep running threads now: %lu", wsrep_running_threads); mysql_cond_broadcast(&COND_wsrep_slave_threads); mysql_mutex_unlock(&LOCK_wsrep_slave_threads); diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index 37301afa7be..844437bab95 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -93,6 +93,8 @@ extern ulong wsrep_trx_fragment_unit; extern ulong wsrep_SR_store_type; extern uint wsrep_ignore_apply_errors; extern ulong wsrep_running_threads; +extern ulong wsrep_running_applier_threads; +extern ulong wsrep_running_rollbacker_threads; extern bool wsrep_new_cluster; extern bool wsrep_gtid_mode; extern uint32 wsrep_gtid_domain_id; @@ -393,27 +395,37 @@ void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end); void thd_binlog_rollback_stmt(THD * thd); void thd_binlog_trx_reset(THD * thd); +enum wsrep_thread_type { + WSREP_APPLIER_THREAD=1, + WSREP_ROLLBACKER_THREAD=2 +}; + typedef void (*wsrep_thd_processor_fun)(THD*, void *); class Wsrep_thd_args { public: - Wsrep_thd_args(wsrep_thd_processor_fun fun, void* args) + Wsrep_thd_args(wsrep_thd_processor_fun fun, void* args, + wsrep_thread_type thread_type) : fun_ (fun), - args_(args) + args_ (args), + thread_type_ (thread_type) { } wsrep_thd_processor_fun fun() { return fun_; } void* args() { return args_; } + enum wsrep_thread_type thread_type() {return thread_type_;} + private: Wsrep_thd_args(const Wsrep_thd_args&); Wsrep_thd_args& operator=(const Wsrep_thd_args&); wsrep_thd_processor_fun fun_; - void* args_; + void* args_; + enum wsrep_thread_type thread_type_; }; void* start_wsrep_THD(void*); diff --git a/sql/wsrep_schema.cc b/sql/wsrep_schema.cc index 82c085a61d2..c1b955e4483 100644 --- a/sql/wsrep_schema.cc +++ b/sql/wsrep_schema.cc @@ -1343,7 +1343,8 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd) ws_meta); } applier->store_globals(); - applier->apply_write_set(ws_meta, data); + wsrep::mutable_buffer unused; + applier->apply_write_set(ws_meta, data, unused); applier->after_apply(); storage_service.store_globals(); } diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc index 6173506385d..c62132b16a2 100644 --- a/sql/wsrep_thd.cc +++ b/sql/wsrep_thd.cc @@ -123,7 +123,8 @@ void wsrep_create_appliers(long threads) while (wsrep_threads++ < threads) { - Wsrep_thd_args* args(new Wsrep_thd_args(wsrep_replication_process, 0)); + Wsrep_thd_args* args(new Wsrep_thd_args(wsrep_replication_process, 0, + WSREP_APPLIER_THREAD)); if (create_wsrep_THD(args)) { WSREP_WARN("Can't create thread to manage wsrep replication"); @@ -312,14 +313,16 @@ void wsrep_create_rollbacker() { if (wsrep_cluster_address && wsrep_cluster_address[0] != 0) { - Wsrep_thd_args* args= new Wsrep_thd_args(wsrep_rollback_process, 0); + Wsrep_thd_args* args= new Wsrep_thd_args(wsrep_rollback_process, 0, + WSREP_ROLLBACKER_THREAD); /* create rollbacker */ if (create_wsrep_THD(args)) WSREP_WARN("Can't create thread to manage wsrep rollback"); /* create post_rollbacker */ - args= new Wsrep_thd_args(wsrep_post_rollback_process, 0); + args= new Wsrep_thd_args(wsrep_post_rollback_process, 0, + WSREP_ROLLBACKER_THREAD); if (create_wsrep_THD(args)) WSREP_WARN("Can't create thread to manage wsrep post rollback"); } diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc index de558af6abb..315cfe2c53e 100644 --- a/sql/wsrep_var.cc +++ b/sql/wsrep_var.cc @@ -485,6 +485,8 @@ bool wsrep_cluster_address_update (sys_var *self, THD* thd, enum_var_type type) if (wsrep_start_replication()) { wsrep_create_rollbacker(); + WSREP_DEBUG("Cluster address update creating %ld applier threads running %lu", + wsrep_slave_threads, wsrep_running_applier_threads); wsrep_create_appliers(wsrep_slave_threads); } /* locking order to be enforced is: @@ -585,9 +587,9 @@ void wsrep_node_address_init (const char* value) static void wsrep_slave_count_change_update () { - wsrep_slave_count_change= (wsrep_slave_threads - wsrep_running_threads + 2); - WSREP_DEBUG("Change on slave threads: New %lu old %lu difference %d", - wsrep_slave_threads, wsrep_running_threads, wsrep_slave_count_change); + wsrep_slave_count_change = (wsrep_slave_threads - wsrep_running_applier_threads); + WSREP_DEBUG("Change on slave threads: New %ld old %lu difference %d", + wsrep_slave_threads, wsrep_running_applier_threads, wsrep_slave_count_change); } bool wsrep_slave_threads_update (sys_var *self, THD* thd, enum_var_type type) @@ -595,8 +597,10 @@ bool wsrep_slave_threads_update (sys_var *self, THD* thd, enum_var_type type) wsrep_slave_count_change_update(); if (wsrep_slave_count_change > 0) { + WSREP_DEBUG("Creating %d applier threads, total %ld", wsrep_slave_count_change, wsrep_slave_threads); wsrep_create_appliers(wsrep_slave_count_change); - wsrep_slave_count_change= 0; + WSREP_DEBUG("Running %lu applier threads", wsrep_running_applier_threads); + wsrep_slave_count_change = 0; } return false; } @@ -758,8 +762,10 @@ static SHOW_VAR wsrep_status_vars[]= {"provider_name", (char*) &wsrep_provider_name, SHOW_CHAR_PTR}, {"provider_version", (char*) &wsrep_provider_version, SHOW_CHAR_PTR}, {"provider_vendor", (char*) &wsrep_provider_vendor, SHOW_CHAR_PTR}, - {"wsrep_provider_capabilities", (char*) &wsrep_provider_capabilities, SHOW_CHAR_PTR}, - {"thread_count", (char*) &wsrep_running_threads, SHOW_LONG_NOFLUSH} + {"provider_capabilities", (char*) &wsrep_provider_capabilities, SHOW_CHAR_PTR}, + {"thread_count", (char*) &wsrep_running_threads, SHOW_LONG_NOFLUSH}, + {"applier_thread_count", (char*)&wsrep_running_applier_threads, SHOW_LONG_NOFLUSH}, + {"rollbacker_thread_count", (char *)&wsrep_running_rollbacker_threads, SHOW_LONG_NOFLUSH}, }; static int show_var_cmp(const void *var1, const void *var2) |