diff options
author | Igor Babaev <igor@askmonty.org> | 2013-04-15 09:16:54 -0700 |
---|---|---|
committer | Igor Babaev <igor@askmonty.org> | 2013-04-15 09:16:54 -0700 |
commit | f4cd2b37b12617ca383e625dfd07ce1468c5b5a8 (patch) | |
tree | 4720b59562af2f8a4208b3cce4dca649d05b029b /sql | |
parent | ab10dc8fafc3ff5448947e698b500bab4042e95e (diff) | |
parent | 30b2c64c4e87f8924fe817f5c607ae90a990e9e5 (diff) | |
download | mariadb-git-f4cd2b37b12617ca383e625dfd07ce1468c5b5a8.tar.gz |
Merge 10.0-base -> mwl253
Diffstat (limited to 'sql')
57 files changed, 5307 insertions, 653 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index 5d61df2fa9b..63e2df46bed 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -88,6 +88,7 @@ SET (SQL_SOURCE threadpool_common.cc ../sql-common/mysql_async.c my_apc.cc my_apc.h + rpl_gtid.cc ${GEN_SOURCES} ${MYSYS_LIBWRAP_SOURCE} ) diff --git a/sql/event_parse_data.cc b/sql/event_parse_data.cc index ad812a6aa5d..4316a9f1fb8 100644 --- a/sql/event_parse_data.cc +++ b/sql/event_parse_data.cc @@ -574,8 +574,8 @@ void Event_parse_data::check_originator_id(THD *thd) status= Event_parse_data::SLAVESIDE_DISABLED; status_changed= true; } - originator = thd->server_id; + originator = thd->variables.server_id; } else - originator = server_id; + originator = global_system_variables.server_id; } diff --git a/sql/field.cc b/sql/field.cc index 2dc7bbba07a..103d868d834 100644 --- a/sql/field.cc +++ b/sql/field.cc @@ -8956,6 +8956,7 @@ void Create_field::init_for_tmp_table(enum_field_types sql_type_arg, FLAGSTR(pack_flag, FIELDFLAG_DECIMAL), f_packtype(pack_flag))); vcol_info= 0; + create_if_not_exists= FALSE; stored_in_db= TRUE; DBUG_VOID_RETURN; @@ -8993,7 +8994,7 @@ bool Create_field::init(THD *thd, char *fld_name, enum_field_types fld_type, char *fld_change, List<String> *fld_interval_list, CHARSET_INFO *fld_charset, uint fld_geom_type, Virtual_column_info *fld_vcol_info, - engine_option_value *create_opt) + engine_option_value *create_opt, bool check_exists) { uint sign_len, allowed_type_modifier= 0; ulong max_field_charlength= MAX_FIELD_CHARLENGTH; @@ -9047,6 +9048,7 @@ bool Create_field::init(THD *thd, char *fld_name, enum_field_types fld_type, comment= *fld_comment; vcol_info= fld_vcol_info; + create_if_not_exists= check_exists; stored_in_db= TRUE; /* Initialize data for a computed field */ @@ -9655,6 +9657,7 @@ Create_field::Create_field(Field *old_field,Field *orig_field) comment= old_field->comment; decimals= old_field->decimals(); vcol_info= old_field->vcol_info; + create_if_not_exists= FALSE; stored_in_db= old_field->stored_in_db; option_list= old_field->option_list; option_struct= old_field->option_struct; diff --git a/sql/field.h b/sql/field.h index 38f36fa2677..550ae9d65b9 100644 --- a/sql/field.h +++ b/sql/field.h @@ -2419,8 +2419,9 @@ public: uint8 interval_id; // For rea_create_table uint offset,pack_flag; + bool create_if_not_exists; // Used in ALTER TABLE IF NOT EXISTS - /* + /* This is additinal data provided for any computed(virtual) field. In particular it includes a pointer to the item by which this field can be computed from other fields. @@ -2433,7 +2434,8 @@ public: */ bool stored_in_db; - Create_field() :after(0), option_list(NULL), option_struct(NULL) + Create_field() :after(0), option_list(NULL), option_struct(NULL), + create_if_not_exists(FALSE) {} Create_field(Field *field, Field *orig_field); /* Used to make a clone of this object for ALTER/CREATE TABLE */ @@ -2451,7 +2453,7 @@ public: Item *on_update_value, LEX_STRING *comment, char *change, List<String> *interval_list, CHARSET_INFO *cs, uint uint_geom_type, Virtual_column_info *vcol_info, - engine_option_value *option_list); + engine_option_value *option_list, bool check_exists); bool field_flags_are_binary() { diff --git a/sql/item.cc b/sql/item.cc index fd6a11c319b..90a652e92c7 100644 --- a/sql/item.cc +++ b/sql/item.cc @@ -198,7 +198,7 @@ Hybrid_type_traits_integer::fix_length_and_dec(Item *item, Item *arg) const void item_init(void) { - item_user_lock_init(); + item_func_sleep_init(); uuid_short_init(); } diff --git a/sql/item_create.cc b/sql/item_create.cc index ba6eb7ff603..c1cefed6f8b 100644 --- a/sql/item_create.cc +++ b/sql/item_create.cc @@ -447,6 +447,19 @@ protected: }; +class Create_func_binlog_gtid_pos : public Create_func_arg2 +{ +public: + virtual Item *create_2_arg(THD *thd, Item *arg1, Item *arg2); + + static Create_func_binlog_gtid_pos s_singleton; + +protected: + Create_func_binlog_gtid_pos() {} + virtual ~Create_func_binlog_gtid_pos() {} +}; + + class Create_func_bit_count : public Create_func_arg1 { public: @@ -3100,6 +3113,16 @@ Create_func_bin::create_1_arg(THD *thd, Item *arg1) } +Create_func_binlog_gtid_pos Create_func_binlog_gtid_pos::s_singleton; + +Item* +Create_func_binlog_gtid_pos::create_2_arg(THD *thd, Item *arg1, Item *arg2) +{ + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); + return new (thd->mem_root) Item_func_binlog_gtid_pos(arg1, arg2); +} + + Create_func_bit_count Create_func_bit_count::s_singleton; Item* @@ -5322,6 +5345,7 @@ static Native_func_registry func_array[] = { { C_STRING_WITH_LEN("ATAN2") }, BUILDER(Create_func_atan)}, { { C_STRING_WITH_LEN("BENCHMARK") }, BUILDER(Create_func_benchmark)}, { { C_STRING_WITH_LEN("BIN") }, BUILDER(Create_func_bin)}, + { { C_STRING_WITH_LEN("BINLOG_GTID_POS") }, BUILDER(Create_func_binlog_gtid_pos)}, { { C_STRING_WITH_LEN("BIT_COUNT") }, BUILDER(Create_func_bit_count)}, { { C_STRING_WITH_LEN("BIT_LENGTH") }, BUILDER(Create_func_bit_length)}, { { C_STRING_WITH_LEN("BUFFER") }, GEOM_BUILDER(Create_func_buffer)}, diff --git a/sql/item_func.cc b/sql/item_func.cc index 5316f89e730..aa574eabf8c 100644 --- a/sql/item_func.cc +++ b/sql/item_func.cc @@ -3750,120 +3750,6 @@ udf_handler::~udf_handler() bool udf_handler::get_arguments() { return 0; } #endif /* HAVE_DLOPEN */ -/* -** User level locks -*/ - -mysql_mutex_t LOCK_user_locks; -static HASH hash_user_locks; - -class User_level_lock -{ - uchar *key; - size_t key_length; - -public: - int count; - bool locked; - mysql_cond_t cond; - my_thread_id thread_id; - void set_thread(THD *thd) { thread_id= thd->thread_id; } - - User_level_lock(const uchar *key_arg,uint length, ulong id) - :key_length(length),count(1),locked(1), thread_id(id) - { - key= (uchar*) my_memdup(key_arg,length,MYF(0)); - mysql_cond_init(key_user_level_lock_cond, &cond, NULL); - if (key) - { - if (my_hash_insert(&hash_user_locks,(uchar*) this)) - { - my_free(key); - key=0; - } - } - } - ~User_level_lock() - { - if (key) - { - my_hash_delete(&hash_user_locks,(uchar*) this); - my_free(key); - } - mysql_cond_destroy(&cond); - } - inline bool initialized() { return key != 0; } - friend void item_user_lock_release(User_level_lock *ull); - friend uchar *ull_get_key(const User_level_lock *ull, size_t *length, - my_bool not_used); -}; - -uchar *ull_get_key(const User_level_lock *ull, size_t *length, - my_bool not_used __attribute__((unused))) -{ - *length= ull->key_length; - return ull->key; -} - -#ifdef HAVE_PSI_INTERFACE -static PSI_mutex_key key_LOCK_user_locks; - -static PSI_mutex_info all_user_mutexes[]= -{ - { &key_LOCK_user_locks, "LOCK_user_locks", PSI_FLAG_GLOBAL} -}; - -static void init_user_lock_psi_keys(void) -{ - const char* category= "sql"; - int count; - - if (PSI_server == NULL) - return; - - count= array_elements(all_user_mutexes); - PSI_server->register_mutex(category, all_user_mutexes, count); -} -#endif - -static bool item_user_lock_inited= 0; - -void item_user_lock_init(void) -{ -#ifdef HAVE_PSI_INTERFACE - init_user_lock_psi_keys(); -#endif - - mysql_mutex_init(key_LOCK_user_locks, &LOCK_user_locks, MY_MUTEX_INIT_SLOW); - my_hash_init(&hash_user_locks,system_charset_info, - 16,0,0,(my_hash_get_key) ull_get_key,NULL,0); - item_user_lock_inited= 1; -} - -void item_user_lock_free(void) -{ - if (item_user_lock_inited) - { - item_user_lock_inited= 0; - my_hash_free(&hash_user_locks); - mysql_mutex_destroy(&LOCK_user_locks); - } -} - -void item_user_lock_release(User_level_lock *ull) -{ - ull->locked=0; - ull->thread_id= 0; - if (--ull->count) - mysql_cond_signal(&ull->cond); - else - delete ull; -} - -/** - Wait until we are at or past the given position in the master binlog - on the slave. -*/ longlong Item_master_pos_wait::val_int() { @@ -4010,7 +3896,136 @@ int Interruptible_wait::wait(mysql_cond_t *cond, mysql_mutex_t *mutex) /** - Get a user level lock. If the thread has an old lock this is first released. + For locks with EXPLICIT duration, MDL returns a new ticket + every time a lock is granted. This allows to implement recursive + locks without extra allocation or additional data structures, such + as below. However, if there are too many tickets in the same + MDL_context, MDL_context::find_ticket() is getting too slow, + since it's using a linear search. + This is why a separate structure is allocated for a user + level lock, and before requesting a new lock from MDL, + GET_LOCK() checks thd->ull_hash if such lock is already granted, + and if so, simply increments a reference counter. +*/ + +class User_level_lock +{ +public: + MDL_ticket *lock; + int refs; +}; + + +/** Extract a hash key from User_level_lock. */ + +uchar *ull_get_key(const uchar *ptr, size_t *length, + my_bool not_used __attribute__((unused))) +{ + User_level_lock *ull = (User_level_lock*) ptr; + MDL_key *key = ull->lock->get_key(); + *length= key->length(); + return (uchar*) key->ptr(); +} + + +/** + Release all user level locks for this THD. +*/ + +void mysql_ull_cleanup(THD *thd) +{ + User_level_lock *ull; + DBUG_ENTER("mysql_ull_cleanup"); + + for (uint i= 0; i < thd->ull_hash.records; i++) + { + ull = (User_level_lock*) my_hash_element(&thd->ull_hash, i); + thd->mdl_context.release_lock(ull->lock); + my_free(ull); + } + + my_hash_free(&thd->ull_hash); + + DBUG_VOID_RETURN; +} + + +/** + Set explicit duration for metadata locks corresponding to + user level locks to protect them from being released at the end + of transaction. +*/ + +void mysql_ull_set_explicit_lock_duration(THD *thd) +{ + User_level_lock *ull; + DBUG_ENTER("mysql_ull_set_explicit_lock_duration"); + + for (uint i= 0; i < thd->ull_hash.records; i++) + { + ull= (User_level_lock*) my_hash_element(&thd->ull_hash, i); + thd->mdl_context.set_lock_duration(ull->lock, MDL_EXPLICIT); + } + DBUG_VOID_RETURN; +} + + +/** + When MDL detects a lock wait timeout, it pushes + an error into the statement diagnostics area. + For GET_LOCK(), lock wait timeout is not an error, + but a special return value (0). NULL is returned in + case of error. + Capture and suppress lock wait timeout. +*/ + +class Lock_wait_timeout_handler: public Internal_error_handler +{ +public: + Lock_wait_timeout_handler() :m_lock_wait_timeout(false) {} + + bool m_lock_wait_timeout; + + bool handle_condition(THD * /* thd */, uint sql_errno, + const char * /* sqlstate */, + MYSQL_ERROR::enum_warning_level /* level */, + const char *message, + MYSQL_ERROR ** /* cond_hdl */); +}; + +bool +Lock_wait_timeout_handler:: +handle_condition(THD * /* thd */, uint sql_errno, + const char * /* sqlstate */, + MYSQL_ERROR::enum_warning_level /* level */, + const char *message, + MYSQL_ERROR ** /* cond_hdl */) +{ + if (sql_errno == ER_LOCK_WAIT_TIMEOUT) + { + m_lock_wait_timeout= true; + return true; /* condition handled */ + } + return false; +} + + +static int ull_name_ok(String *name) +{ + if (!name || !name->length()) + return 0; + + if (name->length() > NAME_LEN) + { + my_error(ER_TOO_LONG_IDENT, MYF(0), name->c_ptr_safe()); + return 0; + } + return 1; +} + + +/** + Get a user level lock. @retval 1 : Got lock @@ -4023,14 +4038,13 @@ int Interruptible_wait::wait(mysql_cond_t *cond, mysql_mutex_t *mutex) longlong Item_func_get_lock::val_int() { DBUG_ASSERT(fixed == 1); - String *res=args[0]->val_str(&value); + String *res= args[0]->val_str(&value); ulonglong timeout= args[1]->val_int(); - THD *thd=current_thd; + THD *thd= current_thd; User_level_lock *ull; - int error; - Interruptible_wait timed_cond(thd); DBUG_ENTER("Item_func_get_lock::val_int"); + null_value= 1; /* In slave thread no need to get locks, everything is serialized. Anyway there is no way to make GET_LOCK() work on slave like it did on master @@ -4039,104 +4053,70 @@ longlong Item_func_get_lock::val_int() it's not guaranteed to be same as on master. */ if (thd->slave_thread) + { + null_value= 0; DBUG_RETURN(1); + } - mysql_mutex_lock(&LOCK_user_locks); - - if (!res || !res->length()) - { - mysql_mutex_unlock(&LOCK_user_locks); - null_value=1; + if (!ull_name_ok(res)) DBUG_RETURN(0); - } + DBUG_PRINT("info", ("lock %.*s, thd=%ld", res->length(), res->ptr(), (long) thd->real_id)); - null_value=0; - - if (thd->ull) + /* HASH entries are of type User_level_lock. */ + if (! my_hash_inited(&thd->ull_hash) && + my_hash_init(&thd->ull_hash, &my_charset_bin, + 16 /* small hash */, 0, 0, ull_get_key, NULL, 0)) { - item_user_lock_release(thd->ull); - thd->ull=0; + DBUG_RETURN(0); } - if (!(ull= ((User_level_lock *) my_hash_search(&hash_user_locks, - (uchar*) res->ptr(), - (size_t) res->length())))) - { - ull= new User_level_lock((uchar*) res->ptr(), (size_t) res->length(), - thd->thread_id); - if (!ull || !ull->initialized()) - { - delete ull; - mysql_mutex_unlock(&LOCK_user_locks); - null_value=1; // Probably out of memory - DBUG_RETURN(0); - } - ull->set_thread(thd); - thd->ull=ull; - mysql_mutex_unlock(&LOCK_user_locks); - DBUG_PRINT("info", ("made new lock")); - DBUG_RETURN(1); // Got new lock - } - ull->count++; - DBUG_PRINT("info", ("ull->count=%d", ull->count)); + MDL_request ull_request; + ull_request.init(MDL_key::USER_LOCK, res->c_ptr_safe(), "", + MDL_SHARED_NO_WRITE, MDL_EXPLICIT); + MDL_key *ull_key = &ull_request.key; - /* - Structure is now initialized. Try to get the lock. - Set up control struct to allow others to abort locks. - */ - thd_proc_info(thd, "User lock"); - thd->mysys_var->current_mutex= &LOCK_user_locks; - thd->mysys_var->current_cond= &ull->cond; - timed_cond.set_timeout(timeout * 1000000000ULL); - - error= 0; - thd_wait_begin(thd, THD_WAIT_USER_LOCK); - while (ull->locked && !thd->killed) + if ((ull= (User_level_lock*) + my_hash_search(&thd->ull_hash, ull_key->ptr(), ull_key->length()))) { - DBUG_PRINT("info", ("waiting on lock")); - error= timed_cond.wait(&ull->cond, &LOCK_user_locks); - if (error == ETIMEDOUT || error == ETIME) - { - DBUG_PRINT("info", ("lock wait timeout")); - break; - } - error= 0; + /* Recursive lock */ + ull->refs++; + null_value = 0; + DBUG_RETURN(1); } - thd_wait_end(thd); - if (ull->locked) + Lock_wait_timeout_handler lock_wait_timeout_handler; + thd->push_internal_handler(&lock_wait_timeout_handler); + bool error= thd->mdl_context.acquire_lock(&ull_request, timeout); + (void) thd->pop_internal_handler(); + if (error) { - if (!--ull->count) - { - DBUG_ASSERT(0); - delete ull; // Should never happen - } - if (!error) // Killed (thd->killed != 0) - { - error=1; - null_value=1; // Return NULL - } + if (lock_wait_timeout_handler.m_lock_wait_timeout) + null_value= 0; + DBUG_RETURN(0); } - else // We got the lock + + ull= (User_level_lock*) my_malloc(sizeof(User_level_lock), + MYF(MY_WME|MY_THREAD_SPECIFIC)); + if (ull == NULL) { - ull->locked=1; - ull->set_thread(thd); - ull->thread_id= thd->thread_id; - thd->ull=ull; - error=0; - DBUG_PRINT("info", ("got the lock")); + thd->mdl_context.release_lock(ull_request.ticket); + DBUG_RETURN(0); } - mysql_mutex_unlock(&LOCK_user_locks); - mysql_mutex_lock(&thd->mysys_var->mutex); - thd_proc_info(thd, 0); - thd->mysys_var->current_mutex= 0; - thd->mysys_var->current_cond= 0; - mysql_mutex_unlock(&thd->mysys_var->mutex); + ull->lock= ull_request.ticket; + ull->refs= 1; + + if (my_hash_insert(&thd->ull_hash, (uchar*) ull)) + { + thd->mdl_context.release_lock(ull->lock); + my_free(ull); + DBUG_RETURN(0); + } + null_value= 0; - DBUG_RETURN(!error ? 1 : 0); + DBUG_RETURN(1); } @@ -4151,43 +4131,86 @@ longlong Item_func_get_lock::val_int() longlong Item_func_release_lock::val_int() { DBUG_ASSERT(fixed == 1); - String *res=args[0]->val_str(&value); - User_level_lock *ull; - longlong result; - THD *thd=current_thd; + String *res= args[0]->val_str(&value); + THD *thd= current_thd; DBUG_ENTER("Item_func_release_lock::val_int"); - if (!res || !res->length()) - { - null_value=1; + null_value= 1; + + if (!ull_name_ok(res)) DBUG_RETURN(0); - } + DBUG_PRINT("info", ("lock %.*s", res->length(), res->ptr())); - null_value=0; - result=0; - mysql_mutex_lock(&LOCK_user_locks); - if (!(ull= ((User_level_lock*) my_hash_search(&hash_user_locks, - (const uchar*) res->ptr(), - (size_t) res->length())))) + MDL_key ull_key; + ull_key.mdl_key_init(MDL_key::USER_LOCK, res->c_ptr_safe(), ""); + + User_level_lock *ull; + + if (!(ull= + (User_level_lock*) my_hash_search(&thd->ull_hash, + ull_key.ptr(), ull_key.length()))) { - null_value=1; + null_value= thd->mdl_context.get_lock_owner(&ull_key) == 0; + DBUG_RETURN(0); } - else + null_value= 0; + if (--ull->refs == 0) { - DBUG_PRINT("info", ("ull->locked=%d ull->thread=%lu thd=%lu", - (int) ull->locked, - (long)ull->thread_id, - (long)thd->thread_id)); - if (ull->locked && current_thd->thread_id == ull->thread_id) - { - DBUG_PRINT("info", ("release lock")); - result=1; // Release is ok - item_user_lock_release(ull); - thd->ull=0; - } + my_hash_delete(&thd->ull_hash, (uchar*) ull); + thd->mdl_context.release_lock(ull->lock); + my_free(ull); } - mysql_mutex_unlock(&LOCK_user_locks); - DBUG_RETURN(result); + DBUG_RETURN(1); +} + + +/** + Check a user level lock. + + Sets null_value=TRUE on error. + + @retval + 1 Available + @retval + 0 Already taken, or error +*/ + +longlong Item_func_is_free_lock::val_int() +{ + DBUG_ASSERT(fixed == 1); + String *res= args[0]->val_str(&value); + THD *thd= current_thd; + null_value= 1; + + if (!ull_name_ok(res)) + return 0; + + MDL_key ull_key; + ull_key.mdl_key_init(MDL_key::USER_LOCK, res->c_ptr_safe(), ""); + + null_value= 0; + return thd->mdl_context.get_lock_owner(&ull_key) == 0; +} + + +longlong Item_func_is_used_lock::val_int() +{ + DBUG_ASSERT(fixed == 1); + String *res= args[0]->val_str(&value); + THD *thd= current_thd; + null_value= 1; + + if (!ull_name_ok(res)) + return 0; + + MDL_key ull_key; + ull_key.mdl_key_init(MDL_key::USER_LOCK, res->c_ptr_safe(), ""); + ulong thread_id = thd->mdl_context.get_lock_owner(&ull_key); + if (thread_id == 0) + return 0; + + null_value= 0; + return thread_id; } @@ -4288,6 +4311,54 @@ void Item_func_benchmark::print(String *str, enum_query_type query_type) } +mysql_mutex_t LOCK_item_func_sleep; + +#ifdef HAVE_PSI_INTERFACE +static PSI_mutex_key key_LOCK_item_func_sleep; + +static PSI_mutex_info item_func_sleep_mutexes[]= +{ + { &key_LOCK_item_func_sleep, "LOCK_user_locks", PSI_FLAG_GLOBAL} +}; + + +static void init_item_func_sleep_psi_keys(void) +{ + const char* category= "sql"; + int count; + + if (PSI_server == NULL) + return; + + count= array_elements(item_func_sleep_mutexes); + PSI_server->register_mutex(category, item_func_sleep_mutexes, count); +} +#endif + +static bool item_func_sleep_inited= 0; + + +void item_func_sleep_init(void) +{ +#ifdef HAVE_PSI_INTERFACE + init_item_func_sleep_psi_keys(); +#endif + + mysql_mutex_init(key_LOCK_item_func_sleep, &LOCK_item_func_sleep, MY_MUTEX_INIT_SLOW); + item_func_sleep_inited= 1; +} + + +void item_func_sleep_free(void) +{ + if (item_func_sleep_inited) + { + item_func_sleep_inited= 0; + mysql_mutex_destroy(&LOCK_item_func_sleep); + } +} + + /** This function is just used to create tests with time gaps. */ longlong Item_func_sleep::val_int() @@ -4316,24 +4387,24 @@ longlong Item_func_sleep::val_int() timed_cond.set_timeout((ulonglong) (timeout * 1000000000.0)); mysql_cond_init(key_item_func_sleep_cond, &cond, NULL); - mysql_mutex_lock(&LOCK_user_locks); + mysql_mutex_lock(&LOCK_item_func_sleep); thd_proc_info(thd, "User sleep"); - thd->mysys_var->current_mutex= &LOCK_user_locks; + thd->mysys_var->current_mutex= &LOCK_item_func_sleep; thd->mysys_var->current_cond= &cond; error= 0; thd_wait_begin(thd, THD_WAIT_SLEEP); while (!thd->killed) { - error= timed_cond.wait(&cond, &LOCK_user_locks); + error= timed_cond.wait(&cond, &LOCK_item_func_sleep); if (error == ETIMEDOUT || error == ETIME) break; error= 0; } thd_wait_end(thd); thd_proc_info(thd, 0); - mysql_mutex_unlock(&LOCK_user_locks); + mysql_mutex_unlock(&LOCK_item_func_sleep); mysql_mutex_lock(&thd->mysys_var->mutex); thd->mysys_var->current_mutex= 0; thd->mysys_var->current_cond= 0; @@ -5672,6 +5743,14 @@ longlong Item_func_get_system_var::val_int() { THD *thd= current_thd; + DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", + { + if (0 == strcmp("gtid_domain_id", var->name.str)) + { + my_error(ER_VAR_CANT_BE_READ, MYF(0), var->name.str); + return 0; + } + }); if (cache_present && thd->query_id == used_query_id) { if (cache_present & GET_SYS_VAR_CACHE_LONG) @@ -5869,15 +5948,12 @@ void Item_func_match::init_search(bool no_order) { DBUG_ENTER("Item_func_match::init_search"); + if (!table->file->get_table()) // the handler isn't opened yet + DBUG_VOID_RETURN; + /* Check if init_search() has been called before */ if (ft_handler) { - /* - We should reset ft_handler as it is cleaned up - on destruction of FT_SELECT object - (necessary in case of re-execution of subquery). - TODO: FT_SELECT should not clean up ft_handler. - */ if (join_key) table->file->ft_handler= ft_handler; DBUG_VOID_RETURN; @@ -5886,10 +5962,10 @@ void Item_func_match::init_search(bool no_order) if (key == NO_SUCH_KEY) { List<Item> fields; - fields.push_back(new Item_string(" ",1, cmp_collation.collation)); - for (uint i=1; i < arg_count; i++) + fields.push_back(new Item_string(" ", 1, cmp_collation.collation)); + for (uint i= 1; i < arg_count; i++) fields.push_back(args[i]); - concat_ws=new Item_func_concat_ws(fields); + concat_ws= new Item_func_concat_ws(fields); /* Above function used only to get value and do not need fix_fields for it: Item_string - basic constant @@ -5901,10 +5977,10 @@ void Item_func_match::init_search(bool no_order) if (master) { - join_key=master->join_key=join_key|master->join_key; + join_key= master->join_key= join_key | master->join_key; master->init_search(no_order); - ft_handler=master->ft_handler; - join_key=master->join_key; + ft_handler= master->ft_handler; + join_key= master->join_key; DBUG_VOID_RETURN; } @@ -5914,7 +5990,7 @@ void Item_func_match::init_search(bool no_order) if (!(ft_tmp=key_item()->val_str(&value))) { ft_tmp= &value; - value.set("",0,cmp_collation.collation); + value.set("", 0, cmp_collation.collation); } if (ft_tmp->charset() != cmp_collation.collation) @@ -5927,7 +6003,11 @@ void Item_func_match::init_search(bool no_order) if (join_key && !no_order) flags|=FT_SORTED; - ft_handler=table->file->ft_init_ext(flags, key, ft_tmp); + + if (key != NO_SUCH_KEY) + thd_proc_info(table->in_use, "FULLTEXT initialization"); + + ft_handler= table->file->ft_init_ext(flags, key, ft_tmp); if (join_key) table->file->ft_handler=ft_handler; @@ -6208,61 +6288,6 @@ Item *get_system_var(THD *thd, enum_var_type var_type, LEX_STRING name, } -/** - Check a user level lock. - - Sets null_value=TRUE on error. - - @retval - 1 Available - @retval - 0 Already taken, or error -*/ - -longlong Item_func_is_free_lock::val_int() -{ - DBUG_ASSERT(fixed == 1); - String *res=args[0]->val_str(&value); - User_level_lock *ull; - - null_value=0; - if (!res || !res->length()) - { - null_value=1; - return 0; - } - - mysql_mutex_lock(&LOCK_user_locks); - ull= (User_level_lock *) my_hash_search(&hash_user_locks, (uchar*) res->ptr(), - (size_t) res->length()); - mysql_mutex_unlock(&LOCK_user_locks); - if (!ull || !ull->locked) - return 1; - return 0; -} - -longlong Item_func_is_used_lock::val_int() -{ - DBUG_ASSERT(fixed == 1); - String *res=args[0]->val_str(&value); - User_level_lock *ull; - - null_value=1; - if (!res || !res->length()) - return 0; - - mysql_mutex_lock(&LOCK_user_locks); - ull= (User_level_lock *) my_hash_search(&hash_user_locks, (uchar*) res->ptr(), - (size_t) res->length()); - mysql_mutex_unlock(&LOCK_user_locks); - if (!ull || !ull->locked) - return 0; - - null_value=0; - return ull->thread_id; -} - - longlong Item_func_row_count::val_int() { DBUG_ASSERT(fixed == 1); @@ -6713,7 +6738,7 @@ ulonglong uuid_value; void uuid_short_init() { - uuid_value= ((((ulonglong) server_id) << 56) + + uuid_value= ((((ulonglong) global_system_variables.server_id) << 56) + (((ulonglong) server_start_time) << 24)); } diff --git a/sql/item_func.h b/sql/item_func.h index 653641c9f72..ab6ec706248 100644 --- a/sql/item_func.h +++ b/sql/item_func.h @@ -1257,6 +1257,9 @@ public: }; +void item_func_sleep_init(void); +void item_func_sleep_free(void); + class Item_func_sleep :public Item_int_func { public: @@ -1506,14 +1509,8 @@ public: #endif /* HAVE_DLOPEN */ -/* -** User level locks -*/ - -class User_level_lock; -void item_user_lock_init(void); -void item_user_lock_release(User_level_lock *ull); -void item_user_lock_free(void); +void mysql_ull_cleanup(THD *thd); +void mysql_ull_set_explicit_lock_duration(THD *thd); class Item_func_get_lock :public Item_int_func { diff --git a/sql/item_strfunc.cc b/sql/item_strfunc.cc index a4ca4dbe5a0..93569082d74 100644 --- a/sql/item_strfunc.cc +++ b/sql/item_strfunc.cc @@ -59,6 +59,7 @@ C_MODE_START #include "../mysys/my_static.h" // For soundex_map C_MODE_END #include "sql_show.h" // append_identifier +#include <sql_repl.h> /** @todo Remove this. It is not safe to use a shared String object. @@ -2668,6 +2669,46 @@ err: } +void Item_func_binlog_gtid_pos::fix_length_and_dec() +{ + collation.set(system_charset_info); + max_length= MAX_BLOB_WIDTH; + maybe_null= 1; +} + + +String *Item_func_binlog_gtid_pos::val_str(String *str) +{ + DBUG_ASSERT(fixed == 1); +#ifndef HAVE_REPLICATION + null_value= 0; + str->copy("", 0, system_charset_info); + return str; +#else + String name_str, *name; + longlong pos; + + if (args[0]->null_value || args[1]->null_value) + goto err; + + name= args[0]->val_str(&name_str); + pos= args[1]->val_int(); + + if (pos < 0 || pos > UINT_MAX32) + goto err; + + if (gtid_state_from_binlog_pos(name->c_ptr_safe(), (uint32)pos, str)) + goto err; + null_value= 0; + return str; + +err: + null_value= 1; + return NULL; +#endif /* !HAVE_REPLICATION */ +} + + void Item_func_rpad::fix_length_and_dec() { // Handle character set for args[0] and args[2]. diff --git a/sql/item_strfunc.h b/sql/item_strfunc.h index 00863e9af2b..89d7fa67f6b 100644 --- a/sql/item_strfunc.h +++ b/sql/item_strfunc.h @@ -598,6 +598,17 @@ public: }; +class Item_func_binlog_gtid_pos :public Item_str_func +{ + String tmp_value; +public: + Item_func_binlog_gtid_pos(Item *arg1,Item *arg2) :Item_str_func(arg1,arg2) {} + String *val_str(String *); + void fix_length_and_dec(); + const char *func_name() const { return "binlog_gtid_pos"; } +}; + + class Item_func_rpad :public Item_str_func { String tmp_value, rpad_str; diff --git a/sql/lex.h b/sql/lex.h index e1dc6150cc3..edf833021b0 100644 --- a/sql/lex.h +++ b/sql/lex.h @@ -77,6 +77,7 @@ static SYMBOL symbols[] = { { "AUTHORS", SYM(AUTHORS_SYM)}, { "AUTO_INCREMENT", SYM(AUTO_INC)}, { "AUTOEXTEND_SIZE", SYM(AUTOEXTEND_SIZE_SYM)}, + { "AUTO", SYM(AUTO_SYM)}, { "AVG", SYM(AVG_SYM)}, { "AVG_ROW_LENGTH", SYM(AVG_ROW_LENGTH)}, { "BACKUP", SYM(BACKUP_SYM)}, @@ -329,6 +330,7 @@ static SYMBOL symbols[] = { { "LOW_PRIORITY", SYM(LOW_PRIORITY)}, { "MASTER", SYM(MASTER_SYM)}, { "MASTER_CONNECT_RETRY", SYM(MASTER_CONNECT_RETRY_SYM)}, + { "MASTER_USE_GTID", SYM(MASTER_USE_GTID_SYM)}, { "MASTER_HOST", SYM(MASTER_HOST_SYM)}, { "MASTER_LOG_FILE", SYM(MASTER_LOG_FILE_SYM)}, { "MASTER_LOG_POS", SYM(MASTER_LOG_POS_SYM)}, diff --git a/sql/log.cc b/sql/log.cc index 5ddfab6f805..a4ec5583dc9 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -120,6 +120,8 @@ static MYSQL_BIN_LOG::xid_count_per_binlog * static bool start_binlog_background_thread(); +static rpl_binlog_state rpl_global_gtid_binlog_state; + /** purge logs, master and slave sides both, related error code convertor. @@ -686,7 +688,8 @@ bool Log_to_csv_event_handler:: /* do a write */ if (table->field[1]->store(user_host, user_host_len, client_cs) || table->field[2]->store((longlong) thread_id, TRUE) || - table->field[3]->store((longlong) server_id, TRUE) || + table->field[3]->store((longlong) global_system_variables.server_id, + TRUE) || table->field[4]->store(command_type, command_type_len, client_cs)) goto err; @@ -883,7 +886,7 @@ bool Log_to_csv_event_handler:: table->field[8]->set_notnull(); } - if (table->field[9]->store((longlong) server_id, TRUE)) + if (table->field[9]->store((longlong)global_system_variables.server_id, TRUE)) goto err; table->field[9]->set_notnull(); @@ -2288,7 +2291,7 @@ static int find_uniq_filename(char *name) my_dirend(dir_info); /* check if reached the maximum possible extension number */ - if ((max_found == MAX_LOG_UNIQUE_FN_EXT)) + if (max_found == MAX_LOG_UNIQUE_FN_EXT) { sql_print_error("Log filename extension number exhausted: %06lu. \ Please fix this by archiving old logs and \ @@ -2925,7 +2928,7 @@ MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period) bytes_written(0), file_id(1), open_count(1), group_commit_queue(0), group_commit_queue_busy(FALSE), num_commits(0), num_group_commits(0), - sync_period_ptr(sync_period), sync_counter(0), + sync_period_ptr(sync_period), sync_counter(0), state_read(false), is_relay_log(0), signal_cnt(0), checksum_alg_reset(BINLOG_CHECKSUM_ALG_UNDEF), relay_log_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF), @@ -3119,6 +3122,9 @@ bool MYSQL_BIN_LOG::open(const char *log_name, DBUG_ENTER("MYSQL_BIN_LOG::open"); DBUG_PRINT("enter",("log_type: %d",(int) log_type_arg)); + if (!is_relay_log && read_state_from_file()) + DBUG_RETURN(1); + if (!is_relay_log && !binlog_background_thread_started && start_binlog_background_thread()) DBUG_RETURN(1); @@ -3232,6 +3238,47 @@ bool MYSQL_BIN_LOG::open(const char *log_name, if (!is_relay_log) { char buf[FN_REFLEN]; + + /* + Output a Gtid_list_log_event at the start of the binlog file. + + This is used to quickly determine which GTIDs are found in binlog + files earlier than this one, and which are found in this (or later) + binlogs. + + The list gives a mapping from (domain_id, server_id) -> seq_no (so + this means that there is at most one entry for every unique pair + (domain_id, server_id) in the list). It indicates that this seq_no is + the last one found in an earlier binlog file for this (domain_id, + server_id) combination - so any higher seq_no should be search for + from this binlog file, or a later one. + + This allows to locate the binlog file containing a given GTID by + scanning backwards, reading just the Gtid_list_log_event at the + start of each file, and scanning only the relevant binlog file when + found, not all binlog files. + + The existence of a given entry (domain_id, server_id, seq_no) + guarantees only that this seq_no will not be found in this or any + later binlog file. It does not guarantee that it can be found it an + earlier binlog file, for example the file may have been purged. + + If there is no entry for a given (domain_id, server_id) pair, then + it means that no such GTID exists in any earlier binlog. It is + permissible to remove such pair from future Gtid_list_log_events + if all previous binlog files containing such GTIDs have been purged + (though such optimization is not performed at the time of this + writing). So if there is no entry for given GTID it means that such + GTID should be search for in this or later binlog file, same as if + there had been an entry (domain_id, server_id, 0). + */ + + Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state); + if (gl_ev.write(&log_file)) + goto err; + + /* Output a binlog checkpoint event at the start of the binlog file. */ + /* Construct an entry in the binlog_xid_count_list for the new binlog file (we will not link it into the list until we know the new file @@ -3658,7 +3705,8 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log) const char* save_name; DBUG_ENTER("reset_logs"); - ha_reset_logs(thd); + if (thd) + ha_reset_logs(thd); /* We need to get both locks to be sure that no one is trying to write to the index log file. @@ -3783,6 +3831,14 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log) break; } + if (!is_relay_log) + { + rpl_global_gtid_binlog_state.reset(); + mysql_mutex_lock(&LOCK_gtid_counter); + global_gtid_counter= 0; + mysql_mutex_unlock(&LOCK_gtid_counter); + } + /* Start logging with a new file */ close(LOG_CLOSE_INDEX | LOG_CLOSE_TO_BE_OPENED); if ((error= my_delete_allow_opened(index_file_name, MYF(0)))) // Reset (open will update) @@ -5297,6 +5353,213 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, DBUG_RETURN(error); } + +/* Generate a new global transaction ID, and write it to the binlog */ +bool +MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, + bool is_transactional) +{ + rpl_gtid gtid; + uint64 seq_no; + + seq_no= thd->variables.gtid_seq_no; + /* + Reset the session variable gtid_seq_no, to reduce the risk of accidentally + producing a duplicate GTID. + */ + thd->variables.gtid_seq_no= 0; + if (seq_no != 0) + { + /* + If we see a higher sequence number, use that one as the basis of any + later generated sequence numbers. + */ + bump_seq_no_counter_if_needed(seq_no); + } + else + { + mysql_mutex_lock(&LOCK_gtid_counter); + seq_no= ++global_gtid_counter; + mysql_mutex_unlock(&LOCK_gtid_counter); + } + gtid.seq_no= seq_no; + gtid.domain_id= thd->variables.gtid_domain_id; + + Gtid_log_event gtid_event(thd, gtid.seq_no, gtid.domain_id, standalone, + LOG_EVENT_SUPPRESS_USE_F, is_transactional); + gtid.server_id= gtid_event.server_id; + + /* Write the event to the binary log. */ + if (gtid_event.write(&mysql_bin_log.log_file)) + return true; + status_var_add(thd->status_var.binlog_bytes_written, gtid_event.data_written); + + /* Update the replication state (last GTID in each replication domain). */ + mysql_mutex_lock(&LOCK_rpl_gtid_state); + rpl_global_gtid_binlog_state.update(>id); + mysql_mutex_unlock(&LOCK_rpl_gtid_state); + return false; +} + + +int +MYSQL_BIN_LOG::write_state_to_file() +{ + File file_no; + IO_CACHE cache; + char buf[FN_REFLEN]; + int err; + bool opened= false; + bool inited= false; + + fn_format(buf, opt_bin_logname, mysql_data_home, ".state", + MY_UNPACK_FILENAME); + if ((file_no= mysql_file_open(key_file_binlog_state, buf, + O_RDWR|O_CREAT|O_TRUNC|O_BINARY, + MYF(MY_WME))) < 0) + { + err= 1; + goto err; + } + opened= true; + if ((err= init_io_cache(&cache, file_no, IO_SIZE, WRITE_CACHE, 0, 0, + MYF(MY_WME|MY_WAIT_IF_FULL)))) + goto err; + inited= true; + if ((err= rpl_global_gtid_binlog_state.write_to_iocache(&cache))) + goto err; + inited= false; + if ((err= end_io_cache(&cache))) + goto err; + if ((err= mysql_file_sync(file_no, MYF(MY_WME|MY_SYNC_FILESIZE)))) + goto err; + goto end; + +err: + sql_print_error("Error writing binlog state to file '%s'.\n", buf); + if (inited) + end_io_cache(&cache); +end: + if (opened) + mysql_file_close(file_no, MYF(0)); + + return err; +} + + +int +MYSQL_BIN_LOG::read_state_from_file() +{ + File file_no; + IO_CACHE cache; + char buf[FN_REFLEN]; + int err; + bool opened= false; + bool inited= false; + + if (state_read) + return 0; + state_read= true; + + fn_format(buf, opt_bin_logname, mysql_data_home, ".state", + MY_UNPACK_FILENAME); + if ((file_no= mysql_file_open(key_file_binlog_state, buf, + O_RDONLY|O_BINARY, MYF(0))) < 0) + { + if (my_errno != ENOENT) + { + err= 1; + goto err; + } + else + { + /* + If the state file does not exist, this is the first server startup + with GTID enabled. So initialize to empty state. + */ + rpl_global_gtid_binlog_state.reset(); + err= 0; + goto end; + } + } + opened= true; + if ((err= init_io_cache(&cache, file_no, IO_SIZE, READ_CACHE, 0, 0, + MYF(MY_WME|MY_WAIT_IF_FULL)))) + goto err; + inited= true; + if ((err= rpl_global_gtid_binlog_state.read_from_iocache(&cache))) + goto err; + goto end; + +err: + sql_print_error("Error reading binlog GTID state from file '%s'.\n", buf); +end: + if (inited) + end_io_cache(&cache); + if (opened) + mysql_file_close(file_no, MYF(0)); + /* Pick the next unused seq_no from the loaded binlog state. */ + bump_seq_no_counter_if_needed( + rpl_global_gtid_binlog_state.seq_no_from_state()); + + return err; +} + + +int +MYSQL_BIN_LOG::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size) +{ + return rpl_global_gtid_binlog_state.get_most_recent_gtid_list(list, size); +} + + +bool +MYSQL_BIN_LOG::find_in_binlog_state(uint32 domain_id, uint32 server_id, + rpl_gtid *out_gtid) +{ + rpl_gtid *gtid; + mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state); + if ((gtid= rpl_global_gtid_binlog_state.find(domain_id, server_id))) + *out_gtid= *gtid; + mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state); + return gtid != NULL; +} + + +bool +MYSQL_BIN_LOG::lookup_domain_in_binlog_state(uint32 domain_id, + rpl_gtid *out_gtid) +{ + rpl_binlog_state::element *elem; + bool res; + + mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state); + elem= (rpl_binlog_state::element *) + my_hash_search(&rpl_global_gtid_binlog_state.hash, + (const uchar *)&domain_id, 0); + if (elem) + { + res= true; + *out_gtid= *elem->last_gtid; + } + else + res= false; + mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state); + + return res; +} + + +void +MYSQL_BIN_LOG::bump_seq_no_counter_if_needed(uint64 seq_no) +{ + mysql_mutex_lock(&LOCK_gtid_counter); + if (global_gtid_counter < seq_no) + global_gtid_counter= seq_no; + mysql_mutex_unlock(&LOCK_gtid_counter); +} + + /** Write an event to the binary log. If with_annotate != NULL and *with_annotate = TRUE write also Annotate_rows before the event @@ -5366,6 +5629,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) my_org_b_tell= my_b_tell(file); mysql_mutex_lock(&LOCK_log); prev_binlog_id= current_binlog_id; + write_gtid_event(thd, true, using_trans); } else { @@ -6238,19 +6502,6 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, break; } - /* - Log "BEGIN" at the beginning of every transaction. Here, a transaction is - either a BEGIN..COMMIT block or a single statement in autocommit mode. - - Create the necessary events here, where we have the correct THD (and - thread context). - - Due to group commit the actual writing to binlog may happen in a different - thread. - */ - Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), using_trx_cache, TRUE, - TRUE, 0); - entry.begin_event= &qinfo; entry.end_event= end_ev; if (cache_mngr->stmt_cache.has_incident() || cache_mngr->trx_cache.has_incident()) @@ -6626,10 +6877,8 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry) { binlog_cache_mngr *mngr= entry->cache_mngr; - if (entry->begin_event->write(&log_file)) + if (write_gtid_event(entry->thd, false, entry->using_trx_cache)) return ER_ERROR_ON_WRITE; - status_var_add(entry->thd->status_var.binlog_bytes_written, - entry->begin_event->data_written); if (entry->using_stmt_cache && !mngr->stmt_cache.empty() && write_cache(entry->thd, mngr->get_binlog_cache_log(FALSE))) @@ -6770,6 +7019,8 @@ int MYSQL_BIN_LOG::wait_for_update_bin_log(THD* thd, void MYSQL_BIN_LOG::close(uint exiting) { // One can't set log_type here! + bool failed_to_save_state= false; + DBUG_ENTER("MYSQL_BIN_LOG::close"); DBUG_PRINT("enter",("exiting: %d", (int) exiting)); if (log_state == LOG_OPENED) @@ -6787,6 +7038,27 @@ void MYSQL_BIN_LOG::close(uint exiting) s.write(&log_file); bytes_written+= s.data_written; signal_update(); + + /* + When we shut down server, write out the binlog state to a separate + file so we do not have to scan an entire binlog file to recover it + at next server start. + + Note that this must be written and synced to disk before marking the + last binlog file as "not crashed". + */ + if (!is_relay_log && write_state_to_file()) + { + sql_print_error("Failed to save binlog GTID state during shutdown. " + "Binlog will be marked as crashed, so that crash " + "recovery can recover the state at next server " + "startup."); + /* + Leave binlog file marked as crashed, so we can recover state by + scanning it now that we failed to write out the state properly. + */ + failed_to_save_state= true; + } } #endif /* HAVE_REPLICATION */ @@ -6795,7 +7067,8 @@ void MYSQL_BIN_LOG::close(uint exiting) && !(exiting & LOG_CLOSE_DELAYED_CLOSE)) { my_off_t org_position= mysql_file_tell(log_file.file, MYF(0)); - clear_inuse_flag_when_closing(log_file.file); + if (!failed_to_save_state) + clear_inuse_flag_when_closing(log_file.file); /* Restore position so that anything we have in the IO_cache is written to the correct position. @@ -7971,9 +8244,13 @@ int TC_LOG_BINLOG::open(const char *opt_name) sql_print_information("Recovering after a crash using %s", opt_name); error= recover(&log_info, log_name, &log, (Format_description_log_event *)ev); + state_read= true; + /* Pick the next unused seq_no from the recovered binlog state. */ + bump_seq_no_counter_if_needed( + rpl_global_gtid_binlog_state.seq_no_from_state()); } else - error=0; + error= read_state_from_file(); delete ev; end_io_cache(&log); @@ -8223,6 +8500,28 @@ binlog_background_thread(void *arg __attribute__((unused))) mysql_mutex_unlock(&LOCK_thread_count); thd->store_globals(); + /* + Load the slave replication GTID state from the mysql.rpl_slave_state + table. + + This is mostly so that we can start our seq_no counter from the highest + seq_no seen by a slave. This way, we have a way to tell if a transaction + logged by ourselves as master is newer or older than a replicated + transaction. + */ +#ifdef HAVE_REPLICATION + if (rpl_load_gtid_slave_state(thd)) + sql_print_warning("Failed to load slave replication state from table " + "%s.%s: %u: %s", "mysql", + rpl_gtid_slave_state_table_name.str, + thd->stmt_da->sql_errno(), thd->stmt_da->message()); +#endif + + mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread); + binlog_background_thread_started= true; + mysql_cond_signal(&mysql_bin_log.COND_binlog_background_thread_end); + mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_background_thread); + for (;;) { /* @@ -8321,7 +8620,16 @@ start_binlog_background_thread() binlog_background_thread, NULL)) return 1; - binlog_background_thread_started= true; + /* + Wait for the thread to have started (so we know that the slave replication + state is loaded and we have correct global_gtid_counter). + */ + mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread); + while (!binlog_background_thread_started) + mysql_cond_wait(&mysql_bin_log.COND_binlog_background_thread_end, + &mysql_bin_log.LOCK_binlog_background_thread); + mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_background_thread); + return 0; } @@ -8400,6 +8708,37 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, } break; } + case GTID_LIST_EVENT: + if (first_round) + { + uint32 i; + Gtid_list_log_event *glev= (Gtid_list_log_event *)ev; + + /* Initialise the binlog state from the Gtid_list event. */ + rpl_global_gtid_binlog_state.reset(); + for (i= 0; i < glev->count; ++i) + { + if (rpl_global_gtid_binlog_state.update(&(glev->list[i]))) + goto err2; + } + } + break; + + case GTID_EVENT: + if (first_round) + { + Gtid_log_event *gev= (Gtid_log_event *)ev; + rpl_gtid gtid; + + /* Update the binlog state with any GTID logged after Gtid_list. */ + gtid.domain_id= gev->domain_id; + gtid.server_id= gev->server_id; + gtid.seq_no= gev->seq_no; + if (rpl_global_gtid_binlog_state.update(>id)) + goto err2; + } + break; + default: /* Nothing. */ break; diff --git a/sql/log.h b/sql/log.h index da8faa36a00..bd20c8aee09 100644 --- a/sql/log.h +++ b/sql/log.h @@ -396,6 +396,7 @@ private: ( ((ulong)(c)>>1) == BINLOG_COOKIE_DUMMY_ID ) class binlog_cache_mngr; +struct rpl_gtid; class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG { private: @@ -420,11 +421,10 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG bool using_stmt_cache; bool using_trx_cache; /* - Extra events (BEGIN, COMMIT/ROLLBACK/XID, and possibly INCIDENT) to be + Extra events (COMMIT/ROLLBACK/XID, and possibly INCIDENT) to be written during group commit. The incident_event is only valid if trx_data->has_incident() is true. */ - Log_event *begin_event; Log_event *end_event; Log_event *incident_event; /* Set during group commit to record any per-thread error. */ @@ -507,6 +507,8 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG */ uint *sync_period_ptr; uint sync_counter; + /* Protect against reading the binlog state file twice. */ + bool state_read; inline uint get_sync_period() { @@ -773,6 +775,14 @@ public: inline uint32 get_open_count() { return open_count; } void set_status_variables(THD *thd); bool is_xidlist_idle(); + bool write_gtid_event(THD *thd, bool standalone, bool is_transactional); + int read_state_from_file(); + int write_state_to_file(); + int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size); + bool find_in_binlog_state(uint32 domain_id, uint32 server_id, + rpl_gtid *out_gtid); + bool lookup_domain_in_binlog_state(uint32 domain_id, rpl_gtid *out_gtid); + void bump_seq_no_counter_if_needed(uint64 seq_no); }; class Log_event_handler diff --git a/sql/log_event.cc b/sql/log_event.cc index 6c637be19fa..104ea948cfc 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -537,7 +537,7 @@ static char *slave_load_file_stem(char *buf, uint file_id, to_unix_path(buf); buf = strend(buf); - buf = int10_to_str(::server_id, buf, 10); + buf = int10_to_str(global_system_variables.server_id, buf, 10); *buf++ = '-'; buf = int10_to_str(event_server_id, buf, 10); *buf++ = '-'; @@ -573,7 +573,7 @@ static void cleanup_load_tmpdir() LOAD DATA. */ p= strmake(prefbuf, STRING_WITH_LEN(PREFIX_SQL_LOAD)); - p= int10_to_str(::server_id, p, 10); + p= int10_to_str(global_system_variables.server_id, p, 10); *(p++)= '-'; *p= 0; @@ -749,6 +749,8 @@ const char* Log_event::get_type_str(Log_event_type type) case INCIDENT_EVENT: return "Incident"; case ANNOTATE_ROWS_EVENT: return "Annotate_rows"; case BINLOG_CHECKPOINT_EVENT: return "Binlog_checkpoint"; + case GTID_EVENT: return "Gtid"; + case GTID_LIST_EVENT: return "Gtid_list"; default: return "Unknown"; /* impossible */ } } @@ -769,7 +771,7 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) crc(0), thd(thd_arg), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF) { - server_id= thd->server_id; + server_id= thd->variables.server_id; when= thd->start_time; when_sec_part=thd->start_time_sec_part; @@ -794,7 +796,7 @@ Log_event::Log_event() cache_type(Log_event::EVENT_INVALID_CACHE), crc(0), thd(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF) { - server_id= ::server_id; + server_id= global_system_variables.server_id; /* We can't call my_time() here as this would cause a call before my_init() is called @@ -909,9 +911,11 @@ int Log_event::do_update_pos(Relay_log_info *rli) if (debug_not_change_ts_if_art_event == 1 && is_artificial_event()) debug_not_change_ts_if_art_event= 0; ); - rli->stmt_done(log_pos, is_artificial_event() && - IF_DBUG(debug_not_change_ts_if_art_event > 0, 1) ? - 0 : when); + rli->stmt_done(log_pos, + (is_artificial_event() && + IF_DBUG(debug_not_change_ts_if_art_event > 0, 1) ? + 0 : when), + thd); DBUG_EXECUTE_IF("let_first_flush_log_change_timestamp", if (debug_not_change_ts_if_art_event == 0) debug_not_change_ts_if_art_event= 2; ); @@ -926,10 +930,11 @@ Log_event::do_shall_skip(Relay_log_info *rli) DBUG_PRINT("info", ("ev->server_id: %lu, ::server_id: %lu," " rli->replicate_same_server_id: %d," " rli->slave_skip_counter: %lu", - (ulong) server_id, (ulong) ::server_id, + (ulong) server_id, (ulong) global_system_variables.server_id, rli->replicate_same_server_id, rli->slave_skip_counter)); - if ((server_id == ::server_id && !rli->replicate_same_server_id) || + if ((server_id == global_system_variables.server_id && + !rli->replicate_same_server_id) || (rli->slave_skip_counter == 1 && rli->is_in_group()) || (flags & LOG_EVENT_SKIP_REPLICATION_F && opt_replicate_events_marked_for_skip != RPL_SKIP_REPLICATE)) @@ -1370,7 +1375,7 @@ failed my_b_read")); Log_event *res= 0; #ifndef max_allowed_packet THD *thd=current_thd; - uint max_allowed_packet= thd ? slave_max_allowed_packet:~(ulong)0; + uint max_allowed_packet= thd ? slave_max_allowed_packet:~(uint)0; #endif if (data_len > max_allowed_packet) @@ -1560,6 +1565,12 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, case BINLOG_CHECKPOINT_EVENT: ev = new Binlog_checkpoint_log_event(buf, event_len, description_event); break; + case GTID_EVENT: + ev = new Gtid_log_event(buf, event_len, description_event); + break; + case GTID_LIST_EVENT: + ev = new Gtid_list_log_event(buf, event_len, description_event); + break; #ifdef HAVE_REPLICATION case SLAVE_EVENT: /* can never happen (unused event) */ ev = new Slave_log_event(buf, event_len, description_event); @@ -3437,6 +3448,53 @@ Query_log_event::dummy_event(String *packet, ulong ev_offset, return 0; } +/* + Replace an event (GTID event) with a BEGIN query event, to be compatible + with an old slave. +*/ +int +Query_log_event::begin_event(String *packet, ulong ev_offset, + uint8 checksum_alg) +{ + uchar *p= (uchar *)packet->ptr() + ev_offset; + uchar *q= p + LOG_EVENT_HEADER_LEN; + size_t data_len= packet->length() - ev_offset; + uint16 flags; + + if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32) + data_len-= BINLOG_CHECKSUM_LEN; + else + DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || + checksum_alg == BINLOG_CHECKSUM_ALG_OFF); + + /* Currently we only need to replace GTID event. */ + DBUG_ASSERT(data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN); + if (data_len != LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN) + return 1; + + flags= uint2korr(p + FLAGS_OFFSET); + flags&= ~LOG_EVENT_THREAD_SPECIFIC_F; + flags|= LOG_EVENT_SUPPRESS_USE_F; + int2store(p + FLAGS_OFFSET, flags); + + p[EVENT_TYPE_OFFSET]= QUERY_EVENT; + int4store(q + Q_THREAD_ID_OFFSET, 0); + int4store(q + Q_EXEC_TIME_OFFSET, 0); + q[Q_DB_LEN_OFFSET]= 0; + int2store(q + Q_ERR_CODE_OFFSET, 0); + int2store(q + Q_STATUS_VARS_LEN_OFFSET, 0); + q[Q_DATA_OFFSET]= 0; /* Zero terminator for empty db */ + q+= Q_DATA_OFFSET + 1; + memcpy(q, "BEGIN", 5); + + if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32) + { + ha_checksum crc= my_checksum(0L, p, data_len); + int4store(p + data_len, crc); + } + return 0; +} + #ifdef MYSQL_CLIENT /** @@ -3696,6 +3754,8 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli, LEX_STRING new_db; int expected_error,actual_error= 0; HA_CREATE_INFO db_options; + uint64 sub_id= 0; + rpl_gtid gtid; DBUG_ENTER("Query_log_event::do_apply_event"); /* @@ -3883,6 +3943,30 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli, else thd->variables.collation_database= thd->db_charset; + /* + Record any GTID in the same transaction, so slave state is + transactionally consistent. + */ + if (strcmp("COMMIT", query) == 0 && (sub_id= rli->gtid_sub_id)) + { + /* Clear the GTID from the RLI so we don't accidentally reuse it. */ + const_cast<Relay_log_info*>(rli)->gtid_sub_id= 0; + + gtid= rli->current_gtid; + if (rpl_global_gtid_slave_state.record_gtid(thd, >id, sub_id, true)) + { + rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, + "Error during COMMIT: failed to update GTID state in " + "%s.%s: %d: %s", + "mysql", rpl_gtid_slave_state_table_name.str, + thd->stmt_da->sql_errno(), thd->stmt_da->message()); + trans_rollback(thd); + sub_id= 0; + thd->is_slave_error= 1; + goto end; + } + } + thd->table_map_for_update= (table_map)table_map_for_update; thd->set_invoker(&user, &host); /* @@ -4068,6 +4152,9 @@ Default database: '%s'. Query: '%s'", } end: + if (sub_id && !thd->is_slave_error) + rpl_global_gtid_slave_state.update_state_hash(sub_id, >id); + /* Probably we have set thd->query, thd->db, thd->catalog to point to places in the data_buf of this event. Now the event is going to be deleted @@ -4145,6 +4232,17 @@ Query_log_event::do_shall_skip(Relay_log_info *rli) DBUG_RETURN(Log_event::do_shall_skip(rli)); } + +bool +Query_log_event::peek_is_commit_rollback(const char *event_start, + size_t event_len) +{ + if (event_len < LOG_EVENT_HEADER_LEN + QUERY_HEADER_LEN || event_len < 9) + return false; + return !memcmp(event_start + (event_len-7), "\0COMMIT", 7) || + !memcmp(event_start + (event_len-9), "\0ROLLBACK", 9); +} + #endif @@ -4459,6 +4557,8 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver) post_header_len[ANNOTATE_ROWS_EVENT-1]= ANNOTATE_ROWS_HEADER_LEN; post_header_len[BINLOG_CHECKPOINT_EVENT-1]= BINLOG_CHECKPOINT_HEADER_LEN; + post_header_len[GTID_EVENT-1]= GTID_HEADER_LEN; + post_header_len[GTID_LIST_EVENT-1]= GTID_LIST_HEADER_LEN; // Sanity-check that all post header lengths are initialized. int i; @@ -4663,7 +4763,7 @@ int Format_description_log_event::do_apply_event(Relay_log_info const *rli) perform, we don't call Start_log_event_v3::do_apply_event() (this was just to update the log's description event). */ - if (server_id != (uint32) ::server_id) + if (server_id != (uint32) global_system_variables.server_id) { /* If the event was not requested by the slave i.e. the master sent @@ -4689,7 +4789,7 @@ int Format_description_log_event::do_apply_event(Relay_log_info const *rli) int Format_description_log_event::do_update_pos(Relay_log_info *rli) { - if (server_id == (uint32) ::server_id) + if (server_id == (uint32) global_system_variables.server_id) { /* We only increase the relay log position if we are skipping @@ -5744,7 +5844,7 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli) #endif DBUG_PRINT("info", ("server_id=%lu; ::server_id=%lu", - (ulong) this->server_id, (ulong) ::server_id)); + (ulong) this->server_id, (ulong) global_system_variables.server_id)); DBUG_PRINT("info", ("new_log_ident: %s", this->new_log_ident)); DBUG_PRINT("info", ("pos: %s", llstr(this->pos, buf))); @@ -5764,7 +5864,8 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli) 5.0.0, there also are some rotates from the slave itself, in the relay log, which shall not change the group positions. */ - if ((server_id != ::server_id || rli->replicate_same_server_id) && + if ((server_id != global_system_variables.server_id || + rli->replicate_same_server_id) && !is_relay_log_event() && !rli->is_in_group()) { @@ -5781,6 +5882,7 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli) rli->group_master_log_name, (ulong) rli->group_master_log_pos)); mysql_mutex_unlock(&rli->data_lock); + rpl_global_gtid_slave_state.record_and_update_gtid(thd, rli); flush_relay_log_info(rli); /* @@ -5905,6 +6007,394 @@ bool Binlog_checkpoint_log_event::write(IO_CACHE *file) /************************************************************************** + Global transaction ID stuff +**************************************************************************/ + +Gtid_log_event::Gtid_log_event(const char *buf, uint event_len, + const Format_description_log_event *description_event) + : Log_event(buf, description_event), seq_no(0) +{ + uint8 header_size= description_event->common_header_len; + uint8 post_header_len= description_event->post_header_len[GTID_EVENT-1]; + if (event_len < header_size + post_header_len || + post_header_len < GTID_HEADER_LEN) + return; + + buf+= header_size; + seq_no= uint8korr(buf); + buf+= 8; + domain_id= uint4korr(buf); + buf+= 4; + flags2= *buf; +} + + +#ifdef MYSQL_SERVER + +Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg, + uint32 domain_id_arg, bool standalone, + uint16 flags_arg, bool is_transactional) + : Log_event(thd_arg, flags_arg, is_transactional), + seq_no(seq_no_arg), domain_id(domain_id_arg), + flags2(standalone ? FL_STANDALONE : 0) +{ + cache_type= Log_event::EVENT_NO_CACHE; +} + + +/* + Used to record GTID while sending binlog to slave, without having to + fully contruct every Gtid_log_event() needlessly. +*/ +bool +Gtid_log_event::peek(const char *event_start, size_t event_len, + uint32 *domain_id, uint32 *server_id, uint64 *seq_no, + uchar *flags2) +{ + const char *p; + if (event_len < LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN) + return true; + *server_id= uint4korr(event_start + SERVER_ID_OFFSET); + p= event_start + LOG_EVENT_HEADER_LEN; + *seq_no= uint8korr(p); + p+= 8; + *domain_id= uint4korr(p); + p+= 4; + *flags2= (uchar)*p; + return false; +} + + +bool +Gtid_log_event::write(IO_CACHE *file) +{ + uchar buf[GTID_HEADER_LEN]; + int8store(buf, seq_no); + int4store(buf+8, domain_id); + buf[12]= flags2; + bzero(buf+13, GTID_HEADER_LEN-13); + return write_header(file, GTID_HEADER_LEN) || + wrapper_my_b_safe_write(file, buf, GTID_HEADER_LEN) || + write_footer(file); +} + + +/* + Replace a GTID event with either a BEGIN event, dummy event, or nothing, as + appropriate to work with old slave that does not know global transaction id. + + The need_dummy_event argument is an IN/OUT argument. It is passed as TRUE + if slave has capability lower than MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES. + It is returned TRUE if we return a BEGIN (or dummy) event to be sent to the + slave, FALSE if event should be skipped completely. +*/ +int +Gtid_log_event::make_compatible_event(String *packet, bool *need_dummy_event, + ulong ev_offset, uint8 checksum_alg) +{ + uchar flags2; + if (packet->length() - ev_offset < LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN) + return 1; + flags2= (*packet)[ev_offset + LOG_EVENT_HEADER_LEN + 12]; + if (flags2 & FL_STANDALONE) + { + if (need_dummy_event) + return Query_log_event::dummy_event(packet, ev_offset, checksum_alg); + else + return 0; + } + + *need_dummy_event= true; + return Query_log_event::begin_event(packet, ev_offset, checksum_alg); +} + + +#ifdef HAVE_REPLICATION +void +Gtid_log_event::pack_info(THD *thd, Protocol *protocol) +{ + char buf[6+5+10+1+10+1+20+1]; + char *p; + p = strmov(buf, (flags2 & FL_STANDALONE ? "GTID " : "BEGIN GTID ")); + p= longlong10_to_str(domain_id, p, 10); + *p++= '-'; + p= longlong10_to_str(server_id, p, 10); + *p++= '-'; + p= longlong10_to_str(seq_no, p, 10); + + protocol->store(buf, p-buf, &my_charset_bin); +} + +static char gtid_begin_string[] = "BEGIN"; + +int +Gtid_log_event::do_apply_event(Relay_log_info const *rli) +{ + thd->variables.server_id= this->server_id; + thd->variables.gtid_domain_id= this->domain_id; + thd->variables.gtid_seq_no= this->seq_no; + + if (flags2 & FL_STANDALONE) + return 0; + + /* Execute this like a BEGIN query event. */ + thd->set_query_and_id(gtid_begin_string, sizeof(gtid_begin_string)-1, + &my_charset_bin, next_query_id()); + Parser_state parser_state; + if (!parser_state.init(thd, thd->query(), thd->query_length())) + { + mysql_parse(thd, thd->query(), thd->query_length(), &parser_state); + /* Finalize server status flags after executing a statement. */ + thd->update_server_status(); + log_slow_statement(thd); + if (unlikely(thd->is_fatal_error)) + thd->is_slave_error= 1; + else if (likely(!thd->is_slave_error)) + general_log_write(thd, COM_QUERY, thd->query(), thd->query_length()); + } + + thd->reset_query(); + free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC)); + return thd->is_slave_error; +} + + +int +Gtid_log_event::do_update_pos(Relay_log_info *rli) +{ + rli->inc_event_relay_log_pos(); + return 0; +} + + +Log_event::enum_skip_reason +Gtid_log_event::do_shall_skip(Relay_log_info *rli) +{ + /* + An event skipped due to @@skip_replication must not be counted towards the + number of events to be skipped due to @@sql_slave_skip_counter. + */ + if (flags & LOG_EVENT_SKIP_REPLICATION_F && + opt_replicate_events_marked_for_skip != RPL_SKIP_REPLICATE) + return Log_event::EVENT_SKIP_IGNORE; + + if (rli->slave_skip_counter > 0) + { + if (!(flags2 & FL_STANDALONE)) + thd->variables.option_bits|= OPTION_BEGIN; + return Log_event::continue_group(rli); + } + return Log_event::do_shall_skip(rli); +} + + +#endif /* HAVE_REPLICATION */ + +#else /* !MYSQL_SERVER */ + +void +Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) +{ + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); + char buf[21]; + + if (!print_event_info->short_form) + { + print_header(&cache, print_event_info, FALSE); + longlong10_to_str(seq_no, buf, 10); + my_b_printf(&cache, "\tGTID %u-%u-%s\n", domain_id, server_id, buf); + + if (!print_event_info->domain_id_printed || + print_event_info->domain_id != domain_id) + { + my_b_printf(&cache, "/*!100001 SET @@session.gtid_domain_id=%u*/%s\n", + domain_id, print_event_info->delimiter); + print_event_info->domain_id= domain_id; + print_event_info->domain_id_printed= true; + } + + if (!print_event_info->server_id_printed || + print_event_info->server_id != server_id) + { + my_b_printf(&cache, "/*!100001 SET @@session.server_id=%u*/%s\n", + server_id, print_event_info->delimiter); + print_event_info->server_id= server_id; + print_event_info->server_id_printed= true; + } + + my_b_printf(&cache, "/*!100001 SET @@session.gtid_seq_no=%s*/%s\n", + buf, print_event_info->delimiter); + } + if (!(flags2 & FL_STANDALONE)) + my_b_printf(&cache, "BEGIN\n%s\n", print_event_info->delimiter); +} + +#endif /* MYSQL_SERVER */ + + +/* GTID list. */ + +Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len, + const Format_description_log_event *description_event) + : Log_event(buf, description_event), count(0), list(0) +{ + uint32 i; + uint8 header_size= description_event->common_header_len; + uint8 post_header_len= description_event->post_header_len[GTID_LIST_EVENT-1]; + if (event_len < header_size + post_header_len || + post_header_len < GTID_LIST_HEADER_LEN) + return; + + buf+= header_size; + count= uint4korr(buf) & ((1<<28)-1); + buf+= 4; + if (event_len - (header_size + post_header_len) < count*element_size || + (!(list= (rpl_gtid *)my_malloc(count*sizeof(*list) + (count == 0), + MYF(MY_WME))))) + return; + + for (i= 0; i < count; ++i) + { + list[i].domain_id= uint4korr(buf); + buf+= 4; + list[i].server_id= uint4korr(buf); + buf+= 4; + list[i].seq_no= uint8korr(buf); + buf+= 8; + } +} + + +#ifdef MYSQL_SERVER + +Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set) + : count(gtid_set->count()), list(0) +{ + cache_type= EVENT_NO_CACHE; + /* Failure to allocate memory will be caught by is_valid() returning false. */ + if (count < (1<<28) && + (list = (rpl_gtid *)my_malloc(count * sizeof(*list) + (count == 0), + MYF(MY_WME)))) + gtid_set->get_gtid_list(list, count); +} + +bool +Gtid_list_log_event::write(IO_CACHE *file) +{ + uint32 i; + uchar buf[element_size]; + + DBUG_ASSERT(count < 1<<28); + + if (write_header(file, get_data_size())) + return 1; + int4store(buf, count & ((1<<28)-1)); + if (wrapper_my_b_safe_write(file, buf, GTID_LIST_HEADER_LEN)) + return 1; + for (i= 0; i < count; ++i) + { + int4store(buf, list[i].domain_id); + int4store(buf+4, list[i].server_id); + int8store(buf+8, list[i].seq_no); + if (wrapper_my_b_safe_write(file, buf, element_size)) + return 1; + } + return write_footer(file); +} + + +#ifdef HAVE_REPLICATION +void +Gtid_list_log_event::pack_info(THD *thd, Protocol *protocol) +{ + char buf_mem[1024]; + String buf(buf_mem, sizeof(buf_mem), system_charset_info); + uint32 i; + bool first; + + buf.length(0); + buf.append(STRING_WITH_LEN("[")); + first= true; + for (i= 0; i < count; ++i) + rpl_slave_state_tostring_helper(&buf, &list[i], &first); + buf.append(STRING_WITH_LEN("]")); + + protocol->store(&buf); +} +#endif /* HAVE_REPLICATION */ + +#else /* !MYSQL_SERVER */ + +void +Gtid_list_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) +{ + if (!print_event_info->short_form) + { + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); + char buf[21]; + uint32 i; + + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\tGtid list ["); + for (i= 0; i < count; ++i) + { + longlong10_to_str(list[i].seq_no, buf, 10); + my_b_printf(&cache, "%u-%u-%s", list[i].domain_id, + list[i].server_id, buf); + if (i < count-1) + my_b_printf(&cache, ",\n# "); + } + my_b_printf(&cache, "]\n"); + } +} + +#endif /* MYSQL_SERVER */ + + +/* + Used to record gtid_list event while sending binlog to slave, without having to + fully contruct the event object. +*/ +bool +Gtid_list_log_event::peek(const char *event_start, uint32 event_len, + rpl_gtid **out_gtid_list, uint32 *out_list_len) +{ + const char *p; + uint32 count_field, count; + rpl_gtid *gtid_list; + + if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN) + return true; + p= event_start + LOG_EVENT_HEADER_LEN; + count_field= uint4korr(p); + p+= 4; + count= count_field & ((1<<28)-1); + if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN + + 16 * count) + return true; + if (!(gtid_list= (rpl_gtid *)my_malloc(sizeof(rpl_gtid)*count + (count == 0), + MYF(MY_WME)))) + return true; + *out_gtid_list= gtid_list; + *out_list_len= count; + while (count--) + { + gtid_list->domain_id= uint4korr(p); + p+= 4; + gtid_list->server_id= uint4korr(p); + p+= 4; + gtid_list->seq_no= uint8korr(p); + p+= 8; + ++gtid_list; + } + + return false; +} + + +/************************************************************************** Intvar_log_event methods **************************************************************************/ @@ -6257,12 +6747,43 @@ void Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) int Xid_log_event::do_apply_event(Relay_log_info const *rli) { bool res; + int err; + rpl_gtid gtid; + uint64 sub_id; + + /* + Record any GTID in the same transaction, so slave state is transactionally + consistent. + */ + if ((sub_id= rli->gtid_sub_id)) + { + /* Clear the GTID from the RLI so we don't accidentally reuse it. */ + const_cast<Relay_log_info*>(rli)->gtid_sub_id= 0; + + gtid= rli->current_gtid; + err= rpl_global_gtid_slave_state.record_gtid(thd, >id, sub_id, true); + if (err) + { + rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, + "Error during XID COMMIT: failed to update GTID state in " + "%s.%s: %d: %s", + "mysql", rpl_gtid_slave_state_table_name.str, + thd->stmt_da->sql_errno(), thd->stmt_da->message()); + trans_rollback(thd); + thd->is_slave_error= 1; + return err; + } + } + /* For a slave Xid_log_event is COMMIT */ general_log_print(thd, COM_QUERY, "COMMIT /* implicit, from Xid_log_event */"); res= trans_commit(thd); /* Automatically rolls back on error. */ thd->mdl_context.release_transactional_locks(); + if (sub_id) + rpl_global_gtid_slave_state.update_state_hash(sub_id, >id); + /* Increment the global status commit count variable */ @@ -7009,6 +7530,7 @@ int Stop_log_event::do_update_pos(Relay_log_info *rli) rli->inc_event_relay_log_pos(); else { + rpl_global_gtid_slave_state.record_and_update_gtid(thd, rli); rli->inc_group_relay_log_pos(0); flush_relay_log_info(rli); } @@ -8810,7 +9332,7 @@ Rows_log_event::do_update_pos(Relay_log_info *rli) Step the group log position if we are not in a transaction, otherwise increase the event log position. */ - rli->stmt_done(log_pos, when); + rli->stmt_done(log_pos, when, thd); /* Clear any errors in thd->net.last_err*. It is not known if this is needed or not. It is believed that any errors that may exist in @@ -11148,7 +11670,9 @@ st_print_event_info::st_print_event_info() auto_increment_increment(0),auto_increment_offset(0), charset_inited(0), lc_time_names_number(~0), charset_database_number(ILLEGAL_CHARSET_INFO_NUMBER), - thread_id(0), thread_id_printed(false), skip_replication(0), + thread_id(0), thread_id_printed(false), server_id(0), + server_id_printed(false), domain_id(0), domain_id_printed(false), + skip_replication(0), base64_output_mode(BASE64_OUTPUT_UNSPEC), printed_fd_event(FALSE) { /* diff --git a/sql/log_event.h b/sql/log_event.h index be63304b529..1c2b2769915 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -50,6 +50,8 @@ #include "sql_class.h" /* THD */ #endif +#include "rpl_gtid.h" + /* Forward declarations */ class String; @@ -261,6 +263,8 @@ struct sql_ex_info #define HEARTBEAT_HEADER_LEN 0 #define ANNOTATE_ROWS_HEADER_LEN 0 #define BINLOG_CHECKPOINT_HEADER_LEN 4 +#define GTID_HEADER_LEN 19 +#define GTID_LIST_HEADER_LEN 4 /* Max number of possible extra bytes in a replication event compared to a @@ -600,16 +604,13 @@ enum enum_binlog_checksum_alg { because they mis-compute the offsets into the master's binlog). */ #define MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES 2 -/* MariaDB > 5.5, which knows about binlog_checkpoint_log_event. */ +/* MariaDB >= 10.0, which knows about binlog_checkpoint_log_event. */ #define MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT 3 -/* - MariaDB server which understands MySQL 5.6 ignorable events. This server - can tolerate receiving any event with the LOG_EVENT_IGNORABLE_F flag set. -*/ -#define MARIA_SLAVE_CAPABILITY_IGNORABLE 4 +/* MariaDB >= 10.0.1, which knows about global transaction id events. */ +#define MARIA_SLAVE_CAPABILITY_GTID 4 /* Our capability. */ -#define MARIA_SLAVE_CAPABILITY_MINE MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT +#define MARIA_SLAVE_CAPABILITY_MINE MARIA_SLAVE_CAPABILITY_GTID /** @@ -695,6 +696,18 @@ enum Log_event_type that are prepared in storage engines but not yet committed. */ BINLOG_CHECKPOINT_EVENT= 161, + /* + Gtid event. For global transaction ID, used to start a new event group, + instead of the old BEGIN query event, and also to mark stand-alone + events. + */ + GTID_EVENT= 162, + /* + Gtid list event. Logged at the start of every binlog, to record the + current replication state. This consists of the last GTID seen for + each replication domain. + */ + GTID_LIST_EVENT= 163, /* Add new MariaDB events here - right above this comment! */ @@ -767,6 +780,11 @@ typedef struct st_print_event_info uint charset_database_number; uint thread_id; bool thread_id_printed; + uint32 server_id; + bool server_id_printed; + uint32 domain_id; + bool domain_id_printed; + /* Track when @@skip_replication changes so we need to output a SET statement for it. @@ -1302,6 +1320,35 @@ public: return do_shall_skip(rli); } + + /* + Check if an event is non-final part of a stand-alone event group, + such as Intvar_log_event (such events should be processed as part + of the following event group, not individually). + */ + static bool is_part_of_group(enum Log_event_type ev_type) + { + switch (ev_type) + { + case GTID_EVENT: + case INTVAR_EVENT: + case RAND_EVENT: + case USER_VAR_EVENT: + case TABLE_MAP_EVENT: + case ANNOTATE_ROWS_EVENT: + return true; + case DELETE_ROWS_EVENT: + case UPDATE_ROWS_EVENT: + case WRITE_ROWS_EVENT: + /* + ToDo: also check for non-final Rows_log_event (though such events + are usually in a BEGIN-COMMIT group). + */ + default: + return false; + } + } + protected: /** @@ -1875,6 +1922,7 @@ public: } Log_event_type get_type_code() { return QUERY_EVENT; } static int dummy_event(String *packet, ulong ev_offset, uint8 checksum_alg); + static int begin_event(String *packet, ulong ev_offset, uint8 checksum_alg); #ifdef MYSQL_SERVER bool write(IO_CACHE* file); virtual bool write_post_header_for_derived(IO_CACHE* file) { return FALSE; } @@ -1897,6 +1945,8 @@ public: /* !!! Public in this patch to allow old usage */ int do_apply_event(Relay_log_info const *rli, const char *query_arg, uint32 q_len_arg); + static bool peek_is_commit_rollback(const char *event_start, + size_t event_len); #endif /* HAVE_REPLICATION */ /* If true, the event always be applied by slave SQL thread or be printed by @@ -2410,7 +2460,7 @@ protected: Events from ourself should be skipped, but they should not decrease the slave skip counter. */ - if (this->server_id == ::server_id) + if (this->server_id == global_system_variables.server_id) return Log_event::EVENT_SKIP_IGNORE; else return Log_event::EVENT_SKIP_NOT; @@ -2815,7 +2865,7 @@ private: Events from ourself should be skipped, but they should not decrease the slave skip counter. */ - if (this->server_id == ::server_id) + if (this->server_id == global_system_variables.server_id) return Log_event::EVENT_SKIP_IGNORE; else return Log_event::EVENT_SKIP_NOT; @@ -2942,6 +2992,194 @@ public: #endif }; + +/** + @class Gtid_log_event + + This event is logged as part of every event group to give the global + transaction id (GTID) of that group. + + It replaces the BEGIN query event used in earlier versions to begin most + event groups, but is also used for events that used to be stand-alone. + + @section Gtid_log_event_binary_format Binary Format + + The binary format for Gtid_log_event has 6 extra reserved bytes to make the + length a total of 19 byte (+ 19 bytes of header in common with all events). + This is just the minimal size for a BEGIN query event, which makes it easy + to replace this event with such BEGIN event to remain compatible with old + slave servers. + + <table> + <caption>Post-Header</caption> + + <tr> + <th>Name</th> + <th>Format</th> + <th>Description</th> + </tr> + + <tr> + <td>seq_no</td> + <td>8 byte unsigned integer</td> + <td>increasing id within one server_id. Starts at 1, holes in the sequence + may occur</td> + </tr> + + <tr> + <td>domain_id</td> + <td>4 byte unsigned integer</td> + <td>Replication domain id, identifying independent replication streams></td> + </tr> + + <tr> + <td>flags</td> + <td>1 byte bitfield</td> + <td>Bit 0 set indicates stand-alone event (no terminating COMMIT)</td> + </tr> + + <tr> + <td>Reserved</td> + <td>6 bytes</td> + <td>Reserved bytes, set to 0. Maybe be used for future expansion.</td> + </tr> + </table> + + The Body of Gtid_log_event is empty. The total event size is 19 bytes + + the normal 19 bytes common-header. +*/ + +class Gtid_log_event: public Log_event +{ +public: + uint64 seq_no; + uint32 domain_id; + uchar flags2; + + /* Flags2. */ + + /* FL_STANDALONE is set when there is no terminating COMMIT event. */ + static const uchar FL_STANDALONE= 1; + +#ifdef MYSQL_SERVER + Gtid_log_event(THD *thd_arg, uint64 seq_no, uint32 domain_id, bool standalone, + uint16 flags, bool is_transactional); +#ifdef HAVE_REPLICATION + void pack_info(THD *thd, Protocol *protocol); + virtual int do_apply_event(Relay_log_info const *rli); + virtual int do_update_pos(Relay_log_info *rli); + virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); +#endif +#else + void print(FILE *file, PRINT_EVENT_INFO *print_event_info); +#endif + Gtid_log_event(const char *buf, uint event_len, + const Format_description_log_event *description_event); + ~Gtid_log_event() { } + Log_event_type get_type_code() { return GTID_EVENT; } + int get_data_size() { return GTID_HEADER_LEN; } + bool is_valid() const { return seq_no != 0; } +#ifdef MYSQL_SERVER + bool write(IO_CACHE *file); + static int make_compatible_event(String *packet, bool *need_dummy_event, + ulong ev_offset, uint8 checksum_alg); + static bool peek(const char *event_start, size_t event_len, + uint32 *domain_id, uint32 *server_id, uint64 *seq_no, + uchar *flags2); +#endif +}; + + +/** + @class Gtid_list_log_event + + This event is logged at the start of every binlog file to record the + current replication state: the last global transaction id (GTID) applied + on the server within each replication domain. + + It consists of a list of GTIDs, one for each replication domain ever seen + on the server. + + @section Gtid_list_log_event_binary_format Binary Format + + <table> + <caption>Post-Header</caption> + + <tr> + <th>Name</th> + <th>Format</th> + <th>Description</th> + </tr> + + <tr> + <td>count</td> + <td>4 byte unsigned integer</td> + <td>The lower 28 bits are the number of GTIDs. The upper 4 bits are + reserved for flags bits for future expansion</td> + </tr> + </table> + + <table> + <caption>Body</caption> + + <tr> + <th>Name</th> + <th>Format</th> + <th>Description</th> + </tr> + + <tr> + <td>domain_id</td> + <td>4 byte unsigned integer</td> + <td>Replication domain id of one GTID</td> + </tr> + + <tr> + <td>server_id</td> + <td>4 byte unsigned integer</td> + <td>Server id of one GTID</td> + </tr> + + <tr> + <td>seq_no</td> + <td>8 byte unsigned integer</td> + <td>sequence number of one GTID</td> + </tr> + </table> + + The three elements in the body repeat COUNT times to form the GTID list. +*/ + +class Gtid_list_log_event: public Log_event +{ +public: + uint32 count; + struct rpl_gtid *list; + + static const uint element_size= 4+4+8; + +#ifdef MYSQL_SERVER + Gtid_list_log_event(rpl_binlog_state *gtid_set); +#ifdef HAVE_REPLICATION + void pack_info(THD *thd, Protocol *protocol); +#endif +#else + void print(FILE *file, PRINT_EVENT_INFO *print_event_info); +#endif + Gtid_list_log_event(const char *buf, uint event_len, + const Format_description_log_event *description_event); + ~Gtid_list_log_event() { my_free(list); } + Log_event_type get_type_code() { return GTID_LIST_EVENT; } + int get_data_size() { return GTID_LIST_HEADER_LEN + count*element_size; } + bool is_valid() const { return list != NULL; } +#ifdef MYSQL_SERVER + bool write(IO_CACHE *file); +#endif + static bool peek(const char *event_start, uint32 event_len, + rpl_gtid **out_gtid_list, uint32 *out_list_len); +}; + + /* the classes below are for the new LOAD DATA INFILE logging */ /** diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc index e9afe474418..698118e3bda 100644 --- a/sql/log_event_old.cc +++ b/sql/log_event_old.cc @@ -1847,7 +1847,7 @@ Old_rows_log_event::do_update_pos(Relay_log_info *rli) Step the group log position if we are not in a transaction, otherwise increase the event log position. */ - rli->stmt_done(log_pos, when); + rli->stmt_done(log_pos, when, thd); /* Clear any errors in thd->net.last_err*. It is not known if this is needed or not. It is believed that any errors that may exist in diff --git a/sql/mdl.cc b/sql/mdl.cc index 7fd522d053a..e739a9aff78 100644 --- a/sql/mdl.cc +++ b/sql/mdl.cc @@ -85,7 +85,8 @@ const char *MDL_key::m_namespace_to_wait_state_name[NAMESPACE_END]= "Waiting for stored procedure metadata lock", "Waiting for trigger metadata lock", "Waiting for event metadata lock", - "Waiting for commit lock" + "Waiting for commit lock", + "User lock" /* Be compatible with old status. */ }; static bool mdl_initialized= 0; @@ -107,6 +108,7 @@ public: void init(); void destroy(); MDL_lock *find_or_insert(const MDL_key *key); + unsigned long get_lock_owner(const MDL_key *key); void remove(MDL_lock *lock); private: bool move_from_hash_to_lock_mutex(MDL_lock *lock); @@ -382,6 +384,7 @@ public: bool ignore_lock_priority) const; inline static MDL_lock *create(const MDL_key *key); + inline unsigned long get_lock_owner() const; void reschedule_waiters(); @@ -857,6 +860,43 @@ bool MDL_map::move_from_hash_to_lock_mutex(MDL_lock *lock) /** + * Return thread id of the owner of the lock, if it is owned. + */ + +unsigned long +MDL_map::get_lock_owner(const MDL_key *mdl_key) +{ + MDL_lock *lock; + unsigned long res= 0; + + if (mdl_key->mdl_namespace() == MDL_key::GLOBAL || + mdl_key->mdl_namespace() == MDL_key::COMMIT) + { + lock= (mdl_key->mdl_namespace() == MDL_key::GLOBAL) ? m_global_lock : + m_commit_lock; + mysql_prlock_rdlock(&lock->m_rwlock); + res= lock->get_lock_owner(); + mysql_prlock_unlock(&lock->m_rwlock); + } + else + { + my_hash_value_type hash_value= my_calc_hash(&m_locks, + mdl_key->ptr(), + mdl_key->length()); + mysql_mutex_lock(&m_mutex); + lock= (MDL_lock*) my_hash_search_using_hash_value(&m_locks, + hash_value, + mdl_key->ptr(), + mdl_key->length()); + if (lock) + res= lock->get_lock_owner(); + mysql_mutex_unlock(&m_mutex); + } + return res; +} + + +/** Destroy MDL_lock object or delegate this responsibility to whatever thread that holds the last outstanding reference to it. @@ -1621,6 +1661,23 @@ MDL_lock::can_grant_lock(enum_mdl_type type_arg, } +/** + Return thread id of the thread to which the first ticket was + granted. +*/ + +inline unsigned long +MDL_lock::get_lock_owner() const +{ + Ticket_iterator it(m_granted); + MDL_ticket *ticket; + + if ((ticket= it++)) + return thd_get_thread_id(ticket->get_ctx()->get_thd()); + return 0; +} + + /** Remove a ticket from waiting or pending queue and wakeup up waiters. */ void MDL_lock::remove_ticket(Ticket_list MDL_lock::*list, MDL_ticket *ticket) @@ -2094,31 +2151,37 @@ MDL_context::acquire_lock(MDL_request *mdl_request, ulong lock_wait_timeout) find_deadlock(); - if (lock->needs_notification(ticket)) + struct timespec abs_shortwait; + set_timespec(abs_shortwait, 1); + wait_status= MDL_wait::EMPTY; + + while (cmp_timespec(abs_shortwait, abs_timeout) <= 0) { - struct timespec abs_shortwait; - set_timespec(abs_shortwait, 1); - wait_status= MDL_wait::EMPTY; + /* abs_timeout is far away. Wait a short while and notify locks. */ + wait_status= m_wait.timed_wait(m_thd, &abs_shortwait, FALSE, + mdl_request->key.get_wait_state_name()); - while (cmp_timespec(abs_shortwait, abs_timeout) <= 0) + if (wait_status != MDL_wait::EMPTY) + break; + /* Check if the client is gone while we were waiting. */ + if (! thd_is_connected(m_thd)) { - /* abs_timeout is far away. Wait a short while and notify locks. */ - wait_status= m_wait.timed_wait(m_thd, &abs_shortwait, FALSE, - mdl_request->key.get_wait_state_name()); - - if (wait_status != MDL_wait::EMPTY) - break; + /* + * The client is disconnected. Don't wait forever: + * assume it's the same as a wait timeout, this + * ensures all error handling is correct. + */ + wait_status= MDL_wait::TIMEOUT; + break; + } - mysql_prlock_wrlock(&lock->m_rwlock); + mysql_prlock_wrlock(&lock->m_rwlock); + if (lock->needs_notification(ticket)) lock->notify_conflicting_locks(this); - mysql_prlock_unlock(&lock->m_rwlock); - set_timespec(abs_shortwait, 1); - } - if (wait_status == MDL_wait::EMPTY) - wait_status= m_wait.timed_wait(m_thd, &abs_timeout, TRUE, - mdl_request->key.get_wait_state_name()); + mysql_prlock_unlock(&lock->m_rwlock); + set_timespec(abs_shortwait, 1); } - else + if (wait_status == MDL_wait::EMPTY) wait_status= m_wait.timed_wait(m_thd, &abs_timeout, TRUE, mdl_request->key.get_wait_state_name()); @@ -2613,7 +2676,7 @@ void MDL_context::release_lock(MDL_ticket *ticket) the corresponding lists, i.e. stored in reverse temporal order. This allows to employ this function to: - back off in case of a lock conflict. - - release all locks in the end of a statment or transaction + - release all locks in the end of a statement or transaction - rollback to a savepoint. */ @@ -2725,6 +2788,22 @@ MDL_context::is_lock_owner(MDL_key::enum_mdl_namespace mdl_namespace, /** + Return thread id of the owner of the lock or 0 if + there is no owner. + @note: Lock type is not considered at all, the function + simply checks that there is some lock for the given key. + + @return thread id of the owner of the lock or 0 +*/ + +unsigned long +MDL_context::get_lock_owner(MDL_key *key) +{ + return mdl_locks.get_lock_owner(key); +} + + +/** Check if we have any pending locks which conflict with existing shared lock. @pre The ticket must match an acquired lock. @@ -2737,6 +2816,11 @@ bool MDL_ticket::has_pending_conflicting_lock() const return m_lock->has_pending_conflicting_lock(m_type); } +/** Return a key identifying this lock. */ +MDL_key *MDL_ticket::get_key() const +{ + return &m_lock->key; +} /** Releases metadata locks that were acquired after a specific savepoint. diff --git a/sql/mdl.h b/sql/mdl.h index 68f24a7a0e8..a86b45e180f 100644 --- a/sql/mdl.h +++ b/sql/mdl.h @@ -212,6 +212,7 @@ public: TRIGGER, EVENT, COMMIT, + USER_LOCK, /* user level locks. */ /* This should be the last ! */ NAMESPACE_END }; @@ -492,6 +493,7 @@ public: } enum_mdl_type get_type() const { return m_type; } MDL_lock *get_lock() const { return m_lock; } + MDL_key *get_key() const; void downgrade_exclusive_lock(enum_mdl_type type); bool has_stronger_or_equal_type(enum_mdl_type type) const; @@ -653,6 +655,7 @@ public: bool is_lock_owner(MDL_key::enum_mdl_namespace mdl_namespace, const char *db, const char *name, enum_mdl_type mdl_type); + unsigned long get_lock_owner(MDL_key *mdl_key); bool has_lock(const MDL_savepoint &mdl_savepoint, MDL_ticket *mdl_ticket); @@ -721,9 +724,9 @@ private: Lists of MDL tickets: --------------------- The entire set of locks acquired by a connection can be separated - in three subsets according to their: locks released at the end of - statement, at the end of transaction and locks are released - explicitly. + in three subsets according to their duration: locks released at + the end of statement, at the end of transaction and locks are + released explicitly. Statement and transactional locks are locks with automatic scope. They are accumulated in the course of a transaction, and released @@ -732,11 +735,12 @@ private: locks). They must not be (and never are) released manually, i.e. with release_lock() call. - Locks with explicit duration are taken for locks that span + Tickets with explicit duration are taken for locks that span multiple transactions or savepoints. These are: HANDLER SQL locks (HANDLER SQL is transaction-agnostic), LOCK TABLES locks (you can COMMIT/etc - under LOCK TABLES, and the locked tables stay locked), and + under LOCK TABLES, and the locked tables stay locked), user level + locks (GET_LOCK()/RELEASE_LOCK() functions) and locks implementing "global read lock". Statement/transactional locks are always prepended to the @@ -745,20 +749,19 @@ private: a savepoint, we start popping and releasing tickets from the front until we reach the last ticket acquired after the savepoint. - Locks with explicit duration stored are not stored in any + Locks with explicit duration are not stored in any particular order, and among each other can be split into - three sets: + four sets: - [LOCK TABLES locks] [HANDLER locks] [GLOBAL READ LOCK locks] + [LOCK TABLES locks] [USER locks] [HANDLER locks] [GLOBAL READ LOCK locks] The following is known about these sets: - * GLOBAL READ LOCK locks are always stored after LOCK TABLES - locks and after HANDLER locks. This is because one can't say - SET GLOBAL read_only=1 or FLUSH TABLES WITH READ LOCK - if one has locked tables. One can, however, LOCK TABLES - after having entered the read only mode. Note, that - subsequent LOCK TABLES statement will unlock the previous + * GLOBAL READ LOCK locks are always stored last. + This is because one can't say SET GLOBAL read_only=1 or + FLUSH TABLES WITH READ LOCK if one has locked tables. One can, + however, LOCK TABLES after having entered the read only mode. + Note, that subsequent LOCK TABLES statement will unlock the previous set of tables, but not the GRL! There are no HANDLER locks after GRL locks because SET GLOBAL read_only performs a FLUSH TABLES WITH @@ -853,6 +856,18 @@ extern bool mysql_notify_thread_having_shared_lock(THD *thd, THD *in_use, extern "C" const char* thd_enter_cond(MYSQL_THD thd, mysql_cond_t *cond, mysql_mutex_t *mutex, const char *msg); extern "C" void thd_exit_cond(MYSQL_THD thd, const char *old_msg); +extern "C" unsigned long thd_get_thread_id(const MYSQL_THD thd); + +/** + Check if a connection in question is no longer connected. + + @details + Replication apply thread is always connected. Otherwise, + does a poll on the associated socket to check if the client + is gone. +*/ +extern "C" int thd_is_connected(MYSQL_THD thd); + #ifndef DBUG_OFF extern mysql_mutex_t LOCK_open; diff --git a/sql/multi_range_read.cc b/sql/multi_range_read.cc index b9f49a83b4b..e3719600dff 100644 --- a/sql/multi_range_read.cc +++ b/sql/multi_range_read.cc @@ -1648,7 +1648,7 @@ int DsMrr_impl::dsmrr_explain_info(uint mrr_mode, char *str, size_t size) uint used_str_len= strlen(used_str); uint copy_len= min(used_str_len, size); - memcpy(str, used_str, size); + memcpy(str, used_str, copy_len); return copy_len; } return 0; diff --git a/sql/mysqld.cc b/sql/mysqld.cc index c8d8c09cbba..f794dc6ffa5 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -676,6 +676,8 @@ mysql_mutex_t mysql_mutex_t LOCK_stats, LOCK_global_user_client_stats, LOCK_global_table_stats, LOCK_global_index_stats; +mysql_mutex_t LOCK_gtid_counter, LOCK_rpl_gtid_state; + /** The below lock protects access to two global server variables: max_prepared_stmt_count and prepared_stmt_count. These variables @@ -766,12 +768,15 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list, key_LOCK_thread_count, key_LOCK_thread_cache, key_PARTITION_LOCK_auto_inc; PSI_mutex_key key_RELAYLOG_LOCK_index; +PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state; PSI_mutex_key key_LOCK_stats, key_LOCK_global_user_client_stats, key_LOCK_global_table_stats, key_LOCK_global_index_stats, key_LOCK_wakeup_ready; +PSI_mutex_key key_LOCK_gtid_counter, key_LOCK_rpl_gtid_state; + PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered; static PSI_mutex_info all_server_mutexes[]= @@ -815,6 +820,8 @@ static PSI_mutex_info all_server_mutexes[]= { &key_LOCK_global_table_stats, "LOCK_global_table_stats", PSI_FLAG_GLOBAL}, { &key_LOCK_global_index_stats, "LOCK_global_index_stats", PSI_FLAG_GLOBAL}, { &key_LOCK_wakeup_ready, "THD::LOCK_wakeup_ready", 0}, + { &key_LOCK_gtid_counter, "LOCK_gtid_counter", PSI_FLAG_GLOBAL}, + { &key_LOCK_rpl_gtid_state, "LOCK_rpl_gtid_state", PSI_FLAG_GLOBAL}, { &key_LOCK_thd_data, "THD::LOCK_thd_data", 0}, { &key_LOCK_user_conn, "LOCK_user_conn", PSI_FLAG_GLOBAL}, { &key_LOCK_uuid_short_generator, "LOCK_uuid_short_generator", PSI_FLAG_GLOBAL}, @@ -835,7 +842,9 @@ static PSI_mutex_info all_server_mutexes[]= { &key_LOG_INFO_lock, "LOG_INFO::lock", 0}, { &key_LOCK_thread_count, "LOCK_thread_count", PSI_FLAG_GLOBAL}, { &key_LOCK_thread_cache, "LOCK_thread_cache", PSI_FLAG_GLOBAL}, - { &key_PARTITION_LOCK_auto_inc, "HA_DATA_PARTITION::LOCK_auto_inc", 0} + { &key_PARTITION_LOCK_auto_inc, "HA_DATA_PARTITION::LOCK_auto_inc", 0}, + { &key_LOCK_slave_state, "LOCK_slave_state", 0}, + { &key_LOCK_binlog_state, "LOCK_binlog_state", 0} }; PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger, @@ -959,6 +968,7 @@ PSI_file_key key_file_binlog, key_file_binlog_index, key_file_casetest, key_file_trg, key_file_trn, key_file_init; PSI_file_key key_file_query_log, key_file_slow_log; PSI_file_key key_file_relaylog, key_file_relaylog_index; +PSI_file_key key_file_binlog_state; static PSI_file_info all_server_files[]= { @@ -989,7 +999,8 @@ static PSI_file_info all_server_files[]= { &key_file_tclog, "tclog", 0}, { &key_file_trg, "trigger_name", 0}, { &key_file_trn, "trigger", 0}, - { &key_file_init, "init", 0} + { &key_file_init, "init", 0}, + { &key_file_binlog_state, "binlog_state", 0} }; /** @@ -1282,6 +1293,12 @@ struct st_VioSSLFd *ssl_acceptor_fd; */ uint connection_count= 0, extra_connection_count= 0; +/** + Running counter for generating new GTIDs locally. +*/ +uint64 global_gtid_counter= 0; + + /* Function declarations */ pthread_handler_t signal_hand(void *arg); @@ -1776,6 +1793,7 @@ static void mysqld_exit(int exit_code) but if a kill -15 signal was sent, the signal thread did spawn the kill_server_thread thread, which is running concurrently. */ + rpl_deinit_gtid_slave_state(); wait_for_signal_thread_to_end(); mysql_audit_finalize(); clean_up_mutexes(); @@ -1824,7 +1842,7 @@ void clean_up(bool print_message) #endif query_cache_destroy(); hostname_cache_free(); - item_user_lock_free(); + item_func_sleep_free(); lex_free(); /* Free some memory */ item_create_cleanup(); if (!opt_noacl) @@ -1945,6 +1963,8 @@ static void clean_up_mutexes() mysql_mutex_destroy(&LOCK_global_user_client_stats); mysql_mutex_destroy(&LOCK_global_table_stats); mysql_mutex_destroy(&LOCK_global_index_stats); + mysql_mutex_destroy(&LOCK_gtid_counter); + mysql_mutex_destroy(&LOCK_rpl_gtid_state); #ifdef HAVE_OPENSSL mysql_mutex_destroy(&LOCK_des_key_file); #ifndef HAVE_YASSL @@ -2184,8 +2204,28 @@ static my_socket activate_tcp_port(uint port) for (a= ai; a != NULL; a= a->ai_next) { ip_sock= socket(a->ai_family, a->ai_socktype, a->ai_protocol); - if (ip_sock != INVALID_SOCKET) + + char ip_addr[INET6_ADDRSTRLEN]; + + if (vio_get_normalized_ip_string(a->ai_addr, a->ai_addrlen, + ip_addr, sizeof (ip_addr))) + { + ip_addr[0]= 0; + } + + if (ip_sock == INVALID_SOCKET) + { + sql_print_error("Failed to create a socket for %s '%s': errno: %d.", + (a->ai_family == AF_INET) ? "IPv4" : "IPv6", + (const char *) ip_addr, + (int) socket_errno); + } + else + { + sql_print_information("Server socket created on IP: '%s'.", + (const char *) ip_addr); break; + } } if (ip_sock == INVALID_SOCKET) @@ -4017,6 +4057,7 @@ static int init_thread_environment() mysql_mutex_init(key_LOCK_active_mi, &LOCK_active_mi, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_global_system_variables, &LOCK_global_system_variables, MY_MUTEX_INIT_FAST); + mysql_mutex_record_order(&LOCK_active_mi, &LOCK_global_system_variables); mysql_rwlock_init(key_rwlock_LOCK_system_variables_hash, &LOCK_system_variables_hash); mysql_mutex_init(key_LOCK_prepared_stmt_count, @@ -4034,6 +4075,10 @@ static int init_thread_environment() &LOCK_global_table_stats, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_global_index_stats, &LOCK_global_index_stats, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_gtid_counter, + &LOCK_gtid_counter, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_rpl_gtid_state, + &LOCK_rpl_gtid_state, MY_MUTEX_INIT_SLOW); mysql_mutex_init(key_LOCK_prepare_ordered, &LOCK_prepare_ordered, MY_MUTEX_INIT_SLOW); mysql_mutex_init(key_LOCK_commit_ordered, &LOCK_commit_ordered, @@ -4078,6 +4123,10 @@ static int init_thread_environment() PTHREAD_CREATE_DETACHED); pthread_attr_setscope(&connection_attrib, PTHREAD_SCOPE_SYSTEM); +#ifdef HAVE_REPLICATION + rpl_init_gtid_slave_state(); +#endif + DBUG_RETURN(0); } @@ -4951,9 +5000,9 @@ int mysqld_main(int argc, char **argv) set_user(mysqld_user, user_info); } - if (opt_bin_log && !server_id) + if (opt_bin_log && !global_system_variables.server_id) { - server_id= 1; + global_system_variables.server_id= ::server_id= 1; #ifdef EXTRA_DEBUG sql_print_warning("You have enabled the binary log, but you haven't set " "server-id to a non-zero value: we force server id to 1; " @@ -6681,19 +6730,25 @@ static int show_rpl_status(THD *thd, SHOW_VAR *var, char *buff) static int show_slave_running(THD *thd, SHOW_VAR *var, char *buff) { Master_info *mi; + bool tmp; + LINT_INIT(tmp); + var->type= SHOW_MY_BOOL; var->value= buff; + mysql_mutex_unlock(&LOCK_status); mysql_mutex_lock(&LOCK_active_mi); mi= master_info_index-> get_master_info(&thd->variables.default_master_connection, MYSQL_ERROR::WARN_LEVEL_NOTE); if (mi) - *((my_bool *)buff)= (my_bool) (mi->slave_running == - MYSQL_SLAVE_RUN_CONNECT && - mi->rli.slave_running); + tmp= (my_bool) (mi->slave_running == MYSQL_SLAVE_RUN_CONNECT && + mi->rli.slave_running); + mysql_mutex_unlock(&LOCK_active_mi); + mysql_mutex_lock(&LOCK_status); + if (mi) + *((my_bool *)buff)= tmp; else var->type= SHOW_UNDEF; - mysql_mutex_unlock(&LOCK_active_mi); return 0; } @@ -6701,17 +6756,24 @@ static int show_slave_running(THD *thd, SHOW_VAR *var, char *buff) static int show_slave_received_heartbeats(THD *thd, SHOW_VAR *var, char *buff) { Master_info *mi; + longlong tmp; + LINT_INIT(tmp); + var->type= SHOW_LONGLONG; var->value= buff; + mysql_mutex_unlock(&LOCK_status); mysql_mutex_lock(&LOCK_active_mi); mi= master_info_index-> get_master_info(&thd->variables.default_master_connection, MYSQL_ERROR::WARN_LEVEL_NOTE); if (mi) - *((longlong *)buff)= mi->received_heartbeats; + tmp= mi->received_heartbeats; + mysql_mutex_unlock(&LOCK_active_mi); + mysql_mutex_lock(&LOCK_status); + if (mi) + *((longlong *)buff)= tmp; else var->type= SHOW_UNDEF; - mysql_mutex_unlock(&LOCK_active_mi); return 0; } @@ -6719,17 +6781,24 @@ static int show_slave_received_heartbeats(THD *thd, SHOW_VAR *var, char *buff) static int show_heartbeat_period(THD *thd, SHOW_VAR *var, char *buff) { Master_info *mi; + float tmp; + LINT_INIT(tmp); + var->type= SHOW_CHAR; var->value= buff; + mysql_mutex_unlock(&LOCK_status); mysql_mutex_lock(&LOCK_active_mi); mi= master_info_index-> get_master_info(&thd->variables.default_master_connection, MYSQL_ERROR::WARN_LEVEL_NOTE); if (mi) - sprintf(buff, "%.3f", mi->heartbeat_period); + tmp= mi->heartbeat_period; + mysql_mutex_unlock(&LOCK_active_mi); + mysql_mutex_lock(&LOCK_status); + if (mi) + sprintf(buff, "%.3f", tmp); else var->type= SHOW_UNDEF; - mysql_mutex_unlock(&LOCK_active_mi); return 0; } @@ -7801,28 +7870,6 @@ mysqld_get_one_option(int optid, case (int) OPT_WANT_CORE: test_flags |= TEST_CORE_ON_SIGNAL; break; - case (int) OPT_BIND_ADDRESS: - { - struct addrinfo *res_lst, hints; - - bzero(&hints, sizeof(struct addrinfo)); - hints.ai_socktype= SOCK_STREAM; - hints.ai_protocol= IPPROTO_TCP; - - if (getaddrinfo(argument, NULL, &hints, &res_lst) != 0) - { - sql_print_error("Can't start server: cannot resolve hostname!"); - return 1; - } - - if (res_lst->ai_next) - { - sql_print_error("Can't start server: bind-address refers to multiple interfaces!"); - return 1; - } - freeaddrinfo(res_lst); - } - break; case OPT_CONSOLE: if (opt_console) opt_error_log= 0; // Force logs to stdout @@ -7832,6 +7879,7 @@ mysqld_get_one_option(int optid, break; case OPT_SERVER_ID: server_id_supplied = 1; + ::server_id= global_system_variables.server_id; break; case OPT_ONE_THREAD: thread_handling= SCHEDULER_NO_THREADS; diff --git a/sql/mysqld.h b/sql/mysqld.h index 587673de37c..c4b62c84603 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -247,11 +247,14 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list, key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data, key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc; extern PSI_mutex_key key_RELAYLOG_LOCK_index; +extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state; extern PSI_mutex_key key_LOCK_stats, key_LOCK_global_user_client_stats, key_LOCK_global_table_stats, key_LOCK_global_index_stats, key_LOCK_wakeup_ready; +extern PSI_mutex_key key_LOCK_gtid_counter, key_LOCK_rpl_gtid_state; + extern PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger, key_rwlock_LOCK_sys_init_connect, key_rwlock_LOCK_sys_init_slave, key_rwlock_LOCK_system_variables_hash, key_rwlock_query_cache_query_lock; @@ -291,6 +294,7 @@ extern PSI_file_key key_file_binlog, key_file_binlog_index, key_file_casetest, key_file_trg, key_file_trn, key_file_init; extern PSI_file_key key_file_query_log, key_file_slow_log; extern PSI_file_key key_file_relaylog, key_file_relaylog_index; +extern PSI_file_key key_file_binlog_state; void init_server_psi_keys(); #endif /* HAVE_PSI_INTERFACE */ @@ -335,12 +339,13 @@ extern MYSQL_PLUGIN_IMPORT key_map key_map_full; /* Should be threaded Server mutex locks and condition variables. */ extern mysql_mutex_t - LOCK_user_locks, LOCK_status, + LOCK_item_func_sleep, LOCK_status, LOCK_error_log, LOCK_delayed_insert, LOCK_short_uuid_generator, LOCK_delayed_status, LOCK_delayed_create, LOCK_crypt, LOCK_timezone, LOCK_slave_list, LOCK_active_mi, LOCK_manager, LOCK_global_system_variables, LOCK_user_conn, LOCK_prepared_stmt_count, LOCK_error_messages, LOCK_connection_count; +extern mysql_mutex_t LOCK_gtid_counter, LOCK_rpl_gtid_state; extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_thread_count; #ifdef HAVE_OPENSSL extern mysql_mutex_t LOCK_des_key_file; @@ -535,6 +540,7 @@ inline int set_current_thd(THD *thd) extern handlerton *maria_hton; extern uint extra_connection_count; +extern uint64 global_gtid_counter; extern my_bool opt_userstat_running, debug_assert_if_crashed_table; extern uint mysqld_extra_port; extern ulong opt_progress_report_time; diff --git a/sql/opt_range.cc b/sql/opt_range.cc index d101b2fe91b..5cadd26da6e 100644 --- a/sql/opt_range.cc +++ b/sql/opt_range.cc @@ -133,7 +133,12 @@ static int sel_cmp(Field *f,uchar *a,uchar *b,uint8 a_flag,uint8 b_flag); -static uchar is_null_string[2]= {1,0}; +/* + this should be long enough so that any memcmp with a string that + starts from '\0' won't cross is_null_string boundaries, even + if the memcmp is optimized to compare 4- 8- or 16- bytes at once +*/ +static uchar is_null_string[20]= {1,0}; class RANGE_OPT_PARAM; /* @@ -2001,7 +2006,7 @@ int QUICK_ROR_INTERSECT_SELECT::init() 1 error */ -int QUICK_RANGE_SELECT::init_ror_merged_scan(bool reuse_handler) +int QUICK_RANGE_SELECT::init_ror_merged_scan(bool reuse_handler, MEM_ROOT *alloc) { handler *save_file= file, *org_file; my_bool org_key_read; @@ -2029,7 +2034,7 @@ int QUICK_RANGE_SELECT::init_ror_merged_scan(bool reuse_handler) DBUG_RETURN(0); } - if (!(file= head->file->clone(head->s->normalized_path.str, thd->mem_root))) + if (!(file= head->file->clone(head->s->normalized_path.str, alloc))) { /* Manually set the error flag. Note: there seems to be quite a few @@ -2130,7 +2135,8 @@ failure: 0 OK other error code */ -int QUICK_ROR_INTERSECT_SELECT::init_ror_merged_scan(bool reuse_handler) +int QUICK_ROR_INTERSECT_SELECT::init_ror_merged_scan(bool reuse_handler, + MEM_ROOT *alloc) { List_iterator_fast<QUICK_SELECT_WITH_RECORD> quick_it(quick_selects); QUICK_SELECT_WITH_RECORD *cur; @@ -2147,7 +2153,7 @@ int QUICK_ROR_INTERSECT_SELECT::init_ror_merged_scan(bool reuse_handler) There is no use of this->file. Use it for the first of merged range selects. */ - int error= quick->init_ror_merged_scan(TRUE); + int error= quick->init_ror_merged_scan(TRUE, alloc); if (error) DBUG_RETURN(error); quick->file->extra(HA_EXTRA_KEYREAD_PRESERVE_FIELDS); @@ -2159,7 +2165,7 @@ int QUICK_ROR_INTERSECT_SELECT::init_ror_merged_scan(bool reuse_handler) const MY_BITMAP * const save_read_set= quick->head->read_set; const MY_BITMAP * const save_write_set= quick->head->write_set; #endif - if (quick->init_ror_merged_scan(FALSE)) + if (quick->init_ror_merged_scan(FALSE, alloc)) DBUG_RETURN(1); quick->file->extra(HA_EXTRA_KEYREAD_PRESERVE_FIELDS); @@ -2193,7 +2199,7 @@ int QUICK_ROR_INTERSECT_SELECT::init_ror_merged_scan(bool reuse_handler) int QUICK_ROR_INTERSECT_SELECT::reset() { DBUG_ENTER("QUICK_ROR_INTERSECT_SELECT::reset"); - if (!scans_inited && init_ror_merged_scan(TRUE)) + if (!scans_inited && init_ror_merged_scan(TRUE, &alloc)) DBUG_RETURN(1); scans_inited= TRUE; List_iterator_fast<QUICK_SELECT_WITH_RECORD> it(quick_selects); @@ -2330,7 +2336,7 @@ int QUICK_ROR_UNION_SELECT::reset() List_iterator_fast<QUICK_SELECT_I> it(quick_selects); while ((quick= it++)) { - if (quick->init_ror_merged_scan(FALSE)) + if (quick->init_ror_merged_scan(FALSE, &alloc)) DBUG_RETURN(1); } scans_inited= TRUE; @@ -7622,8 +7628,10 @@ static SEL_TREE *get_mm_tree(RANGE_OPT_PARAM *param,COND *cond) DBUG_RETURN(tree); } /* Here when simple cond */ - if (cond->const_item() && !cond->is_expensive()) + if (cond->const_item()) { + if (cond->is_expensive()) + DBUG_RETURN(0); /* During the cond->val_int() evaluation we can come across a subselect item which may allocate memory on the thd->mem_root and assumes diff --git a/sql/opt_range.h b/sql/opt_range.h index 484f508e49c..d98bf1186e8 100644 --- a/sql/opt_range.h +++ b/sql/opt_range.h @@ -323,7 +323,7 @@ public: 0 Ok other Error */ - virtual int init_ror_merged_scan(bool reuse_handler) + virtual int init_ror_merged_scan(bool reuse_handler, MEM_ROOT *alloc) { DBUG_ASSERT(0); return 1; } /* @@ -473,7 +473,7 @@ public: uchar *cur_prefix); bool reverse_sorted() { return 0; } bool unique_key_range(); - int init_ror_merged_scan(bool reuse_handler); + int init_ror_merged_scan(bool reuse_handler, MEM_ROOT *alloc); void save_last_pos() { file->position(record); } int get_type() { return QS_TYPE_RANGE; } @@ -722,7 +722,7 @@ public: #ifndef DBUG_OFF void dbug_dump(int indent, bool verbose); #endif - int init_ror_merged_scan(bool reuse_handler); + int init_ror_merged_scan(bool reuse_handler, MEM_ROOT *alloc); bool push_quick_back(MEM_ROOT *alloc, QUICK_RANGE_SELECT *quick_sel_range); class QUICK_SELECT_WITH_RECORD : public Sql_alloc diff --git a/sql/opt_subselect.cc b/sql/opt_subselect.cc index 31d3ad337d8..1dc15627627 100644 --- a/sql/opt_subselect.cc +++ b/sql/opt_subselect.cc @@ -1513,6 +1513,9 @@ static bool convert_subq_to_sj(JOIN *parent_join, Item_in_subselect *subq_pred) */ parent_lex->leaf_tables.concat(&subq_lex->leaf_tables); + if (subq_lex->options & OPTION_SCHEMA_TABLE) + parent_lex->options |= OPTION_SCHEMA_TABLE; + /* Same as above for next_local chain (a theory: a next_local chain always starts with ::leaf_tables @@ -1730,6 +1733,9 @@ static bool convert_subq_to_jtbm(JOIN *parent_join, */ parent_lex->leaf_tables.push_back(jtbm); + if (subq_pred->unit->first_select()->options & OPTION_SCHEMA_TABLE) + parent_lex->options |= OPTION_SCHEMA_TABLE; + /* Same as above for TABLE_LIST::next_local chain (a theory: a next_local chain always starts with ::leaf_tables diff --git a/sql/opt_sum.cc b/sql/opt_sum.cc index fa3a07b72c5..069fe6452e8 100644 --- a/sql/opt_sum.cc +++ b/sql/opt_sum.cc @@ -84,7 +84,7 @@ static ulonglong get_exact_record_count(List<TABLE_LIST> &tables) while ((tl= ti++)) { ha_rows tmp= tl->table->file->records(); - if ((tmp == HA_POS_ERROR)) + if (tmp == HA_POS_ERROR) return ULONGLONG_MAX; count*= tmp; } diff --git a/sql/partition_info.h b/sql/partition_info.h index e59d4ec8ba4..2fbdfff6636 100644 --- a/sql/partition_info.h +++ b/sql/partition_info.h @@ -306,6 +306,7 @@ private: char *create_default_partition_names(uint part_no, uint num_parts, uint start_no); char *create_subpartition_name(uint subpart_no, const char *part_name); +public: bool has_unique_name(partition_element *element); }; diff --git a/sql/repl_failsafe.cc b/sql/repl_failsafe.cc index 89fb1bb27de..35b43fd4305 100644 --- a/sql/repl_failsafe.cc +++ b/sql/repl_failsafe.cc @@ -89,14 +89,15 @@ void change_rpl_status(ulong from_status, ulong to_status) void unregister_slave(THD* thd, bool only_mine, bool need_mutex) { - if (thd->server_id) + uint32 thd_server_id= thd->variables.server_id; + if (thd_server_id) { if (need_mutex) mysql_mutex_lock(&LOCK_slave_list); SLAVE_INFO* old_si; if ((old_si = (SLAVE_INFO*)my_hash_search(&slave_list, - (uchar*)&thd->server_id, 4)) && + (uchar*)&thd_server_id, 4)) && (!only_mine || old_si->thd == thd)) my_hash_delete(&slave_list, (uchar*)old_si); @@ -127,7 +128,7 @@ int register_slave(THD* thd, uchar* packet, uint packet_length) if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME)))) goto err2; - thd->server_id= si->server_id= uint4korr(p); + thd->variables.server_id= si->server_id= uint4korr(p); p+= 4; get_object(p,si->host, "Failed to register slave: too long 'report-host'"); get_object(p,si->user, "Failed to register slave: too long 'report-user'"); @@ -145,7 +146,7 @@ int register_slave(THD* thd, uchar* packet, uint packet_length) // si->rpl_recovery_rank= uint4korr(p); p += 4; if (!(si->master_id= uint4korr(p))) - si->master_id= server_id; + si->master_id= global_system_variables.server_id; si->thd= thd; mysql_mutex_lock(&LOCK_slave_list); diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc new file mode 100644 index 00000000000..d6a6ed90bd3 --- /dev/null +++ b/sql/rpl_gtid.cc @@ -0,0 +1,1122 @@ +/* Copyright (c) 2013, Kristian Nielsen and MariaDB Services Ab. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + + +/* Definitions for MariaDB global transaction ID (GTID). */ + + +#include "sql_priv.h" +#include "my_sys.h" +#include "unireg.h" +#include "my_global.h" +#include "sql_base.h" +#include "sql_parse.h" +#include "key.h" +#include "rpl_gtid.h" +#include "rpl_rli.h" + + +const LEX_STRING rpl_gtid_slave_state_table_name= + { C_STRING_WITH_LEN("rpl_slave_state") }; + + +void +rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid) +{ + int err; + /* + Add the gtid to the HASH in the replication slave state. + + We must do this only _after_ commit, so that for parallel replication, + there will not be an attempt to delete the corresponding table row before + it is even committed. + */ + lock(); + err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no); + unlock(); + if (err) + { + sql_print_warning("Slave: Out of memory during slave state maintenance. " + "Some no longer necessary rows in table " + "mysql.%s may be left undeleted.", + rpl_gtid_slave_state_table_name.str); + /* + Such failure is not fatal. We will fail to delete the row for this + GTID, but it will do no harm and will be removed automatically on next + server restart. + */ + } +} + + +int +rpl_slave_state::record_and_update_gtid(THD *thd, Relay_log_info *rli) +{ + uint64 sub_id; + + /* + Update the GTID position, if we have it and did not already update + it in a GTID transaction. + */ + if ((sub_id= rli->gtid_sub_id)) + { + rli->gtid_sub_id= 0; + if (record_gtid(thd, &rli->current_gtid, sub_id, false)) + return 1; + update_state_hash(sub_id, &rli->current_gtid); + } + return 0; +} + + +rpl_slave_state::rpl_slave_state() + : inited(false), loaded(false) +{ + my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id), + sizeof(uint32), NULL, my_free, HASH_UNIQUE); +} + + +rpl_slave_state::~rpl_slave_state() +{ +} + + +void +rpl_slave_state::init() +{ + DBUG_ASSERT(!inited); + mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state, MY_MUTEX_INIT_SLOW); + inited= true; +} + + +void +rpl_slave_state::truncate_hash() +{ + uint32 i; + + for (i= 0; i < hash.records; ++i) + { + element *e= (element *)my_hash_element(&hash, i); + list_element *l= e->list; + list_element *next; + while (l) + { + next= l->next; + my_free(l); + l= next; + } + /* The element itself is freed by the hash element free function. */ + } + my_hash_reset(&hash); +} + +void +rpl_slave_state::deinit() +{ + if (!inited) + return; + truncate_hash(); + my_hash_free(&hash); + mysql_mutex_destroy(&LOCK_slave_state); +} + + +int +rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, + uint64 seq_no) +{ + element *elem= NULL; + list_element *list_elem= NULL; + + if (!(elem= get_element(domain_id))) + return 1; + + if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME)))) + return 1; + list_elem->server_id= server_id; + list_elem->sub_id= sub_id; + list_elem->seq_no= seq_no; + + elem->add(list_elem); + return 0; +} + + +struct rpl_slave_state::element * +rpl_slave_state::get_element(uint32 domain_id) +{ + struct element *elem; + + elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0); + if (elem) + return elem; + + if (!(elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME)))) + return NULL; + elem->list= NULL; + elem->last_sub_id= 0; + elem->domain_id= domain_id; + if (my_hash_insert(&hash, (uchar *)elem)) + { + my_free(elem); + return NULL; + } + return elem; +} + + +int +rpl_slave_state::truncate_state_table(THD *thd) +{ + TABLE_LIST tlist; + int err= 0; + TABLE *table; + + mysql_reset_thd_for_next_command(thd, 0); + + tlist.init_one_table(STRING_WITH_LEN("mysql"), + rpl_gtid_slave_state_table_name.str, + rpl_gtid_slave_state_table_name.length, + NULL, TL_WRITE); + if (!(err= open_and_lock_tables(thd, &tlist, FALSE, 0))) + { + table= tlist.table; + table->no_replicate= 1; + err= table->file->ha_truncate(); + + if (err) + { + ha_rollback_trans(thd, FALSE); + close_thread_tables(thd); + ha_rollback_trans(thd, TRUE); + } + else + { + ha_commit_trans(thd, FALSE); + close_thread_tables(thd); + ha_commit_trans(thd, TRUE); + } + thd->mdl_context.release_transactional_locks(); + } + + return err; +} + + +static const TABLE_FIELD_TYPE mysql_rpl_slave_state_coltypes[4]= { + { { C_STRING_WITH_LEN("domain_id") }, + { C_STRING_WITH_LEN("int(10) unsigned") }, + {NULL, 0} }, + { { C_STRING_WITH_LEN("sub_id") }, + { C_STRING_WITH_LEN("bigint(20) unsigned") }, + {NULL, 0} }, + { { C_STRING_WITH_LEN("server_id") }, + { C_STRING_WITH_LEN("int(10) unsigned") }, + {NULL, 0} }, + { { C_STRING_WITH_LEN("seq_no") }, + { C_STRING_WITH_LEN("bigint(20) unsigned") }, + {NULL, 0} }, +}; + +static const uint mysql_rpl_slave_state_pk_parts[]= {0, 1}; + +static const TABLE_FIELD_DEF mysql_rpl_slave_state_tabledef= { + array_elements(mysql_rpl_slave_state_coltypes), + mysql_rpl_slave_state_coltypes, + array_elements(mysql_rpl_slave_state_pk_parts), + mysql_rpl_slave_state_pk_parts +}; + +class Gtid_db_intact : public Table_check_intact +{ +protected: + void report_error(uint, const char *fmt, ...) + { + va_list args; + va_start(args, fmt); + error_log_print(ERROR_LEVEL, fmt, args); + va_end(args); + } +}; + +static Gtid_db_intact gtid_table_intact; + +/* + Check that the mysql.rpl_slave_state table has the correct definition. +*/ +int +gtid_check_rpl_slave_state_table(TABLE *table) +{ + int err; + + if ((err= gtid_table_intact.check(table, &mysql_rpl_slave_state_tabledef))) + my_error(ER_GTID_OPEN_TABLE_FAILED, MYF(0), "mysql", + rpl_gtid_slave_state_table_name.str); + return err; +} + + +/* + Write a gtid to the replication slave state table. + + Do it as part of the transaction, to get slave crash safety, or as a separate + transaction if !in_transaction (eg. MyISAM or DDL). + + gtid The global transaction id for this event group. + sub_id Value allocated within the sub_id when the event group was + read (sub_id must be consistent with commit order in master binlog). + + Note that caller must later ensure that the new gtid and sub_id is inserted + into the appropriate HASH element with rpl_slave_state.add(), so that it can + be deleted later. But this must only be done after COMMIT if in transaction. +*/ +int +rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, + bool in_transaction) +{ + TABLE_LIST tlist; + int err= 0; + bool table_opened= false; + TABLE *table; + list_element *elist= 0, *next; + element *elem; + ulonglong thd_saved_option= thd->variables.option_bits; + Query_tables_list lex_backup; + + mysql_reset_thd_for_next_command(thd, 0); + + DBUG_EXECUTE_IF("gtid_inject_record_gtid", + { + my_error(ER_CANNOT_UPDATE_GTID_STATE, MYF(0)); + return 1; + } ); + + thd->lex->reset_n_backup_query_tables_list(&lex_backup); + tlist.init_one_table(STRING_WITH_LEN("mysql"), + rpl_gtid_slave_state_table_name.str, + rpl_gtid_slave_state_table_name.length, + NULL, TL_WRITE); + if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0))) + goto end; + table_opened= true; + table= tlist.table; + + if ((err= gtid_check_rpl_slave_state_table(table))) + goto end; + + table->no_replicate= 1; + if (!in_transaction) + thd->variables.option_bits&= + ~(ulonglong)(OPTION_NOT_AUTOCOMMIT|OPTION_BEGIN); + + bitmap_set_all(table->write_set); + + table->field[0]->store((ulonglong)gtid->domain_id, true); + table->field[1]->store(sub_id, true); + table->field[2]->store((ulonglong)gtid->server_id, true); + table->field[3]->store(gtid->seq_no, true); + DBUG_EXECUTE_IF("inject_crash_before_write_rpl_slave_state", DBUG_SUICIDE();); + if ((err= table->file->ha_write_row(table->record[0]))) + goto end; + + lock(); + if ((elem= get_element(gtid->domain_id)) == NULL) + { + unlock(); + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + err= 1; + goto end; + } + elist= elem->grab_list(); + unlock(); + + if (!elist) + goto end; + + /* Now delete any already committed rows. */ + bitmap_set_bit(table->read_set, table->field[0]->field_index); + bitmap_set_bit(table->read_set, table->field[1]->field_index); + + if ((err= table->file->ha_index_init(0, 0))) + goto end; + while (elist) + { + uchar key_buffer[4+8]; + + next= elist->next; + + table->field[1]->store(elist->sub_id, true); + /* domain_id is already set in table->record[0] from write_row() above. */ + key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false); + if ((err= table->file->ha_index_read_map(table->record[1], key_buffer, + HA_WHOLE_KEY, HA_READ_KEY_EXACT)) || + (err= table->file->ha_delete_row(table->record[1]))) + break; + my_free(elist); + elist= next; + } + table->file->ha_index_end(); + + mysql_bin_log.bump_seq_no_counter_if_needed(gtid->seq_no); + +end: + + if (table_opened) + { + if (err) + { + /* + ToDo: If error, we need to put any remaining elist back into the HASH so + we can do another delete attempt later. + */ + ha_rollback_trans(thd, FALSE); + close_thread_tables(thd); + } + else + { + ha_commit_trans(thd, FALSE); + close_thread_tables(thd); + } + if (in_transaction) + thd->mdl_context.release_statement_locks(); + else + thd->mdl_context.release_transactional_locks(); + } + thd->lex->restore_backup_query_tables_list(&lex_backup); + thd->variables.option_bits= thd_saved_option; + return err; +} + + +uint64 +rpl_slave_state::next_subid(uint32 domain_id) +{ + uint32 sub_id= 0; + element *elem; + + lock(); + elem= get_element(domain_id); + if (elem) + sub_id= ++elem->last_sub_id; + unlock(); + + return sub_id; +} + + +bool +rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, bool *first) +{ + if (*first) + *first= false; + else + if (dest->append(",",1)) + return true; + return + dest->append_ulonglong(gtid->domain_id) || + dest->append("-",1) || + dest->append_ulonglong(gtid->server_id) || + dest->append("-",1) || + dest->append_ulonglong(gtid->seq_no); +} + + +/* + Prepare the current slave state as a string, suitable for sending to the + master to request to receive binlog events starting from that GTID state. + + The state consists of the most recently applied GTID for each domain_id, + ie. the one with the highest sub_id within each domain_id. + + Optinally, extra_gtids is a list of GTIDs from the binlog. This is used when + a server was previously a master and now needs to connect to a new master as + a slave. For each domain_id, if the GTID in the binlog was logged with our + own server_id _and_ has a higher seq_no than what is in the slave state, + then this should be used as the position to start replicating at. This + allows to promote a slave as new master, and connect the old master as a + slave with MASTER_GTID_POS=AUTO. +*/ + +int +rpl_slave_state::tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra) +{ + bool first= true; + uint32 i; + HASH gtid_hash; + uchar *rec; + rpl_gtid *gtid; + int res= 1; + + my_hash_init(>id_hash, &my_charset_bin, 32, offsetof(rpl_gtid, domain_id), + sizeof(uint32), NULL, NULL, HASH_UNIQUE); + for (i= 0; i < num_extra; ++i) + if (extra_gtids[i].server_id == global_system_variables.server_id && + my_hash_insert(>id_hash, (uchar *)(&extra_gtids[i]))) + goto err; + + lock(); + + for (i= 0; i < hash.records; ++i) + { + uint64 best_sub_id; + rpl_gtid best_gtid; + element *e= (element *)my_hash_element(&hash, i); + list_element *l= e->list; + + if (!l) + continue; /* Nothing here */ + + best_gtid.domain_id= e->domain_id; + best_gtid.server_id= l->server_id; + best_gtid.seq_no= l->seq_no; + best_sub_id= l->sub_id; + while ((l= l->next)) + { + if (l->sub_id > best_sub_id) + { + best_sub_id= l->sub_id; + best_gtid.server_id= l->server_id; + best_gtid.seq_no= l->seq_no; + } + } + + /* Check if we have something newer in the extra list. */ + rec= my_hash_search(>id_hash, (const uchar *)&best_gtid.domain_id, 0); + if (rec) + { + gtid= (rpl_gtid *)rec; + if (gtid->seq_no > best_gtid.seq_no) + memcpy(&best_gtid, gtid, sizeof(best_gtid)); + if (my_hash_delete(>id_hash, rec)) + { + unlock(); + goto err; + } + } + + if (rpl_slave_state_tostring_helper(dest, &best_gtid, &first)) + { + unlock(); + goto err; + } + } + + unlock(); + + /* Also add any remaining extra domain_ids. */ + for (i= 0; i < gtid_hash.records; ++i) + { + gtid= (rpl_gtid *)my_hash_element(>id_hash, i); + if (rpl_slave_state_tostring_helper(dest, gtid, &first)) + goto err; + } + + res= 0; + +err: + my_hash_free(>id_hash); + + return res; +} + + +/* + Lookup a domain_id in the current replication slave state. + + Returns false if the domain_id has no entries in the slave state. + Otherwise returns true, and fills in out_gtid with the corresponding + GTID. +*/ +bool +rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid) +{ + element *elem; + list_element *list; + uint64 best_sub_id; + + lock(); + elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0); + if (!elem || !(list= elem->list)) + { + unlock(); + return false; + } + + out_gtid->domain_id= domain_id; + out_gtid->server_id= list->server_id; + out_gtid->seq_no= list->seq_no; + best_sub_id= list->sub_id; + + while ((list= list->next)) + { + if (best_sub_id > list->sub_id) + continue; + best_sub_id= list->sub_id; + out_gtid->server_id= list->server_id; + out_gtid->seq_no= list->seq_no; + } + + unlock(); + return true; +} + + +/* + Parse a GTID at the start of a string, and update the pointer to point + at the first character after the parsed GTID. + + GTID can be in short form with domain_id=0 implied, SERVERID-SEQNO. + Or long form, DOMAINID-SERVERID-SEQNO. + + Returns 0 on ok, non-zero on parse error. +*/ +static int +gtid_parser_helper(char **ptr, char *end, rpl_gtid *out_gtid) +{ + char *q; + char *p= *ptr; + uint64 v1, v2, v3; + int err= 0; + + q= end; + v1= (uint64)my_strtoll10(p, &q, &err); + if (err != 0 || v1 > (uint32)0xffffffff || q == end || *q != '-') + return 1; + p= q+1; + q= end; + v2= (uint64)my_strtoll10(p, &q, &err); + if (err != 0 || v2 > (uint32)0xffffffff || q == end || *q != '-') + return 1; + p= q+1; + q= end; + v3= (uint64)my_strtoll10(p, &q, &err); + if (err != 0) + return 1; + + out_gtid->domain_id= v1; + out_gtid->server_id= v2; + out_gtid->seq_no= v3; + *ptr= q; + return 0; +} + + +/* + Update the slave replication state with the GTID position obtained from + master when connecting with old-style (filename,offset) position. + + If RESET is true then all existing entries are removed. Otherwise only + domain_ids mentioned in the STATE_FROM_MASTER are changed. + + Returns 0 if ok, non-zero if error. +*/ +int +rpl_slave_state::load(THD *thd, char *state_from_master, size_t len, + bool reset) +{ + char *end= state_from_master + len; + + if (reset) + { + if (truncate_state_table(thd)) + return 1; + truncate_hash(); + } + if (state_from_master == end) + return 0; + for (;;) + { + rpl_gtid gtid; + uint64 sub_id; + + if (gtid_parser_helper(&state_from_master, end, >id) || + !(sub_id= next_subid(gtid.domain_id)) || + record_gtid(thd, >id, sub_id, false) || + update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no)) + return 1; + if (state_from_master == end) + break; + if (*state_from_master != ',') + return 1; + ++state_from_master; + } + return 0; +} + + +bool +rpl_slave_state::is_empty() +{ + uint32 i; + bool result= true; + + lock(); + for (i= 0; i < hash.records; ++i) + { + element *e= (element *)my_hash_element(&hash, i); + if (e->list) + { + result= false; + break; + } + } + unlock(); + + return result; +} + + +rpl_binlog_state::rpl_binlog_state() +{ + my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id), + sizeof(uint32), NULL, my_free, HASH_UNIQUE); + mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state, + MY_MUTEX_INIT_SLOW); +} + + +void +rpl_binlog_state::reset() +{ + uint32 i; + + for (i= 0; i < hash.records; ++i) + my_hash_free(&((element *)my_hash_element(&hash, i))->hash); + my_hash_reset(&hash); +} + +rpl_binlog_state::~rpl_binlog_state() +{ + reset(); + my_hash_free(&hash); + mysql_mutex_destroy(&LOCK_binlog_state); +} + + +/* + Update replication state with a new GTID. + + If the (domain_id, server_id) pair already exists, then the new GTID replaces + the old one for that domain id. Else a new entry is inserted. + + Returns 0 for ok, 1 for error. +*/ +int +rpl_binlog_state::update(const struct rpl_gtid *gtid) +{ + rpl_gtid *lookup_gtid; + element *elem; + + elem= (element *)my_hash_search(&hash, (const uchar *)(>id->domain_id), 0); + if (elem) + { + /* + By far the most common case is that successive events within same + replication domain have the same server id (it changes only when + switching to a new master). So save a hash lookup in this case. + */ + if (likely(elem->last_gtid->server_id == gtid->server_id)) + { + elem->last_gtid->seq_no= gtid->seq_no; + return 0; + } + + lookup_gtid= (rpl_gtid *) + my_hash_search(&elem->hash, (const uchar *)>id->server_id, 0); + if (lookup_gtid) + { + lookup_gtid->seq_no= gtid->seq_no; + elem->last_gtid= lookup_gtid; + return 0; + } + + /* Allocate a new GTID and insert it. */ + lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME)); + if (!lookup_gtid) + return 1; + memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid)); + if (my_hash_insert(&elem->hash, (const uchar *)lookup_gtid)) + { + my_free(lookup_gtid); + return 1; + } + elem->last_gtid= lookup_gtid; + return 0; + } + + /* First time we see this domain_id; allocate a new element. */ + elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME)); + lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME)); + if (elem && lookup_gtid) + { + elem->domain_id= gtid->domain_id; + my_hash_init(&elem->hash, &my_charset_bin, 32, + offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free, + HASH_UNIQUE); + elem->last_gtid= lookup_gtid; + memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid)); + if (0 == my_hash_insert(&elem->hash, (const uchar *)lookup_gtid)) + { + lookup_gtid= NULL; /* Do not free. */ + if (0 == my_hash_insert(&hash, (const uchar *)elem)) + return 0; + } + my_hash_free(&elem->hash); + } + + /* An error. */ + if (elem) + my_free(elem); + if (lookup_gtid) + my_free(lookup_gtid); + return 1; +} + + +uint64 +rpl_binlog_state::seq_no_from_state() +{ + ulong i, j; + uint64 seq_no= 0; + + for (i= 0; i < hash.records; ++i) + { + element *e= (element *)my_hash_element(&hash, i); + for (j= 0; j < e->hash.records; ++j) + { + const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&e->hash, j); + if (gtid->seq_no > seq_no) + seq_no= gtid->seq_no; + } + } + return seq_no; +} + + +/* + Write binlog state to text file, so we can read it in again without having + to scan last binlog file (normal shutdown/startup, not crash recovery). + + The most recent GTID within each domain_id is written after any other GTID + within this domain. +*/ +int +rpl_binlog_state::write_to_iocache(IO_CACHE *dest) +{ + ulong i, j; + char buf[21]; + + for (i= 0; i < hash.records; ++i) + { + size_t res; + element *e= (element *)my_hash_element(&hash, i); + for (j= 0; j <= e->hash.records; ++j) + { + const rpl_gtid *gtid; + if (j < e->hash.records) + { + gtid= (const rpl_gtid *)my_hash_element(&e->hash, j); + if (gtid == e->last_gtid) + continue; + } + else + gtid= e->last_gtid; + + longlong10_to_str(gtid->seq_no, buf, 10); + res= my_b_printf(dest, "%u-%u-%s\n", gtid->domain_id, gtid->server_id, buf); + if (res == (size_t) -1) + return 1; + } + } + + return 0; +} + + +int +rpl_binlog_state::read_from_iocache(IO_CACHE *src) +{ + /* 10-digit - 10-digit - 20-digit \n \0 */ + char buf[10+1+10+1+20+1+1]; + char *p, *end; + rpl_gtid gtid; + + reset(); + for (;;) + { + size_t res= my_b_gets(src, buf, sizeof(buf)); + if (!res) + break; + p= buf; + end= buf + res; + if (gtid_parser_helper(&p, end, >id)) + return 1; + if (update(>id)) + return 1; + } + return 0; +} + + +rpl_gtid * +rpl_binlog_state::find(uint32 domain_id, uint32 server_id) +{ + element *elem; + if (!(elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0))) + return NULL; + return (rpl_gtid *)my_hash_search(&elem->hash, (const uchar *)&server_id, 0); +} + + +uint32 +rpl_binlog_state::count() +{ + uint32 c= 0; + uint32 i; + + for (i= 0; i < hash.records; ++i) + c+= ((element *)my_hash_element(&hash, i))->hash.records; + + return c; +} + + +int +rpl_binlog_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size) +{ + uint32 i, j, pos; + + pos= 0; + for (i= 0; i < hash.records; ++i) + { + element *e= (element *)my_hash_element(&hash, i); + for (j= 0; j <= e->hash.records; ++j) + { + const rpl_gtid *gtid; + if (j < e->hash.records) + { + gtid= (rpl_gtid *)my_hash_element(&e->hash, j); + if (gtid == e->last_gtid) + continue; + } + else + gtid= e->last_gtid; + + if (pos >= list_size) + return 1; + memcpy(>id_list[pos++], gtid, sizeof(*gtid)); + } + } + + return 0; +} + + +/* + Get a list of the most recently binlogged GTID, for each domain_id. + + This can be used when switching from being a master to being a slave, + to know where to start replicating from the new master. + + The returned list must be de-allocated with my_free(). + + Returns 0 for ok, non-zero for out-of-memory. +*/ +int +rpl_binlog_state::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size) +{ + uint32 i; + + *size= hash.records; + if (!(*list= (rpl_gtid *)my_malloc(*size * sizeof(rpl_gtid), MYF(MY_WME)))) + return 1; + for (i= 0; i < *size; ++i) + { + element *e= (element *)my_hash_element(&hash, i); + memcpy(&((*list)[i]), e->last_gtid, sizeof(rpl_gtid)); + } + + return 0; +} + + +slave_connection_state::slave_connection_state() +{ + my_hash_init(&hash, &my_charset_bin, 32, + offsetof(rpl_gtid, domain_id), sizeof(uint32), NULL, my_free, + HASH_UNIQUE); +} + + +slave_connection_state::~slave_connection_state() +{ + my_hash_free(&hash); +} + + +/* + Create a hash from the slave GTID state that is sent to master when slave + connects to start replication. + + The state is sent as <GTID>,<GTID>,...,<GTID>, for example: + + 0-2-112,1-4-1022 + + The state gives for each domain_id the GTID to start replication from for + the corresponding replication stream. So domain_id must be unique. + + Returns 0 if ok, non-zero if error due to malformed input. + + Note that input string is built by slave server, so it will not be incorrect + unless bug/corruption/malicious server. So we just need basic sanity check, + not fancy user-friendly error message. +*/ + +int +slave_connection_state::load(char *slave_request, size_t len) +{ + char *p, *end; + uchar *rec; + rpl_gtid *gtid; + const rpl_gtid *gtid2; + + my_hash_reset(&hash); + p= slave_request; + end= slave_request + len; + if (p == end) + return 0; + for (;;) + { + if (!(rec= (uchar *)my_malloc(sizeof(*gtid), MYF(MY_WME)))) + { + my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*gtid)); + return 1; + } + gtid= (rpl_gtid *)rec; + if (gtid_parser_helper(&p, end, gtid)) + { + my_free(rec); + my_error(ER_INCORRECT_GTID_STATE, MYF(0)); + return 1; + } + if ((gtid2= (const rpl_gtid *) + my_hash_search(&hash, (const uchar *)(>id->domain_id), 0))) + { + my_error(ER_DUPLICATE_GTID_DOMAIN, MYF(0), gtid->domain_id, + gtid->server_id, (ulonglong)gtid->seq_no, gtid2->domain_id, + gtid2->server_id, (ulonglong)gtid2->seq_no, gtid->domain_id); + my_free(rec); + return 1; + } + if (my_hash_insert(&hash, rec)) + { + my_free(rec); + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return 1; + } + if (p == end) + break; /* Finished. */ + if (*p != ',') + { + my_error(ER_INCORRECT_GTID_STATE, MYF(0)); + return 1; + } + ++p; + } + + return 0; +} + + +int +slave_connection_state::load(const rpl_gtid *gtid_list, uint32 count) +{ + uint32 i; + + my_hash_reset(&hash); + for (i= 0; i < count; ++i) + if (update(>id_list[i])) + return 1; + return 0; +} + + +rpl_gtid * +slave_connection_state::find(uint32 domain_id) +{ + return (rpl_gtid *) my_hash_search(&hash, (const uchar *)(&domain_id), 0); +} + + +int +slave_connection_state::update(const rpl_gtid *in_gtid) +{ + rpl_gtid *new_gtid; + uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0); + if (rec) + { + memcpy(rec, in_gtid, sizeof(*in_gtid)); + return 0; + } + + if (!(new_gtid= (rpl_gtid *)my_malloc(sizeof(*new_gtid), MYF(MY_WME)))) + return 1; + memcpy(new_gtid, in_gtid, sizeof(*new_gtid)); + if (my_hash_insert(&hash, (uchar *)new_gtid)) + { + my_free(new_gtid); + return 1; + } + + return 0; +} + + +void +slave_connection_state::remove(const rpl_gtid *in_gtid) +{ + uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0); +#ifndef DBUG_OFF + bool err; + rpl_gtid *slave_gtid= (rpl_gtid *)rec; + DBUG_ASSERT(rec /* We should never try to remove not present domain_id. */); + DBUG_ASSERT(slave_gtid->server_id == in_gtid->server_id); + DBUG_ASSERT(slave_gtid->seq_no == in_gtid->seq_no); +#endif + + IF_DBUG(err=, ) + my_hash_delete(&hash, rec); + DBUG_ASSERT(!err); +} + + +int +slave_connection_state::to_string(String *out_str) +{ + uint32 i; + bool first; + + out_str->length(0); + first= true; + for (i= 0; i < hash.records; ++i) + { + const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&hash, i); + if (rpl_slave_state_tostring_helper(out_str, gtid, &first)) + return 1; + } + return 0; +} diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h new file mode 100644 index 00000000000..e63d8439803 --- /dev/null +++ b/sql/rpl_gtid.h @@ -0,0 +1,179 @@ +/* Copyright (c) 2013, Kristian Nielsen and MariaDB Services Ab. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#ifndef RPL_GTID_H +#define RPL_GTID_H + +/* Definitions for MariaDB global transaction ID (GTID). */ + + +extern const LEX_STRING rpl_gtid_slave_state_table_name; + +class String; + +struct rpl_gtid +{ + uint32 domain_id; + uint32 server_id; + uint64 seq_no; +}; + + +enum enum_gtid_skip_type { + GTID_SKIP_NOT, GTID_SKIP_STANDALONE, GTID_SKIP_TRANSACTION +}; + + +/* + Replication slave state. + + For every independent replication stream (identified by domain_id), this + remembers the last gtid applied on the slave within this domain. + + Since events are always committed in-order within a single domain, this is + sufficient to maintain the state of the replication slave. +*/ +struct rpl_slave_state +{ + /* Elements in the list of GTIDs kept for each domain_id. */ + struct list_element + { + struct list_element *next; + uint64 sub_id; + uint64 seq_no; + uint32 server_id; + }; + + /* Elements in the HASH that hold the state for one domain_id. */ + struct element + { + struct list_element *list; + uint64 last_sub_id; + uint32 domain_id; + + list_element *grab_list() { list_element *l= list; list= NULL; return l; } + void add(list_element *l) + { + l->next= list; + list= l; + if (last_sub_id < l->sub_id) + last_sub_id= l->sub_id; + } + }; + + /* Mapping from domain_id to its element. */ + HASH hash; + /* Mutex protecting access to the state. */ + mysql_mutex_t LOCK_slave_state; + + bool inited; + bool loaded; + + rpl_slave_state(); + ~rpl_slave_state(); + + void init(); + void deinit(); + void truncate_hash(); + ulong count() const { return hash.records; } + int update(uint32 domain_id, uint32 server_id, uint64 sub_id, uint64 seq_no); + int truncate_state_table(THD *thd); + int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, + bool in_transaction); + uint64 next_subid(uint32 domain_id); + int tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra); + bool domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid); + int load(THD *thd, char *state_from_master, size_t len, bool reset); + bool is_empty(); + + void lock() { DBUG_ASSERT(inited); mysql_mutex_lock(&LOCK_slave_state); } + void unlock() { DBUG_ASSERT(inited); mysql_mutex_unlock(&LOCK_slave_state); } + + element *get_element(uint32 domain_id); + + void update_state_hash(uint64 sub_id, rpl_gtid *gtid); + int record_and_update_gtid(THD *thd, Relay_log_info *rli); +}; + + +/* + Binlog state. + This keeps the last GTID written to the binlog for every distinct + (domain_id, server_id) pair. + This will be logged at the start of the next binlog file as a + Gtid_list_log_event; this way, it is easy to find the binlog file + containing a gigen GTID, by simply scanning backwards from the newest + one until a lower seq_no is found in the Gtid_list_log_event at the + start of a binlog for the given domain_id and server_id. + + We also remember the last logged GTID for every domain_id. This is used + to know where to start when a master is changed to a slave. As a side + effect, it also allows to skip a hash lookup in the very common case of + logging a new GTID with same server id as last GTID. +*/ +struct rpl_binlog_state +{ + struct element { + uint32 domain_id; + HASH hash; /* Containing all server_id for one domain_id */ + /* The most recent entry in the hash. */ + rpl_gtid *last_gtid; + }; + /* Mapping from domain_id to collection of elements. */ + HASH hash; + /* Mutex protecting access to the state. */ + mysql_mutex_t LOCK_binlog_state; + + rpl_binlog_state(); + ~rpl_binlog_state(); + + void reset(); + int update(const struct rpl_gtid *gtid); + uint64 seq_no_from_state(); + int write_to_iocache(IO_CACHE *dest); + int read_from_iocache(IO_CACHE *src); + uint32 count(); + int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size); + int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size); + rpl_gtid *find(uint32 domain_id, uint32 server_id); +}; + + +/* + Represent the GTID state that a slave connection to a master requests + the master to start sending binlog events from. +*/ +struct slave_connection_state +{ + /* Mapping from domain_id to the GTID requested for that domain. */ + HASH hash; + + slave_connection_state(); + ~slave_connection_state(); + + int load(char *slave_request, size_t len); + int load(const rpl_gtid *gtid_list, uint32 count); + rpl_gtid *find(uint32 domain_id); + int update(const rpl_gtid *in_gtid); + void remove(const rpl_gtid *gtid); + ulong count() const { return hash.records; } + int to_string(String *out_str); +}; + +extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, + bool *first); +extern int gtid_check_rpl_slave_state_table(TABLE *table); + +#endif /* RPL_GTID_H */ diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc index 1d21b3f9445..2777dabf451 100644 --- a/sql/rpl_handler.cc +++ b/sql/rpl_handler.cc @@ -176,7 +176,7 @@ void delegates_destroy() plugins add to thd->lex will be automatically unlocked. */ #define FOREACH_OBSERVER(r, f, thd, args) \ - param.server_id= thd->server_id; \ + param.server_id= thd->variables.server_id; \ /* Use a struct to make sure that they are allocated adjacent, check delete_dynamic(). @@ -348,7 +348,7 @@ int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags, ulong hlen; Binlog_transmit_param param; param.flags= flags; - param.server_id= thd->server_id; + param.server_id= thd->variables.server_id; int ret= 0; read_lock(); diff --git a/sql/rpl_injector.cc b/sql/rpl_injector.cc index ec1a96e8a2b..a4b04d2e047 100644 --- a/sql/rpl_injector.cc +++ b/sql/rpl_injector.cc @@ -108,7 +108,7 @@ int injector::transaction::use_table(server_id_type sid, table tbl) if ((error= check_state(TABLE_STATE))) DBUG_RETURN(error); - server_id_type save_id= m_thd->server_id; + server_id_type save_id= m_thd->variables.server_id; m_thd->set_server_id(sid); error= m_thd->binlog_write_table_map(tbl.get_table(), tbl.is_transactional()); @@ -127,7 +127,7 @@ int injector::transaction::write_row (server_id_type sid, table tbl, if (error) DBUG_RETURN(error); - server_id_type save_id= m_thd->server_id; + server_id_type save_id= m_thd->variables.server_id; m_thd->set_server_id(sid); error= m_thd->binlog_write_row(tbl.get_table(), tbl.is_transactional(), cols, colcnt, record); @@ -146,7 +146,7 @@ int injector::transaction::delete_row(server_id_type sid, table tbl, if (error) DBUG_RETURN(error); - server_id_type save_id= m_thd->server_id; + server_id_type save_id= m_thd->variables.server_id; m_thd->set_server_id(sid); error= m_thd->binlog_delete_row(tbl.get_table(), tbl.is_transactional(), cols, colcnt, record); @@ -165,7 +165,7 @@ int injector::transaction::update_row(server_id_type sid, table tbl, if (error) DBUG_RETURN(error); - server_id_type save_id= m_thd->server_id; + server_id_type save_id= m_thd->variables.server_id; m_thd->set_server_id(sid); error= m_thd->binlog_update_row(tbl.get_table(), tbl.is_transactional(), cols, colcnt, before, after); diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc index 5348a94b35b..fdff61fafec 100644 --- a/sql/rpl_mi.cc +++ b/sql/rpl_mi.cc @@ -37,7 +37,7 @@ Master_info::Master_info(LEX_STRING *connection_name_arg, checksum_alg_before_fd(BINLOG_CHECKSUM_ALG_UNDEF), connect_retry(DEFAULT_CONNECT_RETRY), inited(0), abort_slave(0), slave_running(0), slave_run_id(0), sync_counter(0), - heartbeat_period(0), received_heartbeats(0), master_id(0) + heartbeat_period(0), received_heartbeats(0), master_id(0), using_gtid(0) { host[0] = 0; user[0] = 0; password[0] = 0; ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0; @@ -61,7 +61,9 @@ Master_info::Master_info(LEX_STRING *connection_name_arg, my_casedn_str(system_charset_info, cmp_connection_name.str); } - my_init_dynamic_array(&ignore_server_ids, sizeof(::server_id), 16, 16, MYF(0)); + my_init_dynamic_array(&ignore_server_ids, + sizeof(global_system_variables.server_id), 16, 16, + MYF(0)); bzero((char*) &file, sizeof(file)); mysql_mutex_init(key_master_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_master_info_data_lock, &data_lock, MY_MUTEX_INIT_FAST); @@ -141,6 +143,7 @@ void init_master_log_pos(Master_info* mi) mi->master_log_name[0] = 0; mi->master_log_pos = BIN_LOG_HEADER_SIZE; // skip magic number + mi->using_gtid= false; /* Intentionally init ssl_verify_server_cert to 0, no option available */ mi->ssl_verify_server_cert= 0; @@ -170,8 +173,13 @@ enum { LINE_FOR_MASTER_BIND = 17, /* 6.0 added value of master_ignore_server_id */ LINE_FOR_REPLICATE_IGNORE_SERVER_IDS= 18, - /* Number of lines currently used when saving master info file */ - LINES_IN_MASTER_INFO= LINE_FOR_REPLICATE_IGNORE_SERVER_IDS + /* MySQL 5.6 fixed-position lines. */ + LINE_FOR_FIRST_MYSQL_5_6=19, + LINE_FOR_LAST_MYSQL_5_6=23, + /* Reserved lines for MySQL future versions. */ + LINE_FOR_LAST_MYSQL_FUTURE=33, + /* Number of (fixed-position) lines used when saving master info file */ + LINES_IN_MASTER_INFO= LINE_FOR_LAST_MYSQL_FUTURE }; int init_master_info(Master_info* mi, const char* master_info_fname, @@ -299,7 +307,7 @@ file '%s')", fname); int ssl= 0, ssl_verify_server_cert= 0; float master_heartbeat_period= 0.0; char *first_non_digit; - char dummy_buf[HOSTNAME_LENGTH+1]; + char buf[HOSTNAME_LENGTH+1]; /* Starting from 4.1.x master.info has new format. Now its @@ -393,7 +401,7 @@ file '%s')", fname); (this is just a reservation to avoid future upgrade problems) */ if (lines >= LINE_FOR_MASTER_BIND && - init_strvar_from_file(dummy_buf, sizeof(dummy_buf), &mi->file, "")) + init_strvar_from_file(buf, sizeof(buf), &mi->file, "")) goto errwithmsg; /* Starting from 6.0 list of server_id of ignorable servers might be @@ -405,6 +413,34 @@ file '%s')", fname); sql_print_error("Failed to initialize master info ignore_server_ids"); goto errwithmsg; } + + /* + Starting with MariaDB 10.0, we use a key=value syntax, which is nicer + in several ways. But we leave a bunch of empty lines to accomodate + any future old-style additions in MySQL (this will make it easier for + users moving from MariaDB to MySQL, to not have MySQL try to + interpret a MariaDB key=value line.) + */ + if (lines >= LINE_FOR_LAST_MYSQL_FUTURE) + { + uint i; + /* Skip lines used by / reserved for MySQL >= 5.6. */ + for (i= LINE_FOR_FIRST_MYSQL_5_6; i <= LINE_FOR_LAST_MYSQL_FUTURE; ++i) + { + if (init_strvar_from_file(buf, sizeof(buf), &mi->file, "")) + goto errwithmsg; + } + + /* + Parse any extra key=value lines. + Ignore unknown lines, to facilitate downgrades. + */ + while (!init_strvar_from_file(buf, sizeof(buf), &mi->file, 0)) + { + if (0 == strncmp(buf, STRING_WITH_LEN("using_gtid="))) + mi->using_gtid= (0 != atoi(buf + sizeof("using_gtid"))); + } + } } #ifndef HAVE_OPENSSL @@ -510,7 +546,7 @@ int flush_master_info(Master_info* mi, char* ignore_server_ids_buf; { ignore_server_ids_buf= - (char *) my_malloc((sizeof(::server_id) * 3 + 1) * + (char *) my_malloc((sizeof(global_system_variables.server_id) * 3 + 1) * (1 + mi->ignore_server_ids.elements), MYF(MY_WME)); if (!ignore_server_ids_buf) DBUG_RETURN(1); @@ -544,14 +580,16 @@ int flush_master_info(Master_info* mi, sprintf(heartbeat_buf, "%.3f", mi->heartbeat_period); my_b_seek(file, 0L); my_b_printf(file, - "%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n%d\n%s\n%s\n%s\n", + "%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n%d\n%s\n%s\n%s\n" + "\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n" + "using_gtid=%d\n", LINES_IN_MASTER_INFO, mi->master_log_name, llstr(mi->master_log_pos, lbuf), mi->host, mi->user, mi->password, mi->port, mi->connect_retry, (int)(mi->ssl), mi->ssl_ca, mi->ssl_capath, mi->ssl_cert, mi->ssl_cipher, mi->ssl_key, mi->ssl_verify_server_cert, - heartbeat_buf, "", ignore_server_ids_buf); + heartbeat_buf, "", ignore_server_ids_buf, mi->using_gtid); my_free(ignore_server_ids_buf); err= flush_io_cache(file); if (sync_masterinfo_period && !err && @@ -639,7 +677,7 @@ bool check_master_connection_name(LEX_STRING *name) file names without a prefix. */ -void create_logfile_name_with_suffix(char *res_file_name, uint length, +void create_logfile_name_with_suffix(char *res_file_name, size_t length, const char *info_file, bool append, LEX_STRING *suffix) { diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h index d52b2992afd..b6a3e7d91b9 100644 --- a/sql/rpl_mi.h +++ b/sql/rpl_mi.h @@ -126,6 +126,11 @@ class Master_info : public Slave_reporting_capability ulonglong received_heartbeats; // counter of received heartbeat events DYNAMIC_ARRAY ignore_server_ids; ulong master_id; + /* + True if slave position is set using GTID state rather than old-style + file/offset binlog position. + */ + bool using_gtid; }; int init_master_info(Master_info* mi, const char* master_info_fname, const char* slave_info_fname, @@ -170,7 +175,7 @@ public: }; bool check_master_connection_name(LEX_STRING *name); -void create_logfile_name_with_suffix(char *res_file_name, uint length, +void create_logfile_name_with_suffix(char *res_file_name, size_t length, const char *info_file, bool append, LEX_STRING *suffix); diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index d5cd6a3efed..6d53e6c3187 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -31,6 +31,13 @@ static int count_relay_log_space(Relay_log_info* rli); +/** + Current replication state (hash of last GTID executed, per replication + domain). +*/ +rpl_slave_state rpl_global_gtid_slave_state; + + // Defined in slave.cc int init_intvar_from_file(int* var, IO_CACHE* f, int default_val); int init_strvar_from_file(char *var, int max_size, IO_CACHE *f, @@ -51,7 +58,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) abort_pos_wait(0), slave_run_id(0), sql_thd(0), inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE), until_log_pos(0), retried_trans(0), executed_entries(0), - tables_to_lock(0), tables_to_lock_count(0), + gtid_sub_id(0), tables_to_lock(0), tables_to_lock_count(0), last_event_start_time(0), deferred_events(NULL),m_flags(0), row_stmt_start_timestamp(0), long_find_row_note_printed(false), m_annotate_event(0) @@ -1091,7 +1098,8 @@ bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev) if (until_condition == UNTIL_MASTER_POS) { - if (ev && ev->server_id == (uint32) ::server_id && !replicate_same_server_id) + if (ev && ev->server_id == (uint32) global_system_variables.server_id && + !replicate_same_server_id) DBUG_RETURN(FALSE); log_name= group_master_log_name; log_pos= (!ev)? group_master_log_pos : @@ -1189,7 +1197,7 @@ bool Relay_log_info::cached_charset_compare(char *charset) const void Relay_log_info::stmt_done(my_off_t event_master_log_pos, - time_t event_creation_time) + time_t event_creation_time, THD *thd) { #ifndef DBUG_OFF extern uint debug_not_change_ts_if_art_event; @@ -1224,7 +1232,23 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos, else { inc_group_relay_log_pos(event_master_log_pos); + if (rpl_global_gtid_slave_state.record_and_update_gtid(thd, this)) + { + report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, + "Failed to update GTID state in %s.%s, slave state may become " + "inconsistent: %d: %s", + "mysql", rpl_gtid_slave_state_table_name.str, + thd->stmt_da->sql_errno(), thd->stmt_da->message()); + /* + At this point we are not in a transaction (for example after DDL), + so we can not roll back. Anyway, normally updates to the slave + state table should not fail, and if they do, at least we made the + DBA aware of the problem in the error log. + */ + } + DBUG_EXECUTE_IF("inject_crash_before_flush_rli", DBUG_SUICIDE();); flush_relay_log_info(this); + DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE();); /* Note that Rotate_log_event::do_apply_event() does not call this function, so there is no chance that a fake rotate event resets @@ -1356,4 +1380,139 @@ void Relay_log_info::slave_close_thread_tables(THD *thd) clear_tables_to_lock(); DBUG_VOID_RETURN; } + + +int +rpl_load_gtid_slave_state(THD *thd) +{ + TABLE_LIST tlist; + TABLE *table; + bool table_opened= false; + bool table_scanned= false; + struct local_element { uint64 sub_id; rpl_gtid gtid; }; + struct local_element *entry; + HASH hash; + int err= 0; + uint32 i; + uint64 highest_seq_no= 0; + DBUG_ENTER("rpl_load_gtid_slave_state"); + + rpl_global_gtid_slave_state.lock(); + bool loaded= rpl_global_gtid_slave_state.loaded; + rpl_global_gtid_slave_state.unlock(); + if (loaded) + DBUG_RETURN(0); + + my_hash_init(&hash, &my_charset_bin, 32, + offsetof(local_element, gtid) + offsetof(rpl_gtid, domain_id), + sizeof(uint32), NULL, my_free, HASH_UNIQUE); + + mysql_reset_thd_for_next_command(thd, 0); + + tlist.init_one_table(STRING_WITH_LEN("mysql"), + rpl_gtid_slave_state_table_name.str, + rpl_gtid_slave_state_table_name.length, + NULL, TL_READ); + if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0))) + goto end; + table_opened= true; + table= tlist.table; + + if ((err= gtid_check_rpl_slave_state_table(table))) + goto end; + + bitmap_set_all(table->read_set); + if ((err= table->file->ha_rnd_init_with_error(1))) + goto end; + table_scanned= true; + for (;;) + { + uint32 domain_id, server_id; + uint64 sub_id, seq_no; + uchar *rec; + + if ((err= table->file->ha_rnd_next(table->record[0]))) + { + if (err == HA_ERR_RECORD_DELETED) + continue; + else if (err == HA_ERR_END_OF_FILE) + break; + else + goto end; + } + domain_id= (ulonglong)table->field[0]->val_int(); + sub_id= (ulonglong)table->field[1]->val_int(); + server_id= (ulonglong)table->field[2]->val_int(); + seq_no= (ulonglong)table->field[3]->val_int(); + DBUG_PRINT("info", ("Read slave state row: %u-%u-%lu sub_id=%lu\n", + (unsigned)domain_id, (unsigned)server_id, + (ulong)seq_no, (ulong)sub_id)); + if (seq_no > highest_seq_no) + highest_seq_no= seq_no; + + if ((rec= my_hash_search(&hash, (const uchar *)&domain_id, 0))) + { + entry= (struct local_element *)rec; + if (entry->sub_id >= sub_id) + continue; + entry->sub_id= sub_id; + DBUG_ASSERT(entry->gtid.domain_id == domain_id); + entry->gtid.server_id= server_id; + entry->gtid.seq_no= seq_no; + } + else + { + if (!(entry= (struct local_element *)my_malloc(sizeof(*entry), + MYF(MY_WME)))) + { + err= 1; + goto end; + } + entry->sub_id= sub_id; + entry->gtid.domain_id= domain_id; + entry->gtid.server_id= server_id; + entry->gtid.seq_no= seq_no; + if ((err= my_hash_insert(&hash, (uchar *)entry))) + { + my_free(entry); + goto end; + } + } + } + + rpl_global_gtid_slave_state.lock(); + for (i= 0; i < hash.records; ++i) + { + entry= (struct local_element *)my_hash_element(&hash, i); + if ((err= rpl_global_gtid_slave_state.update(entry->gtid.domain_id, + entry->gtid.server_id, + entry->sub_id, + entry->gtid.seq_no))) + { + rpl_global_gtid_slave_state.unlock(); + goto end; + } + } + rpl_global_gtid_slave_state.loaded= true; + rpl_global_gtid_slave_state.unlock(); + + err= 0; /* Clear HA_ERR_END_OF_FILE */ + +end: + if (table_scanned) + { + table->file->ha_index_or_rnd_end(); + ha_commit_trans(thd, FALSE); + ha_commit_trans(thd, TRUE); + } + if (table_opened) + { + close_thread_tables(thd); + thd->mdl_context.release_transactional_locks(); + } + my_hash_free(&hash); + mysql_bin_log.bump_seq_no_counter_if_needed(highest_seq_no); + DBUG_RETURN(err); +} + #endif diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 6144d37026b..7aff6720aac 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -307,6 +307,14 @@ public: char slave_patternload_file[FN_REFLEN]; size_t slave_patternload_file_size; + /* + Current GTID being processed. + The sub_id gives the binlog order within one domain_id. A zero sub_id + means that there is no active GTID. + */ + uint64 gtid_sub_id; + rpl_gtid current_gtid; + Relay_log_info(bool is_slave_recovery); ~Relay_log_info(); @@ -445,7 +453,7 @@ public: the <code>Seconds_behind_master</code> field. */ void stmt_done(my_off_t event_log_pos, - time_t event_creation_time); + time_t event_creation_time, THD *thd); /** @@ -584,4 +592,8 @@ private: int init_relay_log_info(Relay_log_info* rli, const char* info_fname); +extern struct rpl_slave_state rpl_global_gtid_slave_state; + +int rpl_load_gtid_slave_state(THD *thd); + #endif /* RPL_RLI_H */ diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt index e0b16b7a1ed..5dbc7637552 100644 --- a/sql/share/errmsg-utf8.txt +++ b/sql/share/errmsg-utf8.txt @@ -6524,4 +6524,22 @@ ER_SLAVE_STOPPED eng "SLAVE '%.*s' stopped" ER_SQL_DISCOVER_ERROR eng "Engine %s failed to discover table %`-.192s.%`-.192s with '%s'" - +ER_FAILED_GTID_STATE_INIT + eng "Failed initializing replication GTID state" +ER_INCORRECT_GTID_STATE + eng "Could not parse GTID list for GTID_POS" +ER_CANNOT_UPDATE_GTID_STATE + eng "Could not update replication slave gtid state" +ER_DUPLICATE_GTID_DOMAIN + eng "GTID %u-%u-%llu and %u-%u-%llu conflict (duplicate domain id %u)" +ER_GTID_OPEN_TABLE_FAILED + eng "Failed to open %s.%s" + ger "Öffnen von %s.%s fehlgeschlagen" +ER_GTID_POSITION_NOT_FOUND_IN_BINLOG + eng "Connecting slave requested to start from GTID %u-%u-%llu, which is not in the master's binlog" +ER_CANNOT_LOAD_SLAVE_GTID_STATE + eng "Failed to load replication slave GTID state from table %s.%s" +ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG + eng "Requested GTID_POS %u-%u-%llu conflicts with the binary log which contains a more recent GTID %u-%u-%llu. To use the requested GTID_POS, the old binlog must be removed with RESET MASTER to avoid out-of-order binlog" +ER_MASTER_GTID_POS_MISSING_DOMAIN + eng "Requested GTID_POS contains no value for replication domain %u. This conflicts with the binary log which contains GTID %u-%u-%llu. To use the requested GTID_POS, the old binlog must be removed with RESET MASTER to avoid out-of-order binlog" diff --git a/sql/slave.cc b/sql/slave.cc index e2a4d229ab9..8f1d8669770 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -162,8 +162,10 @@ static int terminate_slave_thread(THD *thd, volatile uint *slave_running, bool skip_lock); static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info); -static bool send_show_master_info_header(THD *thd, bool full); -static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full); +static bool send_show_master_info_header(THD *thd, bool full, + size_t gtid_pos_length); +static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, + String *gtid_pos); /* Find out which replications threads are running @@ -395,6 +397,7 @@ int init_recovery(Master_info* mi, const char** errmsg) DBUG_RETURN(0); } + /** Convert slave skip errors bitmap into a printable string. @@ -706,7 +709,7 @@ int start_slave_thread( if (start_lock) mysql_mutex_lock(start_lock); - if (!server_id) + if (!global_system_variables.server_id) { if (start_cond) mysql_cond_broadcast(start_cond); @@ -782,6 +785,7 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start, mysql_mutex_t *lock_io=0, *lock_sql=0, *lock_cond_io=0, *lock_cond_sql=0; mysql_cond_t* cond_io=0, *cond_sql=0; int error=0; + const char *errmsg; DBUG_ENTER("start_slave_threads"); if (need_slave_mutex) @@ -797,6 +801,22 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start, lock_cond_sql = &mi->rli.run_lock; } + /* + If we are using GTID and both SQL and IO threads are stopped, then get + rid of all relay logs. + + Relay logs are not very useful when using GTID, except as a buffer + between the fetch in the IO thread and the apply in SQL thread. However + while one of the threads is running, they are in use and cannot be + removed. + */ + if (mi->using_gtid && !mi->slave_running && !mi->rli.slave_running) + { + purge_relay_logs(&mi->rli, NULL, 0, &errmsg); + mi->master_log_name[0]= 0; + mi->master_log_pos= 0; + } + if (thread_mask & SLAVE_IO) error= start_slave_thread( #ifdef HAVE_PSI_INTERFACE @@ -1407,7 +1427,8 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi) (master_res= mysql_store_result(mysql)) && (master_row= mysql_fetch_row(master_res))) { - if ((::server_id == (mi->master_id= strtoul(master_row[1], 0, 10))) && + if ((global_system_variables.server_id == + (mi->master_id= strtoul(master_row[1], 0, 10))) && !mi->rli.replicate_same_server_id) { errmsg= "The slave I/O thread stops because master and slave have equal \ @@ -1787,6 +1808,133 @@ past_checksum: after_set_capability: #endif + /* + Request dump start from slave replication GTID state. + + Only request GTID position the first time we connect after CHANGE MASTER + or after starting both IO or SQL thread. + + Otherwise, if the IO thread was ahead of the SQL thread before the + restart or reconnect, we might end up re-fetching and hence re-applying + the same event(s) again. + */ + if (mi->using_gtid && !mi->master_log_name[0]) + { + int rc; + char str_buf[256]; + String connect_state(str_buf, sizeof(str_buf), system_charset_info); + connect_state.length(0); + + /* + Read the master @@GLOBAL.gtid_domain_id variable. + This is mostly to check that master is GTID aware, but we could later + perhaps use it to check that different multi-source masters are correctly + configured with distinct domain_id. + */ + if (mysql_real_query(mysql, + STRING_WITH_LEN("SELECT @@GLOBAL.gtid_domain_id")) || + !(master_res= mysql_store_result(mysql)) || + !(master_row= mysql_fetch_row(master_res))) + { + err_code= mysql_errno(mysql); + errmsg= "The slave I/O thread stops because master does not support " + "MariaDB global transaction id. A fatal error is encountered when " + "it tries to SELECT @@GLOBAL.gtid_domain_id."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + mysql_free_result(master_res); + master_res= NULL; + + connect_state.append(STRING_WITH_LEN("SET @slave_connect_state='"), + system_charset_info); + if (rpl_append_gtid_state(&connect_state, true)) + { + err_code= ER_OUTOFMEMORY; + errmsg= "The slave I/O thread stops because a fatal out-of-memory " + "error is encountered when it tries to compute @slave_connect_state."; + sprintf(err_buff, "%s Error: Out of memory", errmsg); + goto err; + } + connect_state.append(STRING_WITH_LEN("'"), system_charset_info); + + rc= mysql_real_query(mysql, connect_state.ptr(), connect_state.length()); + if (rc) + { + err_code= mysql_errno(mysql); + if (is_network_error(err_code)) + { + mi->report(ERROR_LEVEL, err_code, + "Setting @slave_connect_state failed with error: %s", + mysql_error(mysql)); + goto network_err; + } + else + { + /* Fatal error */ + errmsg= "The slave I/O thread stops because a fatal error is " + "encountered when it tries to set @slave_connect_state."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + } + } + if (!mi->using_gtid) + { + /* + If we are not using GTID to connect this time, then instead request + the corresponding GTID position from the master, so that the user + can reconnect the next time using MASTER_GTID_POS=AUTO. + */ + char quote_buf[2*sizeof(mi->master_log_name)+1]; + char str_buf[28+2*sizeof(mi->master_log_name)+10]; + String query(str_buf, sizeof(str_buf), system_charset_info); + query.length(0); + + query.append("SELECT binlog_gtid_pos('"); + escape_quotes_for_mysql(&my_charset_bin, quote_buf, sizeof(quote_buf), + mi->master_log_name, strlen(mi->master_log_name)); + query.append(quote_buf); + query.append("',"); + query.append_ulonglong(mi->master_log_pos); + query.append(")"); + + if (!mysql_real_query(mysql, query.c_ptr_safe(), query.length()) && + (master_res= mysql_store_result(mysql)) && + (master_row= mysql_fetch_row(master_res)) && + (master_row[0] != NULL)) + { + rpl_global_gtid_slave_state.load(mi->io_thd, master_row[0], + strlen(master_row[0]), false); + } + else if (check_io_slave_killed(mi->io_thd, mi, NULL)) + goto slave_killed_err; + else if (is_network_error(mysql_errno(mysql))) + { + mi->report(WARNING_LEVEL, mysql_errno(mysql), + "Get master GTID position failed with error: %s", mysql_error(mysql)); + goto network_err; + } + else + { + /* + ToDo: If the master does not have the binlog_gtid_pos() function, it + just means that it is an old master with no GTID support, so we should + do nothing. + + However, if binlog_gtid_pos() exists, but fails or returns NULL, then + it means that the requested position is not valid. We could use this + to catch attempts to replicate from within the middle of an event, + avoiding strange failures or possible corruption. + */ + } + if (master_res) + { + mysql_free_result(master_res); + master_res= NULL; + } + } + err: if (errmsg) { @@ -1980,7 +2128,7 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi, DBUG_RETURN(0); } - int4store(pos, server_id); pos+= 4; + int4store(pos, global_system_variables.server_id); pos+= 4; pos= net_store_data(pos, (uchar*) report_host, report_host_len); pos= net_store_data(pos, (uchar*) report_user, report_user_len); pos= net_store_data(pos, (uchar*) report_password, report_password_len); @@ -2029,16 +2177,20 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi, bool show_master_info(THD *thd, Master_info *mi, bool full) { DBUG_ENTER("show_master_info"); + String gtid_pos; - if (send_show_master_info_header(thd, full)) + if (full && rpl_global_gtid_slave_state.tostring(>id_pos, NULL, 0)) + DBUG_RETURN(TRUE); + if (send_show_master_info_header(thd, full, gtid_pos.length())) DBUG_RETURN(TRUE); - if (send_show_master_info_data(thd, mi, full)) + if (send_show_master_info_data(thd, mi, full, >id_pos)) DBUG_RETURN(TRUE); my_eof(thd); DBUG_RETURN(FALSE); } -static bool send_show_master_info_header(THD *thd, bool full) +static bool send_show_master_info_header(THD *thd, bool full, + size_t gtid_pos_length) { List<Item> field_list; Protocol *protocol= thd->protocol; @@ -2117,6 +2269,8 @@ static bool send_show_master_info_header(THD *thd, bool full) FN_REFLEN)); field_list.push_back(new Item_return_int("Master_Server_Id", sizeof(ulong), MYSQL_TYPE_LONG)); + field_list.push_back(new Item_return_int("Using_Gtid", sizeof(ulong), + MYSQL_TYPE_LONG)); if (full) { field_list.push_back(new Item_return_int("Retried_transactions", @@ -2129,6 +2283,7 @@ static bool send_show_master_info_header(THD *thd, bool full) 10, MYSQL_TYPE_LONG)); field_list.push_back(new Item_float("Slave_heartbeat_period", 0.0, 3, 10)); + field_list.push_back(new Item_empty_string("Gtid_Pos", gtid_pos_length)); } if (protocol->send_result_set_metadata(&field_list, Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) @@ -2137,7 +2292,8 @@ static bool send_show_master_info_header(THD *thd, bool full) } -static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full) +static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, + String *gtid_pos) { DBUG_ENTER("send_show_master_info_data"); @@ -2292,6 +2448,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full) } // Master_Server_id protocol->store((uint32) mi->master_id); + protocol->store((uint32) (mi->using_gtid != 0)); if (full) { protocol->store((uint32) mi->rli.retried_trans); @@ -2299,6 +2456,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full) protocol->store((uint32) mi->rli.executed_entries); protocol->store((uint32) mi->received_heartbeats); protocol->store((double) mi->heartbeat_period, 3, &tmp); + protocol->store(gtid_pos->ptr(), gtid_pos->length(), &my_charset_bin); } mysql_mutex_unlock(&mi->rli.err_lock); @@ -2341,11 +2499,19 @@ static int cmp_mi_by_name(const Master_info **arg1, bool show_all_master_info(THD* thd) { uint i, elements; + String gtid_pos; Master_info **tmp; DBUG_ENTER("show_master_info"); mysql_mutex_assert_owner(&LOCK_active_mi); - if (send_show_master_info_header(thd, 1)) + gtid_pos.length(0); + if (rpl_append_gtid_state(>id_pos, true)) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + DBUG_RETURN(TRUE); + } + + if (send_show_master_info_header(thd, 1, gtid_pos.length())) DBUG_RETURN(TRUE); if (!(elements= master_info_index->master_info_hash.records)) @@ -2367,7 +2533,7 @@ bool show_all_master_info(THD* thd) for (i= 0; i < elements; i++) { - if (send_show_master_info_data(thd, tmp[i], 1)) + if (send_show_master_info_data(thd, tmp[i], 1, >id_pos)) DBUG_RETURN(TRUE); } @@ -2533,7 +2699,7 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi, // TODO if big log files: Change next to int8store() int4store(buf, (ulong) mi->master_log_pos); int2store(buf + 4, binlog_flags); - int4store(buf + 6, server_id); + int4store(buf + 6, global_system_variables.server_id); len = (uint) strlen(logname); memcpy(buf + 10, logname,len); if (simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1)) @@ -2742,7 +2908,8 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli) has a Rotate etc). */ - thd->server_id = ev->server_id; // use the original server id for logging + /* Use the original server id for logging. */ + thd->variables.server_id = ev->server_id; thd->set_time(); // time the query thd->lex->current_select= 0; if (!ev->when) @@ -3526,17 +3693,34 @@ err_during_init: /* Check the temporary directory used by commands like LOAD DATA INFILE. + + As the directory never changes during a mysqld run, we only + test this once and cache the result. This also resolve a race condition + when this can be run by multiple threads at the same time. */ + +static bool check_temp_dir_run= 0; +static int check_temp_dir_result= 0; + static int check_temp_dir(char* tmp_file) { - int fd; + File fd; + int result= 1; // Assume failure MY_DIR *dirp; char tmp_dir[FN_REFLEN]; size_t tmp_dir_size; DBUG_ENTER("check_temp_dir"); + mysql_mutex_lock(&LOCK_thread_count); + if (check_temp_dir_run) + { + result= check_temp_dir_result; + goto end; + } + check_temp_dir_run= 1; + /* Get the directory from the temporary file. */ @@ -3546,27 +3730,33 @@ int check_temp_dir(char* tmp_file) Check if the directory exists. */ if (!(dirp=my_dir(tmp_dir,MYF(MY_WME)))) - DBUG_RETURN(1); + goto end; my_dirend(dirp); /* - Check permissions to create a file. + Check permissions to create a file. We use O_TRUNC to ensure that + things works even if we happen to have and old file laying around. */ if ((fd= mysql_file_create(key_file_misc, tmp_file, CREATE_MODE, - O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW, + O_WRONLY | O_BINARY | O_TRUNC | O_NOFOLLOW, MYF(MY_WME))) < 0) - DBUG_RETURN(1); + goto end; + result= 0; // Directory name ok /* Clean up. */ mysql_file_close(fd, MYF(0)); mysql_file_delete(key_file_misc, tmp_file, MYF(0)); - DBUG_RETURN(0); +end: + check_temp_dir_result= result; + mysql_mutex_unlock(&LOCK_thread_count); + DBUG_RETURN(result); } + /** Slave SQL thread entry point. @@ -3728,6 +3918,15 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, goto err; } + /* Load the set of seen GTIDs, if we did not already. */ + if (rpl_load_gtid_slave_state(thd)) + { + rli->report(ERROR_LEVEL, thd->stmt_da->sql_errno(), + "Unable to load replication GTID slave state from mysql.%s: %s", + rpl_gtid_slave_state_table_name.str, thd->stmt_da->message()); + goto err; + } + /* execute init_slave variable */ if (opt_init_slave.length) { @@ -3953,7 +4152,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev) } DBUG_ASSERT(cev->inited_from_old); thd->file_id = cev->file_id = mi->file_id++; - thd->server_id = cev->server_id; + thd->variables.server_id = cev->server_id; cev_not_written = 1; if (unlikely(net_request_file(net,cev->fname))) @@ -4565,16 +4764,18 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) Heartbeat is sent only after an event corresponding to the corrdinates the heartbeat carries. - Slave can not have a difference in coordinates except in the only + Slave can not have a higher coordinate except in the only special case when mi->master_log_name, master_log_pos have never been updated by Rotate event i.e when slave does not have any history with the master (and thereafter mi->master_log_pos is NULL). + Slave can have lower coordinates, if some event from master was omitted. + TODO: handling `when' for SHOW SLAVE STATUS' snds behind */ if ((memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len()) && mi->master_log_name != NULL) - || mi->master_log_pos != hb.log_pos) + || mi->master_log_pos > hb.log_pos) { /* missed events of heartbeat from the past */ error= ER_SLAVE_HEARTBEAT_FAILURE; @@ -4626,7 +4827,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) mysql_mutex_lock(log_lock); s_id= uint4korr(buf + SERVER_ID_OFFSET); - if ((s_id == ::server_id && !mi->rli.replicate_same_server_id) || + if ((s_id == global_system_variables.server_id && + !mi->rli.replicate_same_server_id) || /* the following conjunction deals with IGNORE_SERVER_IDS, if set If the master is on the ignore list, execution of @@ -4657,7 +4859,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) IGNORE_SERVER_IDS it increments mi->master_log_pos as well as rli->group_relay_log_pos. */ - if (!(s_id == ::server_id && !mi->rli.replicate_same_server_id) || + if (!(s_id == global_system_variables.server_id && + !mi->rli.replicate_same_server_id) || (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT && buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT && buf[EVENT_TYPE_OFFSET] != STOP_EVENT)) @@ -5191,6 +5394,27 @@ static Log_event* next_event(Relay_log_info* rli) inc_event_relay_log_pos() */ rli->future_event_relay_log_pos= my_b_tell(cur_log); + /* + For GTID, allocate a new sub_id for the given domain_id. + The sub_id must be allocated in increasing order of binlog order. + */ + if (ev->get_type_code() == GTID_EVENT) + { + Gtid_log_event *gev= static_cast<Gtid_log_event *>(ev); + uint64 sub_id= rpl_global_gtid_slave_state.next_subid(gev->domain_id); + if (!sub_id) + { + errmsg = "slave SQL thread aborted because of out-of-memory error"; + if (hot_log) + mysql_mutex_unlock(log_lock); + goto err; + } + rli->gtid_sub_id= sub_id; + rli->current_gtid.server_id= gev->server_id; + rli->current_gtid.domain_id= gev->domain_id; + rli->current_gtid.seq_no= gev->seq_no; + } + if (hot_log) mysql_mutex_unlock(log_lock); DBUG_RETURN(ev); diff --git a/sql/sp_head.cc b/sql/sp_head.cc index cb9c0325845..f0a87673857 100644 --- a/sql/sp_head.cc +++ b/sql/sp_head.cc @@ -2434,7 +2434,7 @@ sp_head::fill_field_definition(THD *thd, LEX *lex, lex->charset ? lex->charset : thd->variables.collation_database, lex->uint_geom_type, - lex->vcol_info, NULL)) + lex->vcol_info, NULL, FALSE)) return TRUE; if (field_def->interval_list.elements) diff --git a/sql/sql_base.cc b/sql/sql_base.cc index 72a9e69b042..dd92c3d6750 100644 --- a/sql/sql_base.cc +++ b/sql/sql_base.cc @@ -345,7 +345,7 @@ uint create_tmp_table_def_key(THD *thd, char *key, const char *db, const char *table_name) { uint key_length= create_table_def_key(key, db, table_name); - int4store(key + key_length, thd->server_id); + int4store(key + key_length, thd->variables.server_id); int4store(key + key_length + 4, thd->variables.pseudo_thread_id); key_length+= TMP_TABLE_KEY_EXTRA; return key_length; @@ -388,6 +388,14 @@ bool table_def_init(void) init_tdc_psi_keys(); #endif mysql_mutex_init(key_LOCK_open, &LOCK_open, MY_MUTEX_INIT_FAST); + mysql_mutex_record_order(&LOCK_active_mi, &LOCK_open); + /* + When we delete from the table_def_cache(), the free function + table_def_free_entry() is invoked from my_hash_delete(), which calls + free_table_share(), which may unload plugins, which can remove status + variables and hence takes LOCK_status. Record this locking order here. + */ + mysql_mutex_record_order(&LOCK_open, &LOCK_status); oldest_unused_share= &end_of_unused_share; end_of_unused_share.prev= &oldest_unused_share; @@ -2662,7 +2670,7 @@ bool open_table(THD *thd, TABLE_LIST *table_list, MEM_ROOT *mem_root, { DBUG_PRINT("error", ("query_id: %lu server_id: %u pseudo_thread_id: %lu", - (ulong) table->query_id, (uint) thd->server_id, + (ulong) table->query_id, (uint) thd->variables.server_id, (ulong) thd->variables.pseudo_thread_id)); my_error(ER_CANT_REOPEN_TABLE, MYF(0), table->alias.c_ptr()); DBUG_RETURN(TRUE); @@ -5939,7 +5947,8 @@ TABLE *open_table_uncached(THD *thd, handlerton *hton, ("table: '%s'.'%s' path: '%s' server_id: %u " "pseudo_thread_id: %lu", db, table_name, path, - (uint) thd->server_id, (ulong) thd->variables.pseudo_thread_id)); + (uint) thd->variables.server_id, + (ulong) thd->variables.pseudo_thread_id)); /* Create the cache_key for temporary tables */ key_length= create_tmp_table_def_key(thd, cache_key, db, table_name); @@ -9343,7 +9352,6 @@ int init_ftfuncs(THD *thd, SELECT_LEX *select_lex, bool no_order) List_iterator<Item_func_match> li(*(select_lex->ftfunc_list)); Item_func_match *ifm; DBUG_PRINT("info",("Performing FULLTEXT search")); - thd_proc_info(thd, "FULLTEXT initialization"); while ((ifm=li++)) ifm->init_search(no_order); diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 95aa4a00122..413f921a16f 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -112,7 +112,8 @@ Key::Key(const Key &rhs, MEM_ROOT *mem_root) columns(rhs.columns, mem_root), name(rhs.name), option_list(rhs.option_list), - generated(rhs.generated) + generated(rhs.generated), + create_if_not_exists(rhs.create_if_not_exists) { list_copy_and_replace_each_value(columns, mem_root); } @@ -827,6 +828,7 @@ THD::THD() col_access=0; is_slave_error= thread_specific_used= FALSE; my_hash_clear(&handler_tables_hash); + my_hash_clear(&ull_hash); tmp_table=0; cuted_fields= 0L; sent_row_count= 0L; @@ -866,7 +868,6 @@ THD::THD() net.vio=0; net.buff= 0; client_capabilities= 0; // minimalistic client - ull=0; system_thread= NON_SYSTEM_THREAD; cleanup_done= abort_on_warning= 0; peer_port= 0; // For SHOW PROCESSLIST @@ -891,7 +892,7 @@ THD::THD() /* Variables with default values */ proc_info="login"; where= THD::DEFAULT_WHERE; - server_id = ::server_id; + variables.server_id = global_system_variables.server_id; slave_net = 0; command=COM_CONNECT; *scramble= '\0'; @@ -1400,8 +1401,6 @@ void THD::cleanup(void) if (global_read_lock.is_acquired()) global_read_lock.unlock_global_read_lock(this); - /* All metadata locks must have been released by now. */ - DBUG_ASSERT(!mdl_context.has_locks()); if (user_connect) { decrease_user_connections(user_connect); @@ -1419,13 +1418,9 @@ void THD::cleanup(void) sp_cache_clear(&sp_proc_cache); sp_cache_clear(&sp_func_cache); - if (ull) - { - mysql_mutex_lock(&LOCK_user_locks); - item_user_lock_release(ull); - mysql_mutex_unlock(&LOCK_user_locks); - ull= NULL; - } + mysql_ull_cleanup(this); + /* All metadata locks must have been released by now. */ + DBUG_ASSERT(!mdl_context.has_locks()); apc_target.destroy(); cleanup_done=1; @@ -4001,6 +3996,15 @@ extern "C" unsigned long thd_get_thread_id(const MYSQL_THD thd) } +/** + Check if THD socket is still connected. + */ +extern "C" int thd_is_connected(MYSQL_THD thd) +{ + return thd->is_connected(); +} + + #ifdef INNODB_COMPATIBILITY_HOOKS extern "C" const struct charset_info_st *thd_charset(MYSQL_THD thd) { @@ -4321,6 +4325,8 @@ void THD::leave_locked_tables_mode() /* Also ensure that we don't release metadata locks for open HANDLERs. */ if (handler_tables_hash.records) mysql_ha_set_explicit_lock_duration(this); + if (ull_hash.records) + mysql_ull_set_explicit_lock_duration(this); } locked_tables_mode= LTM_NONE; } @@ -5097,7 +5103,7 @@ int THD::binlog_write_row(TABLE* table, bool is_trans, size_t const len= pack_row(table, cols, row_data, record); Rows_log_event* const ev= - binlog_prepare_pending_rows_event(table, server_id, cols, colcnt, + binlog_prepare_pending_rows_event(table, variables.server_id, cols, colcnt, len, is_trans, static_cast<Write_rows_log_event*>(0)); @@ -5141,7 +5147,7 @@ int THD::binlog_update_row(TABLE* table, bool is_trans, #endif Rows_log_event* const ev= - binlog_prepare_pending_rows_event(table, server_id, cols, colcnt, + binlog_prepare_pending_rows_event(table, variables.server_id, cols, colcnt, before_size + after_size, is_trans, static_cast<Update_rows_log_event*>(0)); @@ -5172,7 +5178,7 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans, size_t const len= pack_row(table, cols, row_data, record); Rows_log_event* const ev= - binlog_prepare_pending_rows_event(table, server_id, cols, colcnt, + binlog_prepare_pending_rows_event(table, variables.server_id, cols, colcnt, len, is_trans, static_cast<Delete_rows_log_event*>(0)); diff --git a/sql/sql_class.h b/sql/sql_class.h index 7372d2a27c6..221c0d3bd51 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -57,7 +57,6 @@ class Lex_input_stream; class Parser_state; class Rows_log_event; class Sroutine_hash_entry; -class User_level_lock; class user_var_entry; enum enum_enable_or_disable { LEAVE_AS_IS, ENABLE, DISABLE }; @@ -223,8 +222,9 @@ public: enum drop_type {KEY, COLUMN }; const char *name; enum drop_type type; - Alter_drop(enum drop_type par_type,const char *par_name) - :name(par_name), type(par_type) {} + bool drop_if_exists; + Alter_drop(enum drop_type par_type,const char *par_name, bool par_exists) + :name(par_name), type(par_type), drop_if_exists(par_exists) {} /** Used to make a clone of this object for ALTER/CREATE TABLE @sa comment for Key_part_spec::clone @@ -258,20 +258,23 @@ public: LEX_STRING name; engine_option_value *option_list; bool generated; + bool create_if_not_exists; Key(enum Keytype type_par, const LEX_STRING &name_arg, KEY_CREATE_INFO *key_info_arg, bool generated_arg, List<Key_part_spec> &cols, - engine_option_value *create_opt) + engine_option_value *create_opt, bool if_not_exists_opt) :type(type_par), key_create_info(*key_info_arg), columns(cols), - name(name_arg), option_list(create_opt), generated(generated_arg) + name(name_arg), option_list(create_opt), generated(generated_arg), + create_if_not_exists(if_not_exists_opt) {} Key(enum Keytype type_par, const char *name_arg, size_t name_len_arg, KEY_CREATE_INFO *key_info_arg, bool generated_arg, List<Key_part_spec> &cols, - engine_option_value *create_opt) + engine_option_value *create_opt, bool if_not_exists_opt) :type(type_par), key_create_info(*key_info_arg), columns(cols), - option_list(create_opt), generated(generated_arg) + option_list(create_opt), generated(generated_arg), + create_if_not_exists(if_not_exists_opt) { name.str= (char *)name_arg; name.length= name_len_arg; @@ -302,8 +305,10 @@ public: uint delete_opt, update_opt, match_opt; Foreign_key(const LEX_STRING &name_arg, List<Key_part_spec> &cols, Table_ident *table, List<Key_part_spec> &ref_cols, - uint delete_opt_arg, uint update_opt_arg, uint match_opt_arg) - :Key(FOREIGN_KEY, name_arg, &default_key_create_info, 0, cols, NULL), + uint delete_opt_arg, uint update_opt_arg, uint match_opt_arg, + bool if_not_exists_opt) + :Key(FOREIGN_KEY, name_arg, &default_key_create_info, 0, cols, NULL, + if_not_exists_opt), ref_table(table), ref_columns(ref_cols), delete_opt(delete_opt_arg), update_opt(update_opt_arg), match_opt(match_opt_arg) @@ -532,12 +537,19 @@ typedef struct system_variables ulong tx_isolation; ulong updatable_views_with_limit; int max_user_connections; + ulong server_id; /** In slave thread we need to know in behalf of which thread the query is being run to replicate temp tables properly */ my_thread_id pseudo_thread_id; /** + When replicating an event group with GTID, keep these values around so + slave binlog can receive the same GTID as the original. + */ + uint32 gtid_domain_id; + uint64 gtid_seq_no; + /** Place holders to store Multi-source variables in sys_var.cc during update and show of variables. */ @@ -1685,11 +1697,11 @@ public: HASH handler_tables_hash; /* - One thread can hold up to one named user-level lock. This variable - points to a lock object if the lock is present. See item_func.cc and + A thread can hold named user-level locks. This variable + contains granted tickets if a lock is present. See item_func.cc and chapter 'Miscellaneous functions', for functions GET_LOCK, RELEASE_LOCK. */ - User_level_lock *ull; + HASH ull_hash; #ifndef DBUG_OFF uint dbug_sentry; // watch out for memory corruption #endif @@ -1699,7 +1711,6 @@ public: first byte of the packet in do_command() */ enum enum_server_command command; - uint32 server_id; uint32 file_id; // for LOAD DATA INFILE /* remote (peer) port */ uint16 peer_port; @@ -1776,7 +1787,7 @@ public: MY_BITMAP const* cols, size_t colcnt, const uchar *old_data, const uchar *new_data); - void set_server_id(uint32 sid) { server_id = sid; } + void set_server_id(uint32 sid) { variables.server_id = sid; } /* Member functions to handle pending event for row-level logging. diff --git a/sql/sql_lex.cc b/sql/sql_lex.cc index a56d9f4adc8..c1a138ec3ee 100644 --- a/sql/sql_lex.cc +++ b/sql/sql_lex.cc @@ -507,6 +507,7 @@ void lex_start(THD *thd) lex->expr_allows_subselect= TRUE; lex->use_only_table_context= FALSE; lex->parse_vcol_expr= FALSE; + lex->check_exists= FALSE; lex->verbose= 0; lex->name.str= 0; diff --git a/sql/sql_lex.h b/sql/sql_lex.h index ec229144b36..8bfcddefdfd 100644 --- a/sql/sql_lex.h +++ b/sql/sql_lex.h @@ -299,7 +299,8 @@ struct LEX_MASTER_INFO changed variable or if it should be left at old value */ enum {LEX_MI_UNCHANGED, LEX_MI_DISABLE, LEX_MI_ENABLE} - ssl, ssl_verify_server_cert, heartbeat_opt, repl_ignore_server_ids_opt; + ssl, ssl_verify_server_cert, heartbeat_opt, repl_ignore_server_ids_opt, + use_gtid_opt; void init() { @@ -315,7 +316,7 @@ struct LEX_MASTER_INFO pos= relay_log_pos= server_id= port= connect_retry= 0; heartbeat_period= 0; ssl= ssl_verify_server_cert= heartbeat_opt= - repl_ignore_server_ids_opt= LEX_MI_UNCHANGED; + repl_ignore_server_ids_opt= use_gtid_opt= LEX_MI_UNCHANGED; } }; @@ -2509,7 +2510,8 @@ struct LEX: public Query_tables_list uint16 create_view_algorithm; uint8 create_view_check; uint8 context_analysis_only; - bool drop_if_exists, drop_temporary, local_file, one_shot_set; + bool drop_temporary, local_file, one_shot_set; + bool check_exists; bool autocommit; bool verbose, no_write_to_binlog; diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index c68e20f1b3a..f3cc03e9cc6 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -1265,10 +1265,10 @@ bool dispatch_command(enum enum_server_command command, THD *thd, /* TODO: The following has to be changed to an 8 byte integer */ pos = uint4korr(packet); flags = uint2korr(packet + 4); - thd->server_id=0; /* avoid suicide */ + thd->variables.server_id=0; /* avoid suicide */ if ((slave_server_id= uint4korr(packet+6))) // mysqlbinlog.server_id==0 kill_zombie_dump_threads(slave_server_id); - thd->server_id = slave_server_id; + thd->variables.server_id = slave_server_id; general_log_print(thd, command, "Log: '%s' Pos: %ld", packet+10, (long) pos); @@ -2084,7 +2084,7 @@ mysql_execute_command(THD *thd) if (!(lex->sql_command == SQLCOM_UPDATE_MULTI) && !(lex->sql_command == SQLCOM_SET_OPTION) && !(lex->sql_command == SQLCOM_DROP_TABLE && - lex->drop_temporary && lex->drop_if_exists) && + lex->drop_temporary && lex->check_exists) && all_tables_not_ok(thd, all_tables)) { /* we warn the slave SQL thread */ @@ -3285,7 +3285,7 @@ end_with_restore_list: thd->variables.option_bits|= OPTION_KEEP_LOG; } /* DDL and binlog write order are protected by metadata locks. */ - res= mysql_rm_table(thd, first_table, lex->drop_if_exists, + res= mysql_rm_table(thd, first_table, lex->check_exists, lex->drop_temporary); } break; @@ -3499,7 +3499,7 @@ end_with_restore_list: #endif if (check_access(thd, DROP_ACL, lex->name.str, NULL, NULL, 1, 0)) break; - res= mysql_rm_db(thd, lex->name.str, lex->drop_if_exists, 0); + res= mysql_rm_db(thd, lex->name.str, lex->check_exists, 0); break; } case SQLCOM_ALTER_DB_UPGRADE: @@ -3627,7 +3627,7 @@ end_with_restore_list: case SQLCOM_DROP_EVENT: if (!(res= Events::drop_event(thd, lex->spname->m_db, lex->spname->m_name, - lex->drop_if_exists))) + lex->check_exists))) my_ok(thd); break; #else @@ -4314,7 +4314,7 @@ create_sp_error: if (lex->spname->m_db.str == NULL) { - if (lex->drop_if_exists) + if (lex->check_exists) { push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_SP_DOES_NOT_EXIST, ER(ER_SP_DOES_NOT_EXIST), @@ -4383,7 +4383,7 @@ create_sp_error: my_ok(thd); break; case SP_KEY_NOT_FOUND: - if (lex->drop_if_exists) + if (lex->check_exists) { res= write_bin_log(thd, TRUE, thd->query(), thd->query_length()); push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, @@ -4596,7 +4596,7 @@ create_sp_error: if ((err_code= drop_server(thd, &lex->server_options))) { - if (! lex->drop_if_exists && err_code == ER_FOREIGN_SERVER_DOESNT_EXIST) + if (! lex->check_exists && err_code == ER_FOREIGN_SERVER_DOESNT_EXIST) { DBUG_PRINT("info", ("problem dropping server %s", lex->server_options.server_name)); @@ -6016,7 +6016,7 @@ bool add_field_to_list(THD *thd, LEX_STRING *field_name, enum_field_types type, lex->col_list.push_back(new Key_part_spec(*field_name, 0)); key= new Key(Key::PRIMARY, null_lex_str, &default_key_create_info, - 0, lex->col_list, NULL); + 0, lex->col_list, NULL, lex->check_exists); lex->alter_info.key_list.push_back(key); lex->col_list.empty(); } @@ -6026,7 +6026,7 @@ bool add_field_to_list(THD *thd, LEX_STRING *field_name, enum_field_types type, lex->col_list.push_back(new Key_part_spec(*field_name, 0)); key= new Key(Key::UNIQUE, null_lex_str, &default_key_create_info, 0, - lex->col_list, NULL); + lex->col_list, NULL, lex->check_exists); lex->alter_info.key_list.push_back(key); lex->col_list.empty(); } @@ -6078,7 +6078,7 @@ bool add_field_to_list(THD *thd, LEX_STRING *field_name, enum_field_types type, new_field->init(thd, field_name->str, type, length, decimals, type_modifier, default_value, on_update_value, comment, change, interval_list, cs, uint_geom_type, vcol_info, - create_options)) + create_options, lex->check_exists)) DBUG_RETURN(1); lex->alter_info.create_list.push_back(new_field); diff --git a/sql/sql_reload.cc b/sql/sql_reload.cc index 2720dc7cd74..e9c9dc86e41 100644 --- a/sql/sql_reload.cc +++ b/sql/sql_reload.cc @@ -205,6 +205,7 @@ bool reload_acl_and_cache(THD *thd, unsigned long options, DBUG_ASSERT(!thd || thd->locked_tables_mode || !thd->mdl_context.has_locks() || thd->handler_tables_hash.records || + thd->ull_hash.records || thd->global_read_lock.is_acquired()); /* diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 62336073b28..d767fb50cae 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -16,10 +16,12 @@ #include "sql_priv.h" #include "unireg.h" +#include "sql_base.h" #include "sql_parse.h" // check_access #ifdef HAVE_REPLICATION #include "rpl_mi.h" +#include "rpl_rli.h" #include "sql_repl.h" #include "sql_acl.h" // SUPER_ACL #include "log_event.h" @@ -81,7 +83,7 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name, uint ident_len = (uint) strlen(p); ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN + (do_checksum ? BINLOG_CHECKSUM_LEN : 0); - int4store(header + SERVER_ID_OFFSET, server_id); + int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id); int4store(header + EVENT_LEN_OFFSET, event_len); int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F); @@ -505,6 +507,26 @@ get_mariadb_slave_capability(THD *thd) /* + Get the value of the @slave_connect_state user variable into the supplied + String (this is the GTID connect state requested by the connecting slave). + + Returns false if error (ie. slave did not set the variable and does not + want to use GTID to set start position), true if success. +*/ +static bool +get_slave_connect_state(THD *thd, String *out_str) +{ + bool null_value; + + const LEX_STRING name= { C_STRING_WITH_LEN("slave_connect_state") }; + user_var_entry *entry= + (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str, + name.length); + return entry && entry->val_str(&null_value, out_str, 0) && !null_value; +} + + +/* Function prepares and sends repliation heartbeat event. @param net net object of THD @@ -539,7 +561,7 @@ static int send_heartbeat_event(NET* net, String* packet, uint ident_len = strlen(p); ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + (do_checksum ? BINLOG_CHECKSUM_LEN : 0); - int4store(header + SERVER_ID_OFFSET, server_id); + int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id); int4store(header + EVENT_LEN_OFFSET, event_len); int2store(header + FLAGS_OFFSET, 0); @@ -567,6 +589,643 @@ static int send_heartbeat_event(NET* net, String* packet, } +struct binlog_file_entry +{ + binlog_file_entry *next; + char *name; +}; + +static binlog_file_entry * +get_binlog_list(MEM_ROOT *memroot) +{ + IO_CACHE *index_file; + char fname[FN_REFLEN]; + size_t length; + binlog_file_entry *current_list= NULL, *e; + DBUG_ENTER("get_binlog_list"); + + if (!mysql_bin_log.is_open()) + { + my_error(ER_NO_BINARY_LOGGING, MYF(0)); + DBUG_RETURN(NULL); + } + + mysql_bin_log.lock_index(); + index_file=mysql_bin_log.get_index_file(); + reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0); + + /* The file ends with EOF or empty line */ + while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1) + { + --length; /* Remove the newline */ + if (!(e= (binlog_file_entry *)alloc_root(memroot, sizeof(*e))) || + !(e->name= strmake_root(memroot, fname, length))) + { + mysql_bin_log.unlock_index(); + my_error(ER_OUTOFMEMORY, MYF(0), length + 1 + sizeof(*e)); + DBUG_RETURN(NULL); + } + e->next= current_list; + current_list= e; + } + mysql_bin_log.unlock_index(); + + DBUG_RETURN(current_list); +} + +/* + Find the Gtid_list_log_event at the start of a binlog. + + NULL for ok, non-NULL error message for error. + + If ok, then the event is returned in *out_gtid_list. This can be NULL if we + get back to binlogs written by old server version without GTID support. If + so, it means we have reached the point to start from, as no GTID events can + exist in earlier binlogs. +*/ +static const char * +get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list) +{ + Format_description_log_event init_fdle(BINLOG_VERSION); + Format_description_log_event *fdle; + Log_event *ev; + const char *errormsg = NULL; + + *out_gtid_list= NULL; + + if (!(ev= Log_event::read_log_event(cache, 0, &init_fdle, + opt_master_verify_checksum)) || + ev->get_type_code() != FORMAT_DESCRIPTION_EVENT) + { + if (ev) + delete ev; + return "Could not read format description log event while looking for " + "GTID position in binlog"; + } + + fdle= static_cast<Format_description_log_event *>(ev); + + for (;;) + { + Log_event_type typ; + + ev= Log_event::read_log_event(cache, 0, fdle, opt_master_verify_checksum); + if (!ev) + { + errormsg= "Could not read GTID list event while looking for GTID " + "position in binlog"; + break; + } + typ= ev->get_type_code(); + if (typ == GTID_LIST_EVENT) + break; /* Done, found it */ + delete ev; + if (typ == ROTATE_EVENT || typ == STOP_EVENT || + typ == FORMAT_DESCRIPTION_EVENT) + continue; /* Continue looking */ + + /* We did not find any Gtid_list_log_event, must be old binlog. */ + ev= NULL; + break; + } + + delete fdle; + *out_gtid_list= static_cast<Gtid_list_log_event *>(ev); + return errormsg; +} + + +/* + Check if every GTID requested by the slave is contained in this (or a later) + binlog file. Return true if so, false if not. + + We do the check with a single scan of the list of GTIDs, avoiding the need + to build an in-memory hash or stuff like that. + + We need to check that slave did not request GTID D-S-N1, when the + Gtid_list_log_event for this binlog file has D-S-N2 with N2 > N1. + + In addition, we need to check that we do not have a GTID D-S-N3 in the + Gtid_list_log_event where D is not present in the requested slave state at + all. Since if D is not in requested slave state, it means that slave needs + to start at the very first GTID in domain D. +*/ +static bool +contains_all_slave_gtid(slave_connection_state *st, Gtid_list_log_event *glev) +{ + uint32 i; + + for (i= 0; i < glev->count; ++i) + { + const rpl_gtid *gtid= st->find(glev->list[i].domain_id); + if (!gtid) + { + /* + The slave needs to start from the very beginning of this domain, which + is in an earlier binlog file. So we need to search back further. + */ + return false; + } + if (gtid->server_id == glev->list[i].server_id && + gtid->seq_no < glev->list[i].seq_no) + { + /* + The slave needs to receive gtid, but it is contained in an earlier + binlog file. So we need to search back further. + */ + return false; + } + } + + return true; +} + + +/* + Check the start GTID state requested by the slave against our binlog state. + + Give an error if the slave requests something that we do not have in our + binlog. + + T +*/ + +static int +check_slave_start_position(THD *thd, slave_connection_state *st, + const char **errormsg, rpl_gtid *error_gtid) +{ + uint32 i; + bool found; + int err; + rpl_gtid **delete_list= NULL; + uint32 delete_idx= 0; + bool slave_state_loaded= false; + uint32 missing_domains= 0; + rpl_gtid missing_domain_gtid; + + for (i= 0; i < st->hash.records; ++i) + { + rpl_gtid *slave_gtid= (rpl_gtid *)my_hash_element(&st->hash, i); + rpl_gtid master_gtid; + rpl_gtid master_replication_gtid; + rpl_gtid start_gtid; + + if ((found= mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id, + slave_gtid->server_id, + &master_gtid)) && + master_gtid.seq_no >= slave_gtid->seq_no) + continue; + + if (!slave_state_loaded) + { + if (rpl_load_gtid_slave_state(thd)) + { + *errormsg= "Failed to load replication slave GTID state"; + err= ER_CANNOT_LOAD_SLAVE_GTID_STATE; + goto end; + } + slave_state_loaded= true; + } + + if (!rpl_global_gtid_slave_state.domain_to_gtid(slave_gtid->domain_id, + &master_replication_gtid) || + slave_gtid->server_id != master_replication_gtid.server_id || + slave_gtid->seq_no != master_replication_gtid.seq_no) + { + rpl_gtid domain_gtid; + + if (!mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id, + &domain_gtid)) + { + /* + We do not have anything in this domain, neither in the binlog nor + in the slave state. So we are probably one master in a multi-master + setup, and this domain is served by a different master. + + This is not an error, however if we are missing _all_ domains + requested by the slave, then we still give error (below, after + the loop). + */ + if (!missing_domains) + missing_domain_gtid= *slave_gtid; + ++missing_domains; + continue; + } + *errormsg= "Requested slave GTID state not found in binlog"; + *error_gtid= *slave_gtid; + err= ER_GTID_POSITION_NOT_FOUND_IN_BINLOG; + goto end; + } + + /* + Ok, so connecting slave asked to start at a GTID that we do not have in + our binlog, but it was in fact the last GTID we applied earlier, when we + were acting as a replication slave. + + So this means that we were running as a replication slave without + --log-slave-updates, but now we switched to be a master. It is worth it + to handle this special case, as it allows users to run a simple + master -> slave without --log-slave-updates, and then exchange slave and + master, as long as they make sure the slave is caught up before switching. + */ + + /* + First check if we logged something ourselves as a master after being a + slave. This will be seen as a GTID with our own server_id and bigger + seq_no than what is in the slave state. + + If we did not log anything ourselves, then start the connecting slave + replicating from the current binlog end position, which in this case + corresponds to our replication slave state and hence what the connecting + slave is requesting. + */ + if (mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id, + global_system_variables.server_id, + &start_gtid) && + start_gtid.seq_no > slave_gtid->seq_no) + { + /* + Start replication within this domain at the first GTID that we logged + ourselves after becoming a master. + */ + slave_gtid->server_id= global_system_variables.server_id; + } + else if (mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id, + &start_gtid)) + { + slave_gtid->server_id= start_gtid.server_id; + slave_gtid->seq_no= start_gtid.seq_no; + } + else + { + /* + We do not have _anything_ in our own binlog for this domain. Just + delete the entry in the slave connection state, then it will pick up + anything new that arrives. + + We just queue up the deletion and do it later, after the loop, so that + we do not mess up the iteration over the hash. + */ + if (!delete_list) + { + if ((delete_list= (rpl_gtid **)my_malloc(sizeof(*delete_list), + MYF(MY_WME)))) + { + *errormsg= "Out of memory while checking slave start position"; + err= ER_OUT_OF_RESOURCES; + goto end; + } + } + delete_list[delete_idx++]= slave_gtid; + } + } + + if (missing_domains == st->hash.records && missing_domains > 0) + { + *errormsg= "Requested slave GTID state not found in binlog"; + *error_gtid= missing_domain_gtid; + err= ER_GTID_POSITION_NOT_FOUND_IN_BINLOG; + goto end; + } + + /* Do any delayed deletes from the hash. */ + if (delete_list) + { + for (i= 0; i < delete_idx; ++i) + st->remove(delete_list[i]); + } + err= 0; + +end: + if (delete_list) + my_free(delete_list); + return err; +} + +/* + Find the name of the binlog file to start reading for a slave that connects + using GTID state. + + Returns the file name in out_name, which must be of size at least FN_REFLEN. + + Returns NULL on ok, error message on error. + + In case of non-error return, the returned binlog file is guaranteed to + contain the first event to be transmitted to the slave for every domain + present in our binlogs. It is still necessary to skip all GTIDs up to + and including the GTID requested by slave within each domain. + + However, as a special case, if the event to be sent to the slave is the very + first event (within that domain) in the returned binlog, then nothing should + be skipped, so that domain is deleted from the passed in slave connection + state. + + This is necessary in case the slave requests a GTID within a replication + domain that has long been inactive. The binlog file containing that GTID may + have been long since purged. However, as long as no GTIDs after that have + been purged, we have the GTID requested by slave in the Gtid_list_log_event + of the latest binlog. So we can start from there, as long as we delete the + corresponding entry in the slave state so we do not wrongly skip any events + that might turn up if that domain becomes active again, vainly looking for + the requested GTID that was already purged. +*/ +static const char * +gtid_find_binlog_file(slave_connection_state *state, char *out_name) +{ + MEM_ROOT memroot; + binlog_file_entry *list; + Gtid_list_log_event *glev= NULL; + const char *errormsg= NULL; + char buf[FN_REFLEN]; + + init_alloc_root(&memroot, 10*(FN_REFLEN+sizeof(binlog_file_entry)), 0, + MYF(MY_THREAD_SPECIFIC)); + if (!(list= get_binlog_list(&memroot))) + { + errormsg= "Out of memory while looking for GTID position in binlog"; + goto end; + } + + while (list) + { + File file; + IO_CACHE cache; + + if (!list->next) + { + /* + It should be safe to read the currently used binlog, as we will only + read the header part that is already written. + + But if that does not work on windows, then we will need to cache the + event somewhere in memory I suppose - that could work too. + */ + } + /* + Read the Gtid_list_log_event at the start of the binlog file to + get the binlog state. + */ + if (normalize_binlog_name(buf, list->name, false)) + { + errormsg= "Failed to determine binlog file name while looking for " + "GTID position in binlog"; + goto end; + } + bzero((char*) &cache, sizeof(cache)); + if ((file= open_binlog(&cache, buf, &errormsg)) == (File)-1) + goto end; + errormsg= get_gtid_list_event(&cache, &glev); + end_io_cache(&cache); + mysql_file_close(file, MYF(MY_WME)); + if (errormsg) + goto end; + + if (!glev || contains_all_slave_gtid(state, glev)) + { + uint32 i; + + strmake(out_name, buf, FN_REFLEN); + + /* + As a special case, we allow to start from binlog file N if the + requested GTID is the last event (in the corresponding domain) in + binlog file (N-1), but then we need to remove that GTID from the slave + state, rather than skipping events waiting for it to turn up. + */ + for (i= 0; i < glev->count; ++i) + { + const rpl_gtid *gtid= state->find(glev->list[i].domain_id); + if (!gtid) + { + /* contains_all_slave_gtid() would have returned false if so. */ + DBUG_ASSERT(0); + continue; + } + if (gtid->server_id == glev->list[i].server_id && + gtid->seq_no == glev->list[i].seq_no) + { + /* + The slave requested to start from the very beginning of this + domain in this binlog file. So delete the entry from the state, + we do not need to skip anything. + */ + state->remove(gtid); + } + } + + goto end; + } + delete glev; + glev= NULL; + list= list->next; + } + + /* We reached the end without finding anything. */ + errormsg= "Could not find GTID state requested by slave in any binlog " + "files. Probably the slave state is too old and required binlog files " + "have been purged."; + +end: + if (glev) + delete glev; + + free_root(&memroot, MYF(0)); + return errormsg; +} + + +/* + Given an old-style binlog position with file name and file offset, find the + corresponding gtid position. If the offset is not at an event boundary, give + an error. + + Return NULL on ok, error message string on error. + + ToDo: Improve the performance of this by using binlog index files. +*/ +static const char * +gtid_state_from_pos(const char *name, uint32 offset, + slave_connection_state *gtid_state) +{ + IO_CACHE cache; + File file; + const char *errormsg= NULL; + bool found_gtid_list_event= false; + bool found_format_description_event= false; + bool valid_pos= false; + uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF; + int err; + String packet; + + if (gtid_state->load((const rpl_gtid *)NULL, 0)) + { + errormsg= "Internal error (out of memory?) initializing slave state " + "while scanning binlog to find start position"; + return errormsg; + } + + if ((file= open_binlog(&cache, name, &errormsg)) == (File)-1) + return errormsg; + + /* + First we need to find the initial GTID_LIST_EVENT. We need this even + if the offset is at the very start of the binlog file. + + But if we do not find any GTID_LIST_EVENT, then this is an old binlog + with no GTID information, so we return empty GTID state. + */ + for (;;) + { + Log_event_type typ; + uint32 cur_pos; + + cur_pos= (uint32)my_b_tell(&cache); + if (cur_pos == offset) + valid_pos= true; + if (found_format_description_event && found_gtid_list_event && + cur_pos >= offset) + break; + + packet.length(0); + err= Log_event::read_log_event(&cache, &packet, NULL, + current_checksum_alg); + if (err) + { + errormsg= "Could not read binlog while searching for slave start " + "position on master"; + goto end; + } + /* + The cast to uchar is needed to avoid a signed char being converted to a + negative number. + */ + typ= (Log_event_type)(uchar)packet[EVENT_TYPE_OFFSET]; + if (typ == FORMAT_DESCRIPTION_EVENT) + { + if (found_format_description_event) + { + errormsg= "Duplicate format description log event found while " + "searching for old-style position in binlog"; + goto end; + } + + current_checksum_alg= get_checksum_alg(packet.ptr(), packet.length()); + found_format_description_event= true; + } + else if (typ != FORMAT_DESCRIPTION_EVENT && !found_format_description_event) + { + errormsg= "Did not find format description log event while searching " + "for old-style position in binlog"; + goto end; + } + else if (typ == ROTATE_EVENT || typ == STOP_EVENT || + typ == BINLOG_CHECKPOINT_EVENT) + continue; /* Continue looking */ + else if (typ == GTID_LIST_EVENT) + { + rpl_gtid *gtid_list; + bool status; + uint32 list_len; + + if (found_gtid_list_event) + { + errormsg= "Found duplicate Gtid_list_log_event while scanning binlog " + "to find slave start position"; + goto end; + } + status= Gtid_list_log_event::peek(packet.ptr(), packet.length(), + >id_list, &list_len); + if (status) + { + errormsg= "Error reading Gtid_list_log_event while searching " + "for old-style position in binlog"; + goto end; + } + err= gtid_state->load(gtid_list, list_len); + my_free(gtid_list); + if (err) + { + errormsg= "Internal error (out of memory?) initialising slave state " + "while scanning binlog to find start position"; + goto end; + } + found_gtid_list_event= true; + } + else if (!found_gtid_list_event) + { + /* We did not find any Gtid_list_log_event, must be old binlog. */ + goto end; + } + else if (typ == GTID_EVENT) + { + rpl_gtid gtid; + uchar flags2; + if (Gtid_log_event::peek(packet.ptr(), packet.length(), >id.domain_id, + >id.server_id, >id.seq_no, &flags2)) + { + errormsg= "Corrupt gtid_log_event found while scanning binlog to find " + "initial slave position"; + goto end; + } + if (gtid_state->update(>id)) + { + errormsg= "Internal error (out of memory?) updating slave state while " + "scanning binlog to find start position"; + goto end; + } + } + } + + if (!valid_pos) + { + errormsg= "Slave requested incorrect position in master binlog. " + "Requested position %u in file '%s', but this position does not " + "correspond to the location of any binlog event."; + } + +end: + end_io_cache(&cache); + mysql_file_close(file, MYF(MY_WME)); + + return errormsg; +} + + +int +gtid_state_from_binlog_pos(const char *in_name, uint32 pos, String *out_str) +{ + slave_connection_state gtid_state; + const char *lookup_name; + char name_buf[FN_REFLEN]; + LOG_INFO linfo; + + if (!mysql_bin_log.is_open()) + { + my_error(ER_NO_BINARY_LOGGING, MYF(0)); + return 1; + } + + if (in_name && in_name[0]) + { + mysql_bin_log.make_log_name(name_buf, in_name); + lookup_name= name_buf; + } + else + lookup_name= NULL; + linfo.index_file_offset= 0; + if (mysql_bin_log.find_log_pos(&linfo, lookup_name, 1)) + return 1; + + if (pos < 4) + pos= 4; + + if (gtid_state_from_pos(linfo.log_file_name, pos, >id_state) || + gtid_state.to_string(out_str)) + return 1; + return 0; +} + + /* Helper function for mysql_binlog_send() to write an event down the slave connection. @@ -577,9 +1236,63 @@ static const char * send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, Log_event_type event_type, char *log_file_name, IO_CACHE *log, int mariadb_slave_capability, - ulong ev_offset, uint8 current_checksum_alg) + ulong ev_offset, uint8 current_checksum_alg, + bool using_gtid_state, slave_connection_state *gtid_state, + enum_gtid_skip_type *gtid_skip_group) { my_off_t pos; + size_t len= packet->length(); + + /* Skip GTID event groups until we reach slave position within a domain_id. */ + if (event_type == GTID_EVENT && using_gtid_state && gtid_state->count() > 0) + { + uint32 server_id, domain_id; + uint64 seq_no; + uchar flags2; + rpl_gtid *gtid; + + if (ev_offset > len || + Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset, + &domain_id, &server_id, &seq_no, &flags2)) + return "Failed to read Gtid_log_event: corrupt binlog"; + gtid= gtid_state->find(domain_id); + if (gtid != NULL) + { + /* Skip this event group if we have not yet reached slave start pos. */ + if (server_id != gtid->server_id || seq_no <= gtid->seq_no) + *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ? + GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION); + /* + Delete this entry if we have reached slave start position (so we will + not skip subsequent events and won't have to look them up and check). + */ + if (server_id == gtid->server_id && seq_no >= gtid->seq_no) + gtid_state->remove(gtid); + } + } + + /* + Skip event group if we have not yet reached the correct slave GTID position. + + Note that slave that understands GTID can also tolerate holes, so there is + no need to supply dummy event. + */ + switch (*gtid_skip_group) + { + case GTID_SKIP_STANDALONE: + if (!Log_event::is_part_of_group(event_type)) + *gtid_skip_group= GTID_SKIP_NOT; + return NULL; + case GTID_SKIP_TRANSACTION: + if (event_type == XID_EVENT || + (event_type == QUERY_EVENT && + Query_log_event::peek_is_commit_rollback(packet->ptr() + ev_offset, + len - ev_offset))) + *gtid_skip_group= GTID_SKIP_NOT; + return NULL; + case GTID_SKIP_NOT: + break; + } /* Do not send annotate_rows events unless slave requested it. */ if (event_type == ANNOTATE_ROWS_EVENT && !(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT)) @@ -616,10 +1329,34 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, } /* - Do not send binlog checkpoint events to a slave that does not understand it. + Replace GTID events with old-style BEGIN events for slaves that do not + understand global transaction IDs. For stand-alone events, where there is + no terminating COMMIT query event, omit the GTID event or replace it with + a dummy event, as appropriate. + */ + if (event_type == GTID_EVENT && + mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_GTID) + { + bool need_dummy= + mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES; + bool err= Gtid_log_event::make_compatible_event(packet, &need_dummy, + ev_offset, + current_checksum_alg); + if (err) + return "Failed to replace GTID event with backwards-compatible event: " + "currupt event."; + if (!need_dummy) + return NULL; + } + + /* + Do not send binlog checkpoint or gtid list events to a slave that does not + understand it. */ - if (unlikely(event_type == BINLOG_CHECKPOINT_EVENT) && - mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT) + if ((unlikely(event_type == BINLOG_CHECKPOINT_EVENT) && + mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT) || + (unlikely(event_type == GTID_LIST_EVENT) && + mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_GTID)) { if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES) { @@ -634,8 +1371,8 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, binlog positions. */ if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg)) - return "Failed to replace binlog checkpoint event with dummy: " - "too small event."; + return "Failed to replace binlog checkpoint or gtid list event with " + "dummy: too small event."; } } @@ -661,7 +1398,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, (thd, flags, packet, log_file_name, pos))) return "run 'before_send_event' hook failed"; - if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) + if (my_net_write(net, (uchar*) packet->ptr(), len)) return "Failed on my_net_write()"; DBUG_PRINT("info", ("log event code %d", (*packet)[LOG_EVENT_OFFSET+1] )); @@ -696,6 +1433,12 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, mysql_mutex_t *log_lock; mysql_cond_t *log_cond; int mariadb_slave_capability; + char str_buf[256]; + String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info); + bool using_gtid_state; + slave_connection_state gtid_state, return_gtid_state; + rpl_gtid error_gtid; + enum_gtid_skip_type gtid_skip_group= GTID_SKIP_NOT; uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF; int old_max_allowed_packet= thd->variables.max_allowed_packet; @@ -706,6 +1449,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos)); bzero((char*) &log,sizeof(log)); + bzero(&error_gtid, sizeof(error_gtid)); /* heartbeat_period from @master_heartbeat_period user variable */ @@ -722,9 +1466,25 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, set_timespec_nsec(*heartbeat_ts, 0); } mariadb_slave_capability= get_mariadb_slave_capability(thd); + + connect_gtid_state.length(0); + using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state); + DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", using_gtid_state= false;); + /* + We want to corrupt the first event, in Log_event::read_log_event(). + But we do not want the corruption to happen early, eg. when client does + BINLOG_GTID_POS(). So test case sets a DBUG trigger which causes us to + set the real DBUG injection here. + */ + DBUG_EXECUTE_IF("corrupt_read_log_event2_set", + { + DBUG_SET("-d,corrupt_read_log_event2_set"); + DBUG_SET("+d,corrupt_read_log_event2"); + }); + if (global_system_variables.log_warnings > 1) sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)", - thd->server_id, log_ident, (ulong)pos); + (int)thd->variables.server_id, log_ident, (ulong)pos); if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos))) { errmsg= "Failed to run hook 'transmit_start'"; @@ -755,10 +1515,36 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, } name=search_file_name; - if (log_ident[0]) - mysql_bin_log.make_log_name(search_file_name, log_ident); + if (using_gtid_state) + { + if (gtid_state.load(connect_gtid_state.c_ptr_quick(), + connect_gtid_state.length())) + { + errmsg= "Out of memory or malformed slave request when obtaining start " + "position from GTID state"; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + if ((error= check_slave_start_position(thd, >id_state, &errmsg, + &error_gtid))) + { + my_errno= error; + goto err; + } + if ((errmsg= gtid_find_binlog_file(>id_state, search_file_name))) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + goto err; + } + pos= 4; + } else - name=0; // Find first log + { + if (log_ident[0]) + mysql_bin_log.make_log_name(search_file_name, log_ident); + else + name=0; // Find first log + } linfo.index_file_offset = 0; @@ -1012,7 +1798,8 @@ impossible position"; if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type, log_file_name, &log, mariadb_slave_capability, ev_offset, - current_checksum_alg))) + current_checksum_alg, using_gtid_state, + >id_state, >id_skip_group))) { errmsg= tmp_msg; my_errno= ER_UNKNOWN_ERROR; @@ -1105,7 +1892,8 @@ impossible position"; int ret; ulong signal_cnt; DBUG_PRINT("wait",("waiting for data in binary log")); - if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0) + /* For mysqlbinlog (mysqlbinlog.server_id==0). */ + if (thd->variables.server_id==0) { mysql_mutex_unlock(log_lock); goto end; @@ -1172,7 +1960,9 @@ impossible position"; (tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type, log_file_name, &log, mariadb_slave_capability, ev_offset, - current_checksum_alg))) + current_checksum_alg, + using_gtid_state, >id_state, + >id_skip_group))) { errmsg= tmp_msg; my_errno= ER_UNKNOWN_ERROR; @@ -1264,6 +2054,22 @@ err: my_basename(p_coord->file_name), p_coord->pos, my_basename(log_file_name), my_b_tell(&log)); } + else if (my_errno == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG) + { + my_snprintf(error_text, sizeof(error_text), + "Error: connecting slave requested to start from GTID " + "%u-%u-%llu, which is not in the master's binlog", + error_gtid.domain_id, error_gtid.server_id, error_gtid.seq_no); + /* Use this error code so slave will know not to try reconnect. */ + my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG; + } + else if (my_errno == ER_CANNOT_LOAD_SLAVE_GTID_STATE) + { + my_snprintf(error_text, sizeof(error_text), + "Failed to load replication slave GTID state from table %s.%s", + "mysql", rpl_gtid_slave_state_table_name.str); + my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG; + } else strcpy(error_text, errmsg); end_io_cache(&log); @@ -1629,7 +2435,7 @@ void kill_zombie_dump_threads(uint32 slave_server_id) while ((tmp=it++)) { if (tmp->command == COM_BINLOG_DUMP && - tmp->server_id == slave_server_id) + tmp->variables.server_id == slave_server_id) { mysql_mutex_lock(&tmp->LOCK_thd_data); // Lock from delete break; @@ -1838,7 +2644,7 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added) { ulong s_id; get_dynamic(&lex_mi->repl_ignore_server_ids, (uchar*) &s_id, i); - if (s_id == ::server_id && replicate_same_server_id) + if (s_id == global_system_variables.server_id && replicate_same_server_id) { my_error(ER_SLAVE_IGNORE_SERVER_IDS, MYF(0), static_cast<int>(s_id)); ret= TRUE; @@ -1898,6 +2704,13 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added) mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos; } + if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_MI_ENABLE) + mi->using_gtid= true; + else if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_MI_DISABLE || + lex_mi->log_file_name || lex_mi->pos || + lex_mi->relay_log_name || lex_mi->relay_log_pos) + mi->using_gtid= false; + /* If user did specify neither host nor port nor any log name nor any log pos, i.e. he specified only user/password/master_connect_retry, he probably @@ -1928,6 +2741,7 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added) strmake(mi->master_log_name, mi->rli.group_master_log_name, sizeof(mi->master_log_name)-1); } + /* Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never a slave before). @@ -1950,6 +2764,16 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added) ret= TRUE; goto err; } + + if (mi->using_gtid) + { + /* + Clear the position in the master binlogs, so that we request the + correct GTID position. + */ + mi->master_log_name[0]= 0; + mi->master_log_pos= 0; + } } else { @@ -2423,4 +3247,139 @@ int log_loaded_block(IO_CACHE* file) DBUG_RETURN(0); } + +/** + Initialise the slave replication state from the mysql.rpl_slave_state table. + + This is called each time an SQL thread starts, but the data is only actually + loaded on the first call. + + The slave state is the last GTID applied on the slave within each + replication domain. + + To avoid row lock contention, there are multiple rows for each domain_id. + The one containing the current slave state is the one with the maximal + sub_id value, within each domain_id. + + CREATE TABLE mysql.rpl_slave_state ( + domain_id INT UNSIGNED NOT NULL, + sub_id BIGINT UNSIGNED NOT NULL, + server_id INT UNSIGNED NOT NULL, + seq_no BIGINT UNSIGNED NOT NULL, + PRIMARY KEY (domain_id, sub_id)) +*/ + +void +rpl_init_gtid_slave_state() +{ + rpl_global_gtid_slave_state.init(); +} + + +void +rpl_deinit_gtid_slave_state() +{ + rpl_global_gtid_slave_state.deinit(); +} + + +/* + Format the current GTID state as a string, for use when connecting to a + master server with GTID, or for returning the value of @@global.gtid_state. + + If the flag use_binlog is true, then the contents of the binary log (if + enabled) is merged into the current GTID state. +*/ +int +rpl_append_gtid_state(String *dest, bool use_binlog) +{ + int err; + rpl_gtid *gtid_list= NULL; + uint32 num_gtids= 0; + + if (opt_bin_log && + (err= mysql_bin_log.get_most_recent_gtid_list(>id_list, &num_gtids))) + return err; + + rpl_global_gtid_slave_state.tostring(dest, gtid_list, num_gtids); + my_free(gtid_list); + + return 0; +} + + +bool +rpl_gtid_pos_check(char *str, size_t len) +{ + slave_connection_state tmp_slave_state; + + /* Check that we can parse the supplied string. */ + if (tmp_slave_state.load(str, len)) + return true; + + /* + Check our own binlog for any of our own transactions that are newer + than the GTID state the user is requesting. Any such transactions would + result in an out-of-order binlog, which could break anyone replicating + with us as master. + + So give an error if this is found, requesting the user to do a + RESET MASTER (to clean up the binlog) if they really want this. + */ + if (mysql_bin_log.is_open()) + { + rpl_gtid *binlog_gtid_list= NULL; + uint32 num_binlog_gtids= 0; + uint32 i; + + if (mysql_bin_log.get_most_recent_gtid_list(&binlog_gtid_list, + &num_binlog_gtids)) + { + my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME)); + return true; + } + for (i= 0; i < num_binlog_gtids; ++i) + { + rpl_gtid *binlog_gtid= &binlog_gtid_list[i]; + rpl_gtid *slave_gtid; + if (binlog_gtid->server_id != global_system_variables.server_id) + continue; + if (!(slave_gtid= tmp_slave_state.find(binlog_gtid->domain_id))) + { + my_error(ER_MASTER_GTID_POS_MISSING_DOMAIN, MYF(0), + binlog_gtid->domain_id, binlog_gtid->domain_id, + binlog_gtid->server_id, binlog_gtid->seq_no); + break; + } + if (slave_gtid->seq_no < binlog_gtid->seq_no) + { + my_error(ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG, MYF(0), + slave_gtid->domain_id, slave_gtid->server_id, + slave_gtid->seq_no, binlog_gtid->domain_id, + binlog_gtid->server_id, binlog_gtid->seq_no); + break; + } + } + my_free(binlog_gtid_list); + if (i != num_binlog_gtids) + return true; + } + + return false; +} + + +bool +rpl_gtid_pos_update(THD *thd, char *str, size_t len) +{ + if (rpl_global_gtid_slave_state.load(thd, str, len, true)) + { + my_error(ER_FAILED_GTID_STATE_INIT, MYF(0)); + return true; + } + else + return false; +} + + #endif /* HAVE_REPLICATION */ diff --git a/sql/sql_repl.h b/sql/sql_repl.h index 9ca7e6b00b1..3af8f721bd7 100644 --- a/sql/sql_repl.h +++ b/sql/sql_repl.h @@ -65,6 +65,14 @@ int log_loaded_block(IO_CACHE* file); int init_replication_sys_vars(); void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags); +extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state; +void rpl_init_gtid_slave_state(); +void rpl_deinit_gtid_slave_state(); +int gtid_state_from_binlog_pos(const char *name, uint32 pos, String *out_str); +int rpl_append_gtid_state(String *dest, bool use_binlog); +bool rpl_gtid_pos_check(char *str, size_t len); +bool rpl_gtid_pos_update(THD *thd, char *str, size_t len); + #endif /* HAVE_REPLICATION */ #endif /* SQL_REPL_INCLUDED */ diff --git a/sql/sql_select.cc b/sql/sql_select.cc index 38d86ade519..be8ff20eb48 100644 --- a/sql/sql_select.cc +++ b/sql/sql_select.cc @@ -3877,6 +3877,7 @@ make_join_statistics(JOIN *join, List<TABLE_LIST> &tables_list, DBUG_RETURN(TRUE); join->join_tab=stat; + join->top_join_tab_count= table_count; join->map2table=stat_ref; join->table= table_vector; join->const_tables=const_count; @@ -3924,6 +3925,8 @@ make_join_statistics(JOIN *join, List<TABLE_LIST> &tables_list, if (join->choose_subquery_plan(all_table_map & ~join->const_table_map)) goto error; + DEBUG_SYNC(join->thd, "inside_make_join_statistics"); + /* Generate an execution plan from the found optimal join order. */ DBUG_RETURN(join->thd->check_killed() || get_best_combination(join)); @@ -10854,6 +10857,10 @@ bool JOIN_TAB::preread_init() dbug_serve_apcs(join->thd, 1); ); + /* init ftfuns for just initialized derived table */ + if (table->fulltext_searched) + init_ftfuncs(join->thd, join->select_lex, test(join->order)); + return FALSE; } diff --git a/sql/sql_show.cc b/sql/sql_show.cc index ac71837def4..46cc157061d 100644 --- a/sql/sql_show.cc +++ b/sql/sql_show.cc @@ -7451,20 +7451,20 @@ TABLE *create_schema_table(THD *thd, TABLE_LIST *table_list) break; case MYSQL_TYPE_DATE: if (!(item=new Item_return_date_time(fields_info->field_name, - MAX_DATE_WIDTH, + strlen(fields_info->field_name), fields_info->field_type))) DBUG_RETURN(0); break; case MYSQL_TYPE_TIME: if (!(item=new Item_return_date_time(fields_info->field_name, - MAX_TIME_FULL_WIDTH, + strlen(fields_info->field_name), fields_info->field_type))) DBUG_RETURN(0); break; case MYSQL_TYPE_TIMESTAMP: case MYSQL_TYPE_DATETIME: if (!(item=new Item_return_date_time(fields_info->field_name, - MAX_DATETIME_WIDTH, + strlen(fields_info->field_name), fields_info->field_type))) DBUG_RETURN(0); break; @@ -7837,16 +7837,22 @@ int make_schema_select(THD *thd, SELECT_LEX *sel, We have to make non const db_name & table_name because of lower_case_table_names */ - thd->make_lex_string(&db, INFORMATION_SCHEMA_NAME.str, - INFORMATION_SCHEMA_NAME.length); - thd->make_lex_string(&table, schema_table->table_name, - strlen(schema_table->table_name)); - if (schema_table->old_format(thd, schema_table) || /* Handle old syntax */ - !sel->add_table_to_list(thd, new Table_ident(thd, db, table, 0), + if (!thd->make_lex_string(&db, INFORMATION_SCHEMA_NAME.str, + INFORMATION_SCHEMA_NAME.length)) + DBUG_RETURN(1); + + if (!thd->make_lex_string(&table, schema_table->table_name, + strlen(schema_table->table_name))) + DBUG_RETURN(1); + + if (schema_table->old_format(thd, schema_table)) + + DBUG_RETURN(1); + + if (!sel->add_table_to_list(thd, new Table_ident(thd, db, table, 0), 0, 0, TL_READ, MDL_SHARED_READ)) - { DBUG_RETURN(1); - } + DBUG_RETURN(0); } diff --git a/sql/sql_table.cc b/sql/sql_table.cc index 39c297fb5a2..9079bca7c1d 100644 --- a/sql/sql_table.cc +++ b/sql/sql_table.cc @@ -4954,6 +4954,246 @@ is_index_maintenance_unique (TABLE *table, Alter_info *alter_info) /* + Preparation for table creation + + SYNOPSIS + handle_if_exists_option() + thd Thread object. + table The altered table. + alter_info List of columns and indexes to create + + DESCRIPTION + Looks for the IF [NOT] EXISTS options, checks the states and remove items + from the list if existing found. + + RETURN VALUES + NONE +*/ + +static void +handle_if_exists_options(THD *thd, TABLE *table, Alter_info *alter_info) +{ + Field **f_ptr; + DBUG_ENTER("handle_if_exists_option"); + + /* Handle ADD COLUMN IF NOT EXISTS. */ + { + List_iterator<Create_field> it(alter_info->create_list); + Create_field *sql_field; + + while ((sql_field=it++)) + { + if (!sql_field->create_if_not_exists || sql_field->change) + continue; + /* + If there is a field with the same name in the table already, + remove the sql_field from the list. + */ + for (f_ptr=table->field; *f_ptr; f_ptr++) + { + if (my_strcasecmp(system_charset_info, + sql_field->field_name, (*f_ptr)->field_name) == 0) + { + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, + ER_DUP_FIELDNAME, ER(ER_DUP_FIELDNAME), + sql_field->field_name); + it.remove(); + if (alter_info->create_list.is_empty()) + { + alter_info->flags&= ~ALTER_ADD_COLUMN; + if (alter_info->key_list.is_empty()) + alter_info->flags&= ~ALTER_ADD_INDEX; + } + break; + } + } + } + } + + /* Handle MODIFY COLUMN IF EXISTS. */ + { + List_iterator<Create_field> it(alter_info->create_list); + Create_field *sql_field; + + while ((sql_field=it++)) + { + if (!sql_field->create_if_not_exists || !sql_field->change) + continue; + /* + If there is NO field with the same name in the table already, + remove the sql_field from the list. + */ + for (f_ptr=table->field; *f_ptr; f_ptr++) + { + if (my_strcasecmp(system_charset_info, + sql_field->field_name, (*f_ptr)->field_name) == 0) + { + break; + } + } + if (*f_ptr == NULL) + { + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, + ER_BAD_FIELD_ERROR, ER(ER_BAD_FIELD_ERROR), + sql_field->change, table->s->table_name.str); + it.remove(); + if (alter_info->create_list.is_empty()) + { + alter_info->flags&= ~(ALTER_ADD_COLUMN | ALTER_CHANGE_COLUMN); + if (alter_info->key_list.is_empty()) + alter_info->flags&= ~ALTER_ADD_INDEX; + } + } + } + } + + /* Handle DROP COLUMN/KEY IF EXISTS. */ + { + List_iterator<Alter_drop> drop_it(alter_info->drop_list); + Alter_drop *drop; + bool remove_drop; + while ((drop= drop_it++)) + { + if (!drop->drop_if_exists) + continue; + remove_drop= TRUE; + if (drop->type == Alter_drop::COLUMN) + { + /* + If there is NO field with that name in the table, + remove the 'drop' from the list. + */ + for (f_ptr=table->field; *f_ptr; f_ptr++) + { + if (my_strcasecmp(system_charset_info, + drop->name, (*f_ptr)->field_name) == 0) + { + remove_drop= FALSE; + break; + } + } + } + else /* Alter_drop::KEY */ + { + uint n_key; + for (n_key=0; n_key < table->s->keys; n_key++) + { + if (my_strcasecmp(system_charset_info, + drop->name, table->key_info[n_key].name) == 0) + { + remove_drop= FALSE; + break; + } + } + } + if (remove_drop) + { + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, + ER_CANT_DROP_FIELD_OR_KEY, ER(ER_CANT_DROP_FIELD_OR_KEY), + drop->name); + drop_it.remove(); + if (alter_info->drop_list.is_empty()) + alter_info->flags&= ~(ALTER_DROP_COLUMN | ALTER_DROP_INDEX); + } + } + } + + /* ALTER TABLE ADD KEY IF NOT EXISTS */ + /* ALTER TABLE ADD FOREIGN KEY IF NOT EXISTS */ + { + Key *key; + List_iterator<Key> key_it(alter_info->key_list); + uint n_key; + while ((key=key_it++)) + { + if (!key->create_if_not_exists) + continue; + for (n_key=0; n_key < table->s->keys; n_key++) + { + if (my_strcasecmp(system_charset_info, + key->name.str, table->key_info[n_key].name) == 0) + { + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, + ER_DUP_KEYNAME, ER(ER_DUP_KEYNAME), key->name.str); + key_it.remove(); + if (key->type == Key::FOREIGN_KEY) + { + /* ADD FOREIGN KEY appends two items. */ + key_it.remove(); + } + if (alter_info->key_list.is_empty()) + alter_info->flags&= ~ALTER_ADD_INDEX; + break; + } + } + } + } + +#ifdef WITH_PARTITION_STORAGE_ENGINE + partition_info *tab_part_info= table->part_info; + if (tab_part_info && thd->lex->check_exists) + { + /* ALTER TABLE ADD PARTITION IF NOT EXISTS */ + if (alter_info->flags & ALTER_ADD_PARTITION) + { + partition_info *alt_part_info= thd->lex->part_info; + if (alt_part_info) + { + List_iterator<partition_element> new_part_it(alt_part_info->partitions); + partition_element *pe; + while ((pe= new_part_it++)) + { + if (!tab_part_info->has_unique_name(pe)) + { + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, + ER_SAME_NAME_PARTITION, ER(ER_SAME_NAME_PARTITION), + pe->partition_name); + alter_info->flags&= ~ALTER_ADD_PARTITION; + thd->lex->part_info= NULL; + break; + } + } + } + } + /* ALTER TABLE DROP PARTITION IF EXISTS */ + if (alter_info->flags & ALTER_DROP_PARTITION) + { + List_iterator<char> names_it(alter_info->partition_names); + char *name; + + while ((name= names_it++)) + { + List_iterator<partition_element> part_it(tab_part_info->partitions); + partition_element *part_elem; + while ((part_elem= part_it++)) + { + if (my_strcasecmp(system_charset_info, + part_elem->partition_name, name) == 0) + break; + } + if (!part_elem) + { + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, + ER_DROP_PARTITION_NON_EXISTENT, + ER(ER_DROP_PARTITION_NON_EXISTENT), "DROP"); + names_it.remove(); + } + } + if (alter_info->partition_names.elements == 0) + alter_info->flags&= ~ALTER_DROP_PARTITION; + } + } +#endif /*WITH_PARTITION_STORAGE_ENGINE*/ + + /* Clear the ALTER_FOREIGN_KEY flag if nothing other than that set. */ + if (alter_info->flags == ALTER_FOREIGN_KEY) + alter_info->flags= 0; + + DBUG_VOID_RETURN; +} + + +/* SYNOPSIS mysql_compare_tables() table The original table. @@ -5873,7 +6113,7 @@ mysql_prepare_alter_table(THD *thd, TABLE *table, key= new Key(key_type, key_name, strlen(key_name), &key_create_info, test(key_info->flags & HA_GENERATED_KEY), - key_parts, key_info->option_list); + key_parts, key_info->option_list, FALSE); new_key_list.push_back(key); } } @@ -6386,6 +6626,17 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name, DBUG_RETURN(error); } + handle_if_exists_options(thd, table, alter_info); + + /* Look if we have to do anything at all. */ + /* Normally ALTER can become NOOP only after handling */ + /* the IF (NOT) EXISTS options. */ + if (alter_info->flags == 0) + { + copied= deleted= 0; + goto end_temporary; + } + /* We have to do full alter table. */ #ifdef WITH_PARTITION_STORAGE_ENGINE diff --git a/sql/sql_trigger.cc b/sql/sql_trigger.cc index 882f7fcca8c..35a4464b9e2 100644 --- a/sql/sql_trigger.cc +++ b/sql/sql_trigger.cc @@ -443,7 +443,7 @@ bool mysql_create_or_drop_trigger(THD *thd, TABLE_LIST *tables, bool create) if (!create) { - bool if_exists= thd->lex->drop_if_exists; + bool if_exists= thd->lex->check_exists; /* Protect the query table list from the temporary and potentially diff --git a/sql/sql_view.cc b/sql/sql_view.cc index 0a84dd996ce..37f6af13bb7 100644 --- a/sql/sql_view.cc +++ b/sql/sql_view.cc @@ -1672,7 +1672,7 @@ bool mysql_drop_view(THD *thd, TABLE_LIST *views, enum_drop_mode drop_mode) { char name[FN_REFLEN]; my_snprintf(name, sizeof(name), "%s.%s", view->db, view->table_name); - if (thd->lex->drop_if_exists) + if (thd->lex->check_exists) { push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_BAD_TABLE_ERROR, ER(ER_BAD_TABLE_ERROR), diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy index 0c3a016fa28..94fdc46647f 100644 --- a/sql/sql_yacc.yy +++ b/sql/sql_yacc.yy @@ -722,7 +722,7 @@ static bool add_create_index (LEX *lex, Key::Keytype type, { Key *key; key= new Key(type, name, info ? info : &lex->key_create_info, generated, - lex->col_list, lex->option_list); + lex->col_list, lex->option_list, lex->check_exists); if (key == NULL) return TRUE; @@ -835,6 +835,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize); %token AUTHORS_SYM %token AUTOEXTEND_SIZE_SYM %token AUTO_INC +%token AUTO_SYM %token AVG_ROW_LENGTH %token AVG_SYM /* SQL-2003-N */ %token BACKUP_SYM @@ -1094,6 +1095,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize); %token LOW_PRIORITY %token LT /* OPERATOR */ %token MASTER_CONNECT_RETRY_SYM +%token MASTER_USE_GTID_SYM %token MASTER_HOST_SYM %token MASTER_LOG_FILE_SYM %token MASTER_LOG_POS_SYM @@ -1454,7 +1456,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize); IDENT_sys TEXT_STRING_sys TEXT_STRING_literal NCHAR_STRING opt_component key_cache_name sp_opt_label BIN_NUM label_ident TEXT_STRING_filesystem ident_or_empty - opt_constraint constraint opt_ident + opt_constraint constraint opt_ident opt_if_not_exists_ident %type <lex_str_ptr> opt_table_alias @@ -1471,7 +1473,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize); %type <num> type type_with_opt_collate int_type real_type order_dir lock_option - udf_type if_exists opt_local opt_table_options table_options + udf_type opt_if_exists opt_local opt_table_options table_options table_option opt_if_not_exists opt_no_write_to_binlog opt_temporary all_or_any opt_distinct opt_ignore_leaves fulltext_options spatial_type union_option @@ -2062,6 +2064,16 @@ master_file_def: /* Adjust if < BIN_LOG_HEADER_SIZE (same comment as Lex->mi.pos) */ Lex->mi.relay_log_pos = max(BIN_LOG_HEADER_SIZE, Lex->mi.relay_log_pos); } + | MASTER_USE_GTID_SYM EQ ulong_num + { + if (Lex->mi.use_gtid_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED) + { + my_error(ER_DUP_ARGUMENT, MYF(0), "MASTER_use_gtid"); + MYSQL_YYABORT; + } + Lex->mi.use_gtid_opt= $3 ? + LEX_MASTER_INFO::LEX_MI_ENABLE : LEX_MASTER_INFO::LEX_MI_DISABLE; + } ; optional_connection_name: @@ -2131,36 +2143,36 @@ create: } create_table_set_open_action_and_adjust_tables(lex); } - | CREATE opt_unique INDEX_SYM ident key_alg ON table_ident + | CREATE opt_unique INDEX_SYM opt_if_not_exists ident key_alg ON table_ident { - if (add_create_index_prepare(Lex, $7)) + if (add_create_index_prepare(Lex, $8)) MYSQL_YYABORT; } '(' key_list ')' normal_key_options { - if (add_create_index(Lex, $2, $4)) + if (add_create_index(Lex, $2, $5)) MYSQL_YYABORT; } - | CREATE fulltext INDEX_SYM ident init_key_options ON + | CREATE fulltext INDEX_SYM opt_if_not_exists ident init_key_options ON table_ident { - if (add_create_index_prepare(Lex, $7)) + if (add_create_index_prepare(Lex, $8)) MYSQL_YYABORT; } '(' key_list ')' fulltext_key_options { - if (add_create_index(Lex, $2, $4)) + if (add_create_index(Lex, $2, $5)) MYSQL_YYABORT; } - | CREATE spatial INDEX_SYM ident init_key_options ON + | CREATE spatial INDEX_SYM opt_if_not_exists ident init_key_options ON table_ident { - if (add_create_index_prepare(Lex, $7)) + if (add_create_index_prepare(Lex, $8)) MYSQL_YYABORT; } '(' key_list ')' spatial_key_options { - if (add_create_index(Lex, $2, $4)) + if (add_create_index(Lex, $2, $5)) MYSQL_YYABORT; } | CREATE DATABASE opt_if_not_exists ident @@ -5079,9 +5091,17 @@ table_option: ; opt_if_not_exists: - /* empty */ { $$= 0; } - | IF not EXISTS { $$=HA_LEX_CREATE_IF_NOT_EXISTS; } - ; + /* empty */ + { + Lex->check_exists= FALSE; + $$= 0; + } + | IF not EXISTS + { + Lex->check_exists= TRUE; + $$=HA_LEX_CREATE_IF_NOT_EXISTS; + } + ; opt_create_table_options: /* empty */ @@ -5399,14 +5419,14 @@ column_def: ; key_def: - normal_key_type opt_ident key_alg '(' key_list ')' + normal_key_type opt_if_not_exists_ident key_alg '(' key_list ')' { Lex->option_list= NULL; } normal_key_options { if (add_create_index (Lex, $1, $2)) MYSQL_YYABORT; } - | fulltext opt_key_or_index opt_ident init_key_options + | fulltext opt_key_or_index opt_if_not_exists_ident init_key_options '(' key_list ')' { Lex->option_list= NULL; } fulltext_key_options @@ -5414,7 +5434,7 @@ key_def: if (add_create_index (Lex, $1, $3)) MYSQL_YYABORT; } - | spatial opt_key_or_index opt_ident init_key_options + | spatial opt_key_or_index opt_if_not_exists_ident init_key_options '(' key_list ')' { Lex->option_list= NULL; } spatial_key_options @@ -5430,7 +5450,7 @@ key_def: if (add_create_index (Lex, $2, $3.str ? $3 : $1)) MYSQL_YYABORT; } - | opt_constraint FOREIGN KEY_SYM opt_ident '(' key_list ')' references + | opt_constraint FOREIGN KEY_SYM opt_if_not_exists_ident '(' key_list ')' references { LEX *lex=Lex; Key *key= new Foreign_key($4.str ? $4 : $1, lex->col_list, @@ -5438,7 +5458,8 @@ key_def: lex->ref_list, lex->fk_delete_opt, lex->fk_update_opt, - lex->fk_match_option); + lex->fk_match_option, + lex->check_exists); if (key == NULL) MYSQL_YYABORT; lex->alter_info.key_list.push_back(key); @@ -6407,6 +6428,18 @@ opt_ident: | field_ident { $$= $1; } ; +opt_if_not_exists_ident: + opt_if_not_exists opt_ident + { + LEX *lex= Lex; + if (lex->check_exists && lex->sql_command != SQLCOM_ALTER_TABLE) + { + my_parse_error(ER(ER_SYNTAX_ERROR)); + MYSQL_YYABORT; + } + $$= $2; + }; + opt_component: /* empty */ { $$= null_lex_str; } | '.' ident { $$= $2; } @@ -6663,7 +6696,7 @@ alter_commands: new table and so forth. */ | add_partition_rule - | DROP PARTITION_SYM alt_part_name_list + | DROP PARTITION_SYM opt_if_exists alt_part_name_list { Lex->alter_info.flags|= ALTER_DROP_PARTITION; } @@ -6764,7 +6797,7 @@ all_or_alt_part_name_list: ; add_partition_rule: - ADD PARTITION_SYM opt_no_write_to_binlog + ADD PARTITION_SYM opt_if_not_exists opt_no_write_to_binlog { LEX *lex= Lex; lex->part_info= new partition_info(); @@ -6774,7 +6807,7 @@ add_partition_rule: MYSQL_YYABORT; } lex->alter_info.flags|= ALTER_ADD_PARTITION; - lex->no_write_to_binlog= $3; + lex->no_write_to_binlog= $4; } add_part_extra {} @@ -6850,7 +6883,7 @@ alter_list: ; add_column: - ADD opt_column + ADD opt_column opt_if_not_exists { LEX *lex=Lex; lex->change=0; @@ -6872,10 +6905,10 @@ alter_list_item: { Lex->alter_info.flags|= ALTER_ADD_COLUMN | ALTER_ADD_INDEX; } - | CHANGE opt_column field_ident + | CHANGE opt_column opt_if_exists field_ident { LEX *lex=Lex; - lex->change= $3.str; + lex->change= $4.str; lex->alter_info.flags|= ALTER_CHANGE_COLUMN; lex->option_list= NULL; } @@ -6883,7 +6916,7 @@ alter_list_item: { Lex->create_last_non_select_table= Lex->last_table(); } - | MODIFY_SYM opt_column field_ident + | MODIFY_SYM opt_column opt_if_exists field_ident { LEX *lex=Lex; lex->length=lex->dec=0; lex->type=0; @@ -6897,12 +6930,12 @@ alter_list_item: field_def { LEX *lex=Lex; - if (add_field_to_list(lex->thd,&$3, - (enum enum_field_types) $5, + if (add_field_to_list(lex->thd,&$4, + (enum enum_field_types) $6, lex->length,lex->dec,lex->type, lex->default_value, lex->on_update_value, &lex->comment, - $3.str, &lex->interval_list, lex->charset, + $4.str, &lex->interval_list, lex->charset, lex->uint_geom_type, lex->vcol_info, lex->option_list)) MYSQL_YYABORT; @@ -6911,32 +6944,33 @@ alter_list_item: { Lex->create_last_non_select_table= Lex->last_table(); } - | DROP opt_column field_ident opt_restrict + | DROP opt_column opt_if_exists field_ident opt_restrict { LEX *lex=Lex; - Alter_drop *ad= new Alter_drop(Alter_drop::COLUMN, $3.str); + Alter_drop *ad= new Alter_drop(Alter_drop::COLUMN, $4.str, $3); if (ad == NULL) MYSQL_YYABORT; lex->alter_info.drop_list.push_back(ad); lex->alter_info.flags|= ALTER_DROP_COLUMN; } - | DROP FOREIGN KEY_SYM opt_ident + | DROP FOREIGN KEY_SYM opt_if_exists opt_ident { Lex->alter_info.flags|= ALTER_DROP_INDEX | ALTER_FOREIGN_KEY; } | DROP PRIMARY_SYM KEY_SYM { LEX *lex=Lex; - Alter_drop *ad= new Alter_drop(Alter_drop::KEY, primary_key_name); + Alter_drop *ad= new Alter_drop(Alter_drop::KEY, primary_key_name, + FALSE); if (ad == NULL) MYSQL_YYABORT; lex->alter_info.drop_list.push_back(ad); lex->alter_info.flags|= ALTER_DROP_INDEX; } - | DROP key_or_index field_ident + | DROP key_or_index opt_if_exists field_ident { LEX *lex=Lex; - Alter_drop *ad= new Alter_drop(Alter_drop::KEY, $3.str); + Alter_drop *ad= new Alter_drop(Alter_drop::KEY, $4.str, $3); if (ad == NULL) MYSQL_YYABORT; lex->alter_info.drop_list.push_back(ad); @@ -10815,41 +10849,41 @@ do: */ drop: - DROP opt_temporary table_or_tables if_exists + DROP opt_temporary table_or_tables opt_if_exists { LEX *lex=Lex; lex->sql_command = SQLCOM_DROP_TABLE; lex->drop_temporary= $2; - lex->drop_if_exists= $4; + lex->check_exists= $4; YYPS->m_lock_type= TL_UNLOCK; YYPS->m_mdl_type= MDL_EXCLUSIVE; } table_list opt_restrict {} - | DROP INDEX_SYM ident ON table_ident {} + | DROP INDEX_SYM opt_if_exists ident ON table_ident {} { LEX *lex=Lex; - Alter_drop *ad= new Alter_drop(Alter_drop::KEY, $3.str); + Alter_drop *ad= new Alter_drop(Alter_drop::KEY, $4.str, $3); if (ad == NULL) MYSQL_YYABORT; lex->sql_command= SQLCOM_DROP_INDEX; lex->alter_info.reset(); lex->alter_info.flags= ALTER_DROP_INDEX; lex->alter_info.drop_list.push_back(ad); - if (!lex->current_select->add_table_to_list(lex->thd, $5, NULL, + if (!lex->current_select->add_table_to_list(lex->thd, $6, NULL, TL_OPTION_UPDATING, TL_READ_NO_INSERT, MDL_SHARED_NO_WRITE)) MYSQL_YYABORT; } - | DROP DATABASE if_exists ident + | DROP DATABASE opt_if_exists ident { LEX *lex=Lex; lex->sql_command= SQLCOM_DROP_DB; - lex->drop_if_exists=$3; + lex->check_exists=$3; lex->name= $4; } - | DROP FUNCTION_SYM if_exists ident '.' ident + | DROP FUNCTION_SYM opt_if_exists ident '.' ident { THD *thd= YYTHD; LEX *lex= thd->lex; @@ -10865,14 +10899,14 @@ drop: MYSQL_YYABORT; } lex->sql_command = SQLCOM_DROP_FUNCTION; - lex->drop_if_exists= $3; + lex->check_exists= $3; spname= new sp_name($4, $6, true); if (spname == NULL) MYSQL_YYABORT; spname->init_qname(thd); lex->spname= spname; } - | DROP FUNCTION_SYM if_exists ident + | DROP FUNCTION_SYM opt_if_exists ident { THD *thd= YYTHD; LEX *lex= thd->lex; @@ -10886,14 +10920,14 @@ drop: if (thd->db && lex->copy_db_to(&db.str, &db.length)) MYSQL_YYABORT; lex->sql_command = SQLCOM_DROP_FUNCTION; - lex->drop_if_exists= $3; + lex->check_exists= $3; spname= new sp_name(db, $4, false); if (spname == NULL) MYSQL_YYABORT; spname->init_qname(thd); lex->spname= spname; } - | DROP PROCEDURE_SYM if_exists sp_name + | DROP PROCEDURE_SYM opt_if_exists sp_name { LEX *lex=Lex; if (lex->sphead) @@ -10902,34 +10936,34 @@ drop: MYSQL_YYABORT; } lex->sql_command = SQLCOM_DROP_PROCEDURE; - lex->drop_if_exists= $3; + lex->check_exists= $3; lex->spname= $4; } | DROP USER clear_privileges user_list { Lex->sql_command = SQLCOM_DROP_USER; } - | DROP VIEW_SYM if_exists + | DROP VIEW_SYM opt_if_exists { LEX *lex= Lex; lex->sql_command= SQLCOM_DROP_VIEW; - lex->drop_if_exists= $3; + lex->check_exists= $3; YYPS->m_lock_type= TL_UNLOCK; YYPS->m_mdl_type= MDL_EXCLUSIVE; } table_list opt_restrict {} - | DROP EVENT_SYM if_exists sp_name + | DROP EVENT_SYM opt_if_exists sp_name { - Lex->drop_if_exists= $3; + Lex->check_exists= $3; Lex->spname= $4; Lex->sql_command = SQLCOM_DROP_EVENT; } - | DROP TRIGGER_SYM if_exists sp_name + | DROP TRIGGER_SYM opt_if_exists sp_name { LEX *lex= Lex; lex->sql_command= SQLCOM_DROP_TRIGGER; - lex->drop_if_exists= $3; + lex->check_exists= $3; lex->spname= $4; } | DROP TABLESPACE tablespace_name opt_ts_engine opt_ts_wait @@ -10942,10 +10976,10 @@ drop: LEX *lex= Lex; lex->alter_tablespace_info->ts_cmd_type= DROP_LOGFILE_GROUP; } - | DROP SERVER_SYM if_exists ident_or_text + | DROP SERVER_SYM opt_if_exists ident_or_text { Lex->sql_command = SQLCOM_DROP_SERVER; - Lex->drop_if_exists= $3; + Lex->check_exists= $3; Lex->server_options.server_name= $4.str; Lex->server_options.server_name_length= $4.length; } @@ -10983,9 +11017,17 @@ table_alias_ref: } ; -if_exists: - /* empty */ { $$= 0; } - | IF EXISTS { $$= 1; } +opt_if_exists: + /* empty */ + { + Lex->check_exists= FALSE; + $$= 0; + } + | IF EXISTS + { + Lex->check_exists= TRUE; + $$= 1; + } ; opt_temporary: @@ -13173,6 +13215,7 @@ keyword_sp: | AUTHORS_SYM {} | AUTO_INC {} | AUTOEXTEND_SIZE_SYM {} + | AUTO_SYM {} | AVG_ROW_LENGTH {} | AVG_SYM {} | BINLOG_SYM {} @@ -13283,6 +13326,7 @@ keyword_sp: | MAX_ROWS {} | MASTER_SYM {} | MASTER_HEARTBEAT_PERIOD_SYM {} + | MASTER_USE_GTID_SYM {} | MASTER_HOST_SYM {} | MASTER_PORT_SYM {} | MASTER_LOG_FILE_SYM {} diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 4d0972813f0..762b35da89a 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -55,6 +55,7 @@ #include "../storage/perfschema/pfs_server.h" #endif /* WITH_PERFSCHEMA_STORAGE_ENGINE */ #include "threadpool.h" +#include "sql_repl.h" /* The rule for this file: everything should be 'static'. When a sys_var @@ -1092,7 +1093,7 @@ static Sys_var_ulonglong Sys_max_binlog_cache_size( "Sets the total size of the transactional cache", GLOBAL_VAR(max_binlog_cache_size), CMD_LINE(REQUIRED_ARG), VALID_RANGE(IO_SIZE, ULONGLONG_MAX), - DEFAULT((UINT_MAX/IO_SIZE)*IO_SIZE), + DEFAULT((ULONGLONG_MAX/IO_SIZE)*IO_SIZE), BLOCK_SIZE(IO_SIZE)); static Sys_var_ulonglong Sys_max_binlog_stmt_cache_size( @@ -1100,7 +1101,7 @@ static Sys_var_ulonglong Sys_max_binlog_stmt_cache_size( "Sets the total size of the statement cache", GLOBAL_VAR(max_binlog_stmt_cache_size), CMD_LINE(REQUIRED_ARG), VALID_RANGE(IO_SIZE, ULONGLONG_MAX), - DEFAULT((UINT_MAX/IO_SIZE)*IO_SIZE), + DEFAULT((ULONGLONG_MAX/IO_SIZE)*IO_SIZE), BLOCK_SIZE(IO_SIZE)); static bool fix_max_binlog_size(sys_var *self, THD *thd, enum_var_type type) @@ -1201,6 +1202,114 @@ static Sys_var_ulong Sys_pseudo_thread_id( BLOCK_SIZE(1), NO_MUTEX_GUARD, IN_BINLOG, ON_CHECK(check_has_super)); +static Sys_var_uint Sys_gtid_domain_id( + "gtid_domain_id", + "Used with global transaction ID to identify logically independent " + "replication streams. When events can propagate through multiple " + "parallel paths (for example multiple masters), each independent " + "source server must use a distinct domain_id. For simple tree-shaped " + "replication topologies, it can be left at its default, 0.", + SESSION_VAR(gtid_domain_id), + CMD_LINE(REQUIRED_ARG), VALID_RANGE(0, UINT_MAX32), DEFAULT(0), + BLOCK_SIZE(1), NO_MUTEX_GUARD, NOT_IN_BINLOG, + ON_CHECK(check_has_super)); + +static Sys_var_ulonglong Sys_gtid_seq_no( + "gtid_seq_no", + "Internal server usage, for replication with global transaction id. " + "When set, next event group logged to the binary log will use this " + "sequence number, not generate a new one, thus allowing to preserve " + "master's GTID in slave's binlog.", + SESSION_ONLY(gtid_seq_no), + NO_CMD_LINE, VALID_RANGE(0, ULONGLONG_MAX), DEFAULT(0), + BLOCK_SIZE(1), NO_MUTEX_GUARD, NOT_IN_BINLOG, + ON_CHECK(check_has_super)); + + +#ifdef HAVE_REPLICATION +bool +Sys_var_gtid_pos::do_check(THD *thd, set_var *var) +{ + String str, *res; + bool running; + + DBUG_ASSERT(var->type == OPT_GLOBAL); + mysql_mutex_lock(&LOCK_active_mi); + running= master_info_index->give_error_if_slave_running(); + mysql_mutex_unlock(&LOCK_active_mi); + if (running) + return true; + if (!(res= var->value->val_str(&str))) + return true; + if (rpl_gtid_pos_check(&((*res)[0]), res->length())) + return true; + + if (!(var->save_result.string_value.str= + thd->strmake(res->ptr(), res->length()))) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return true; + } + var->save_result.string_value.length= res->length(); + return false; +} + + +bool +Sys_var_gtid_pos::global_update(THD *thd, set_var *var) +{ + bool err; + + DBUG_ASSERT(var->type == OPT_GLOBAL); + + if (!var->value) + { + my_error(ER_NO_DEFAULT, MYF(0), var->var->name.str); + return true; + } + + mysql_mutex_unlock(&LOCK_global_system_variables); + mysql_mutex_lock(&LOCK_active_mi); + if (master_info_index->give_error_if_slave_running()) + err= true; + else + err= rpl_gtid_pos_update(thd, var->save_result.string_value.str, + var->save_result.string_value.length); + mysql_mutex_unlock(&LOCK_active_mi); + mysql_mutex_lock(&LOCK_global_system_variables); + return err; +} + + +uchar * +Sys_var_gtid_pos::global_value_ptr(THD *thd, LEX_STRING *base) +{ + String str; + char *p; + + str.length(0); + if (rpl_append_gtid_state(&str, true) || + !(p= thd->strmake(str.ptr(), str.length()))) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return NULL; + } + + return (uchar *)p; +} + + +static unsigned char opt_gtid_pos_dummy; +static Sys_var_gtid_pos Sys_gtid_pos( + "gtid_pos", + "The list of global transaction IDs that were last replicated on the " + "server, one for each replication domain. This defines where a slave " + "starts replicating from on a master when connecting with global " + "transaction ID.", + GLOBAL_VAR(opt_gtid_pos_dummy), NO_CMD_LINE); +#endif + + static bool fix_max_join_size(sys_var *self, THD *thd, enum_var_type type) { SV *sv= type == OPT_GLOBAL ? &global_system_variables : &thd->variables; @@ -1985,17 +2094,27 @@ static Sys_var_charptr Sys_secure_file_priv( static bool fix_server_id(sys_var *self, THD *thd, enum_var_type type) { - server_id_supplied = 1; - thd->server_id= server_id; + if (type == OPT_GLOBAL) + { + server_id_supplied = 1; + thd->variables.server_id= global_system_variables.server_id; + /* + Historically, server_id was a global variable that is exported to + plugins. Now it is a session variable, and lives in the + global_system_variables struct, but we still need to export the + value for reading to plugins for backwards compatibility reasons. + */ + ::server_id= global_system_variables.server_id; + } return false; } static Sys_var_ulong Sys_server_id( "server_id", "Uniquely identifies the server instance in the community of " "replication partners", - GLOBAL_VAR(server_id), CMD_LINE(REQUIRED_ARG, OPT_SERVER_ID), + SESSION_VAR(server_id), CMD_LINE(REQUIRED_ARG, OPT_SERVER_ID), VALID_RANGE(0, UINT_MAX32), DEFAULT(0), BLOCK_SIZE(1), NO_MUTEX_GUARD, - NOT_IN_BINLOG, ON_CHECK(0), ON_UPDATE(fix_server_id)); + NOT_IN_BINLOG, ON_CHECK(check_has_super), ON_UPDATE(fix_server_id)); static Sys_var_mybool Sys_slave_compressed_protocol( "slave_compressed_protocol", @@ -3386,6 +3505,7 @@ get_master_info_uint_value(THD *thd, ptrdiff_t offset) { Master_info *mi; uint res= 0; // Default value + mysql_mutex_unlock(&LOCK_global_system_variables); mysql_mutex_lock(&LOCK_active_mi); mi= master_info_index-> get_master_info(&thd->variables.default_master_connection, @@ -3397,6 +3517,7 @@ get_master_info_uint_value(THD *thd, ptrdiff_t offset) mysql_mutex_unlock(&mi->rli.data_lock); } mysql_mutex_unlock(&LOCK_active_mi); + mysql_mutex_lock(&LOCK_global_system_variables); return res; } @@ -3408,6 +3529,8 @@ bool update_multi_source_variable(sys_var *self_var, THD *thd, bool result= true; Master_info *mi; + if (type == OPT_GLOBAL) + mysql_mutex_unlock(&LOCK_global_system_variables); mysql_mutex_lock(&LOCK_active_mi); mi= master_info_index-> get_master_info(&thd->variables.default_master_connection, @@ -3421,6 +3544,8 @@ bool update_multi_source_variable(sys_var *self_var, THD *thd, mysql_mutex_unlock(&mi->rli.run_lock); } mysql_mutex_unlock(&LOCK_active_mi); + if (type == OPT_GLOBAL) + mysql_mutex_lock(&LOCK_global_system_variables); return result; } diff --git a/sql/sys_vars.h b/sql/sys_vars.h index 31764aa82a4..b04e3817406 100644 --- a/sql/sys_vars.h +++ b/sql/sys_vars.h @@ -2020,3 +2020,43 @@ public: } }; + +/** + Class for @@global.gtid_pos. +*/ +class Sys_var_gtid_pos: public sys_var +{ +public: + Sys_var_gtid_pos(const char *name_arg, + const char *comment, int flag_args, ptrdiff_t off, size_t size, + CMD_LINE getopt) + : sys_var(&all_sys_vars, name_arg, comment, flag_args, off, getopt.id, + getopt.arg_type, SHOW_CHAR, 0, NULL, VARIABLE_NOT_IN_BINLOG, + NULL, NULL, NULL) + { + option.var_type= GET_STR; + } + bool do_check(THD *thd, set_var *var); + bool session_update(THD *thd, set_var *var) + { + DBUG_ASSERT(false); + return true; + } + bool global_update(THD *thd, set_var *var); + bool check_update_type(Item_result type) { return type != STRING_RESULT; } + void session_save_default(THD *thd, set_var *var) + { + DBUG_ASSERT(false); + } + void global_save_default(THD *thd, set_var *var) + { + /* Record the attempt to use default so we can error. */ + var->value= 0; + } + uchar *session_value_ptr(THD *thd, LEX_STRING *base) + { + DBUG_ASSERT(false); + return NULL; + } + uchar *global_value_ptr(THD *thd, LEX_STRING *base); +}; diff --git a/sql/table.cc b/sql/table.cc index ce343ef58db..e88b3453ce9 100644 --- a/sql/table.cc +++ b/sql/table.cc @@ -3460,9 +3460,9 @@ bool check_column_name(const char *name) } #else last_char_is_space= *name==' '; -#endif - if (*name == NAMES_SEP_CHAR) + if (*name == '\377') return 1; +#endif name++; name_length++; } @@ -3620,6 +3620,46 @@ Table_check_intact::check(TABLE *table, const TABLE_FIELD_DEF *table_def) } } + if (table_def->primary_key_parts) + { + if (table->s->primary_key == MAX_KEY) + { + report_error(0, "Incorrect definition of table %s.%s: " + "missing primary key.", table->s->db.str, + table->alias.c_ptr()); + error= TRUE; + } + else + { + KEY *pk= &table->s->key_info[table->s->primary_key]; + if (pk->key_parts != table_def->primary_key_parts) + { + report_error(0, "Incorrect definition of table %s.%s: " + "Expected primary key to have %u columns, but instead " + "found %u columns.", table->s->db.str, + table->alias.c_ptr(), table_def->primary_key_parts, + pk->key_parts); + error= TRUE; + } + else + { + for (i= 0; i < pk->key_parts; ++i) + { + if (table_def->primary_key_columns[i] + 1 != pk->key_part[i].fieldnr) + { + report_error(0, "Incorrect definition of table %s.%s: Expected " + "primary key part %u to refer to column %u, but " + "instead found column %u.", table->s->db.str, + table->alias.c_ptr(), i + 1, + table_def->primary_key_columns[i] + 1, + pk->key_part[i].fieldnr); + error= TRUE; + } + } + } + } + } + if (! error) table->s->table_field_def_cache= table_def; diff --git a/sql/table.h b/sql/table.h index eae0cbc5596..e721d60f892 100644 --- a/sql/table.h +++ b/sql/table.h @@ -493,6 +493,8 @@ typedef struct st_table_field_def { uint count; const TABLE_FIELD_TYPE *field; + uint primary_key_parts; + const uint *primary_key_columns; } TABLE_FIELD_DEF; |