diff options
author | unknown <knielsen@knielsen-hq.org> | 2013-05-28 13:28:31 +0200 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2013-05-28 13:28:31 +0200 |
commit | ee2b7db3f88f6882022a8aa71b30043ed8b40792 (patch) | |
tree | ce63742ca2896e0bd73a702efeb9eb40ed784f04 /sql | |
parent | f5319394e3dee146f112b41674ffa188d5e89150 (diff) | |
download | mariadb-git-ee2b7db3f88f6882022a8aa71b30043ed8b40792.tar.gz |
MDEV-4478: Implement GTID "strict mode"
When @@GLOBAL.gtid_strict_mode=1, certain operations that would
otherwise result in out-of-order binlog files between servers
instead result in an error.
GTID sequence numbers are now allocated independently per domain;
this results in fewer/no holes in GTID sequences, increasing the
likelihood that diverging binlogs will be caught by the slave when
GTID strict mode is enabled.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/log.cc | 99 | ||||
-rw-r--r-- | sql/log.h | 4 | ||||
-rw-r--r-- | sql/log_event.cc | 9 | ||||
-rw-r--r-- | sql/mysqld.cc | 13 | ||||
-rw-r--r-- | sql/mysqld.h | 5 | ||||
-rw-r--r-- | sql/rpl_gtid.cc | 238 | ||||
-rw-r--r-- | sql/rpl_gtid.h | 13 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 12 | ||||
-rw-r--r-- | sql/share/errmsg-utf8.txt | 4 | ||||
-rw-r--r-- | sql/slave.cc | 59 | ||||
-rw-r--r-- | sql/sql_repl.cc | 116 | ||||
-rw-r--r-- | sql/sys_vars.cc | 31 |
12 files changed, 452 insertions, 151 deletions
diff --git a/sql/log.cc b/sql/log.cc index 238452070ff..c3ea089af69 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -3841,9 +3841,6 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log) if (!is_relay_log) { rpl_global_gtid_binlog_state.reset(); - mysql_mutex_lock(&LOCK_gtid_counter); - global_gtid_counter= 0; - mysql_mutex_unlock(&LOCK_gtid_counter); } /* Start logging with a new file */ @@ -5367,9 +5364,11 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, bool is_transactional) { rpl_gtid gtid; - uint64 seq_no; + uint32 domain_id= thd->variables.gtid_domain_id; + uint32 server_id= thd->variables.server_id; + uint64 seq_no= thd->variables.gtid_seq_no; + int err; - seq_no= thd->variables.gtid_seq_no; /* Reset the session variable gtid_seq_no, to reduce the risk of accidentally producing a duplicate GTID. @@ -5377,34 +5376,36 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, thd->variables.gtid_seq_no= 0; if (seq_no != 0) { - /* - If we see a higher sequence number, use that one as the basis of any - later generated sequence numbers. - */ - bump_seq_no_counter_if_needed(seq_no); + /* Use the specified sequence number. */ + gtid.domain_id= domain_id; + gtid.server_id= server_id; + gtid.seq_no= seq_no; + mysql_mutex_lock(&LOCK_rpl_gtid_state); + err= rpl_global_gtid_binlog_state.update(>id, opt_gtid_strict_mode); + mysql_mutex_unlock(&LOCK_rpl_gtid_state); + if (err && thd->stmt_da->sql_errno()==ER_GTID_STRICT_OUT_OF_ORDER) + errno= ER_GTID_STRICT_OUT_OF_ORDER; } else { - mysql_mutex_lock(&LOCK_gtid_counter); - seq_no= ++global_gtid_counter; - mysql_mutex_unlock(&LOCK_gtid_counter); + /* Allocate the next sequence number for the GTID. 
*/ + mysql_mutex_lock(&LOCK_rpl_gtid_state); + err= rpl_global_gtid_binlog_state.update_with_next_gtid(domain_id, + server_id, >id); + mysql_mutex_unlock(&LOCK_rpl_gtid_state); + seq_no= gtid.seq_no; } - gtid.seq_no= seq_no; - gtid.domain_id= thd->variables.gtid_domain_id; + if (err) + return true; - Gtid_log_event gtid_event(thd, gtid.seq_no, gtid.domain_id, standalone, + Gtid_log_event gtid_event(thd, seq_no, domain_id, standalone, LOG_EVENT_SUPPRESS_USE_F, is_transactional); - gtid.server_id= gtid_event.server_id; /* Write the event to the binary log. */ if (gtid_event.write(&mysql_bin_log.log_file)) return true; status_var_add(thd->status_var.binlog_bytes_written, gtid_event.data_written); - /* Update the replication state (last GTID in each replication domain). */ - mysql_mutex_lock(&LOCK_rpl_gtid_state); - rpl_global_gtid_binlog_state.update(>id); - mysql_mutex_unlock(&LOCK_rpl_gtid_state); return false; } @@ -5505,9 +5506,6 @@ end: end_io_cache(&cache); if (opened) mysql_file_close(file_no, MYF(0)); - /* Pick the next unused seq_no from the loaded binlog state. 
*/ - bump_seq_no_counter_if_needed( - rpl_global_gtid_binlog_state.seq_no_from_state()); return err; } @@ -5549,33 +5547,44 @@ bool MYSQL_BIN_LOG::lookup_domain_in_binlog_state(uint32 domain_id, rpl_gtid *out_gtid) { - rpl_binlog_state::element *elem; - bool res; + rpl_gtid *found_gtid; + bool res= false; mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state); - elem= (rpl_binlog_state::element *) - my_hash_search(&rpl_global_gtid_binlog_state.hash, - (const uchar *)&domain_id, 0); - if (elem) + if ((found_gtid= rpl_global_gtid_binlog_state.find_most_recent(domain_id))) { + *out_gtid= *found_gtid; res= true; - *out_gtid= *elem->last_gtid; } - else - res= false; mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state); return res; } -void -MYSQL_BIN_LOG::bump_seq_no_counter_if_needed(uint64 seq_no) +int +MYSQL_BIN_LOG::bump_seq_no_counter_if_needed(uint32 domain_id, uint64 seq_no) +{ + int err; + + mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state); + err= rpl_global_gtid_binlog_state.bump_seq_no_if_needed(domain_id, seq_no); + mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state); + return err; +} + + +bool +MYSQL_BIN_LOG::check_strict_gtid_sequence(uint32 domain_id, uint32 server_id, + uint64 seq_no) { - mysql_mutex_lock(&LOCK_gtid_counter); - if (global_gtid_counter < seq_no) - global_gtid_counter= seq_no; - mysql_mutex_unlock(&LOCK_gtid_counter); + bool err; + + mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state); + err= rpl_global_gtid_binlog_state.check_strict_sequence(domain_id, server_id, + seq_no); + mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state); + return err; } @@ -5648,7 +5657,8 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) my_org_b_tell= my_b_tell(file); mysql_mutex_lock(&LOCK_log); prev_binlog_id= current_binlog_id; - write_gtid_event(thd, true, using_trans); + if (write_gtid_event(thd, true, using_trans)) + goto err; } else { @@ -6717,8 
+6727,8 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) */ DBUG_ASSERT(!cache_mngr->stmt_cache.empty() || !cache_mngr->trx_cache.empty()); - current->error= write_transaction_or_stmt(current); - + if ((current->error= write_transaction_or_stmt(current))) + current->commit_errno= errno; strmake(cache_mngr->last_commit_pos_file, log_file_name, sizeof(cache_mngr->last_commit_pos_file)-1); commit_offset= my_b_write_tell(&log_file); @@ -8264,9 +8274,6 @@ int TC_LOG_BINLOG::open(const char *opt_name) error= recover(&log_info, log_name, &log, (Format_description_log_event *)ev); state_read= true; - /* Pick the next unused seq_no from the recovered binlog state. */ - bump_seq_no_counter_if_needed( - rpl_global_gtid_binlog_state.seq_no_from_state()); } else error= read_state_from_file(); @@ -8748,7 +8755,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, gtid.domain_id= gev->domain_id; gtid.server_id= gev->server_id; gtid.seq_no= gev->seq_no; - if (rpl_global_gtid_binlog_state.update(>id)) + if (rpl_global_gtid_binlog_state.update(>id, false)) goto err2; } break; diff --git a/sql/log.h b/sql/log.h index f41b48eba7c..018ac64eff7 100644 --- a/sql/log.h +++ b/sql/log.h @@ -783,7 +783,9 @@ public: bool find_in_binlog_state(uint32 domain_id, uint32 server_id, rpl_gtid *out_gtid); bool lookup_domain_in_binlog_state(uint32 domain_id, rpl_gtid *out_gtid); - void bump_seq_no_counter_if_needed(uint64 seq_no); + int bump_seq_no_counter_if_needed(uint32 domain_id, uint64 seq_no); + bool check_strict_gtid_sequence(uint32 domain_id, uint32 server_id, + uint64 seq_no); }; class Log_event_handler diff --git a/sql/log_event.cc b/sql/log_event.cc index f4e34dd9224..0a21ae6d279 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -6214,6 +6214,15 @@ Gtid_log_event::do_apply_event(Relay_log_info const *rli) thd->variables.gtid_domain_id= this->domain_id; thd->variables.gtid_seq_no= this->seq_no; + if (opt_gtid_strict_mode && opt_bin_log && 
opt_log_slave_updates) + { + /* Need to reset prior "ok" status to give an error. */ + thd->clear_error(); + thd->stmt_da->reset_diagnostics_area(); + if (mysql_bin_log.check_strict_gtid_sequence(this->domain_id, + this->server_id, this->seq_no)) + return 1; + } if (flags2 & FL_STANDALONE) return 0; diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 963f259f486..1102a527546 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -677,7 +677,7 @@ mysql_mutex_t mysql_mutex_t LOCK_stats, LOCK_global_user_client_stats, LOCK_global_table_stats, LOCK_global_index_stats; -mysql_mutex_t LOCK_gtid_counter, LOCK_rpl_gtid_state; +mysql_mutex_t LOCK_rpl_gtid_state; /** The below lock protects access to two global server variables: @@ -776,7 +776,7 @@ PSI_mutex_key key_LOCK_stats, key_LOCK_global_index_stats, key_LOCK_wakeup_ready; -PSI_mutex_key key_LOCK_gtid_counter, key_LOCK_rpl_gtid_state; +PSI_mutex_key key_LOCK_rpl_gtid_state; PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered; @@ -821,7 +821,6 @@ static PSI_mutex_info all_server_mutexes[]= { &key_LOCK_global_table_stats, "LOCK_global_table_stats", PSI_FLAG_GLOBAL}, { &key_LOCK_global_index_stats, "LOCK_global_index_stats", PSI_FLAG_GLOBAL}, { &key_LOCK_wakeup_ready, "THD::LOCK_wakeup_ready", 0}, - { &key_LOCK_gtid_counter, "LOCK_gtid_counter", PSI_FLAG_GLOBAL}, { &key_LOCK_rpl_gtid_state, "LOCK_rpl_gtid_state", PSI_FLAG_GLOBAL}, { &key_LOCK_thd_data, "THD::LOCK_thd_data", 0}, { &key_LOCK_user_conn, "LOCK_user_conn", PSI_FLAG_GLOBAL}, @@ -1294,10 +1293,7 @@ struct st_VioSSLFd *ssl_acceptor_fd; */ uint connection_count= 0, extra_connection_count= 0; -/** - Running counter for generating new GTIDs locally. 
-*/ -uint64 global_gtid_counter= 0; +my_bool opt_gtid_strict_mode= FALSE; /* Function declarations */ @@ -1977,7 +1973,6 @@ static void clean_up_mutexes() mysql_mutex_destroy(&LOCK_global_user_client_stats); mysql_mutex_destroy(&LOCK_global_table_stats); mysql_mutex_destroy(&LOCK_global_index_stats); - mysql_mutex_destroy(&LOCK_gtid_counter); mysql_mutex_destroy(&LOCK_rpl_gtid_state); #ifdef HAVE_OPENSSL mysql_mutex_destroy(&LOCK_des_key_file); @@ -4089,8 +4084,6 @@ static int init_thread_environment() &LOCK_global_table_stats, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_global_index_stats, &LOCK_global_index_stats, MY_MUTEX_INIT_FAST); - mysql_mutex_init(key_LOCK_gtid_counter, - &LOCK_gtid_counter, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_rpl_gtid_state, &LOCK_rpl_gtid_state, MY_MUTEX_INIT_SLOW); mysql_mutex_init(key_LOCK_prepare_ordered, &LOCK_prepare_ordered, diff --git a/sql/mysqld.h b/sql/mysqld.h index c4b62c84603..9c9bf71fec8 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -253,7 +253,7 @@ extern PSI_mutex_key key_LOCK_stats, key_LOCK_global_user_client_stats, key_LOCK_global_table_stats, key_LOCK_global_index_stats, key_LOCK_wakeup_ready; -extern PSI_mutex_key key_LOCK_gtid_counter, key_LOCK_rpl_gtid_state; +extern PSI_mutex_key key_LOCK_rpl_gtid_state; extern PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger, key_rwlock_LOCK_sys_init_connect, key_rwlock_LOCK_sys_init_slave, @@ -345,7 +345,7 @@ extern mysql_mutex_t LOCK_slave_list, LOCK_active_mi, LOCK_manager, LOCK_global_system_variables, LOCK_user_conn, LOCK_prepared_stmt_count, LOCK_error_messages, LOCK_connection_count; -extern mysql_mutex_t LOCK_gtid_counter, LOCK_rpl_gtid_state; +extern mysql_mutex_t LOCK_rpl_gtid_state; extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_thread_count; #ifdef HAVE_OPENSSL extern mysql_mutex_t LOCK_des_key_file; @@ -541,6 +541,7 @@ extern handlerton *maria_hton; extern uint extra_connection_count; extern uint64 global_gtid_counter; +extern my_bool 
opt_gtid_strict_mode; extern my_bool opt_userstat_running, debug_assert_if_crashed_table; extern uint mysqld_extra_port; extern ulong opt_progress_report_time; diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index 01502c5b0f1..b34b890060b 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -370,7 +370,10 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, } table->file->ha_index_end(); - mysql_bin_log.bump_seq_no_counter_if_needed(gtid->seq_no); + if(!err && opt_bin_log && + (err= mysql_bin_log.bump_seq_no_counter_if_needed(gtid->domain_id, + gtid->seq_no))) + my_error(ER_OUT_OF_RESOURCES, MYF(0)); end: @@ -719,7 +722,7 @@ rpl_binlog_state::load(struct rpl_gtid *list, uint32 count) reset(); for (i= 0; i < count; ++i) { - if (update(&(list[i]))) + if (update(&(list[i]), false)) return true; } return false; @@ -741,48 +744,111 @@ rpl_binlog_state::~rpl_binlog_state() Returns 0 for ok, 1 for error. */ int -rpl_binlog_state::update(const struct rpl_gtid *gtid) +rpl_binlog_state::update(const struct rpl_gtid *gtid, bool strict) { - rpl_gtid *lookup_gtid; element *elem; - elem= (element *)my_hash_search(&hash, (const uchar *)(>id->domain_id), 0); - if (elem) + if ((elem= (element *)my_hash_search(&hash, + (const uchar *)(>id->domain_id), 0))) { - /* - By far the most common case is that successive events within same - replication domain have the same server id (it changes only when - switching to a new master). So save a hash lookup in this case. 
- */ - if (likely(elem->last_gtid->server_id == gtid->server_id)) + if (strict && elem->last_gtid && elem->last_gtid->seq_no >= gtid->seq_no) { - elem->last_gtid->seq_no= gtid->seq_no; - return 0; + my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), gtid->domain_id, + gtid->server_id, gtid->seq_no, elem->last_gtid->domain_id, + elem->last_gtid->server_id, elem->last_gtid->seq_no); + return 1; } + if (elem->seq_no_counter < gtid->seq_no) + elem->seq_no_counter= gtid->seq_no; + if (!elem->update_element(gtid)) + return 0; + } + else if (!alloc_element(gtid)) + return 0; - lookup_gtid= (rpl_gtid *) - my_hash_search(&elem->hash, (const uchar *)>id->server_id, 0); - if (lookup_gtid) - { - lookup_gtid->seq_no= gtid->seq_no; - elem->last_gtid= lookup_gtid; + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return 1; +} + + +/* + Fill in a new GTID, allocating next sequence number, and update state + accordingly. +*/ +int +rpl_binlog_state::update_with_next_gtid(uint32 domain_id, uint32 server_id, + rpl_gtid *gtid) +{ + element *elem; + + gtid->domain_id= domain_id; + gtid->server_id= server_id; + + if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0))) + { + gtid->seq_no= ++elem->seq_no_counter; + if (!elem->update_element(gtid)) return 0; - } + } + else + { + gtid->seq_no= 1; + if (!alloc_element(gtid)) + return 0; + } - /* Allocate a new GTID and insert it. */ - lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME)); - if (!lookup_gtid) - return 1; - memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid)); - if (my_hash_insert(&elem->hash, (const uchar *)lookup_gtid)) - { - my_free(lookup_gtid); - return 1; - } - elem->last_gtid= lookup_gtid; + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return 1; +} + + +/* Helper functions for update. 
*/ +int +rpl_binlog_state::element::update_element(const rpl_gtid *gtid) +{ + rpl_gtid *lookup_gtid; + + /* + By far the most common case is that successive events within same + replication domain have the same server id (it changes only when + switching to a new master). So save a hash lookup in this case. + */ + if (likely(last_gtid && last_gtid->server_id == gtid->server_id)) + { + last_gtid->seq_no= gtid->seq_no; + return 0; + } + + lookup_gtid= (rpl_gtid *) + my_hash_search(&hash, (const uchar *)>id->server_id, 0); + if (lookup_gtid) + { + lookup_gtid->seq_no= gtid->seq_no; + last_gtid= lookup_gtid; return 0; } + /* Allocate a new GTID and insert it. */ + lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME)); + if (!lookup_gtid) + return 1; + memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid)); + if (my_hash_insert(&hash, (const uchar *)lookup_gtid)) + { + my_free(lookup_gtid); + return 1; + } + last_gtid= lookup_gtid; + return 0; +} + + +int +rpl_binlog_state::alloc_element(const rpl_gtid *gtid) +{ + element *elem; + rpl_gtid *lookup_gtid; + /* First time we see this domain_id; allocate a new element. */ elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME)); lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME)); @@ -793,6 +859,7 @@ rpl_binlog_state::update(const struct rpl_gtid *gtid) offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free, HASH_UNIQUE); elem->last_gtid= lookup_gtid; + elem->seq_no_counter= gtid->seq_no; memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid)); if (0 == my_hash_insert(&elem->hash, (const uchar *)lookup_gtid)) { @@ -812,23 +879,64 @@ rpl_binlog_state::update(const struct rpl_gtid *gtid) } -uint64 -rpl_binlog_state::seq_no_from_state() +/* + Check that a new GTID can be logged without creating an out-of-order + sequence number with existing GTIDs. 
+*/ +bool +rpl_binlog_state::check_strict_sequence(uint32 domain_id, uint32 server_id, + uint64 seq_no) { - ulong i, j; - uint64 seq_no= 0; + element *elem; - for (i= 0; i < hash.records; ++i) + if ((elem= (element *)my_hash_search(&hash, + (const uchar *)(&domain_id), 0)) && + elem->last_gtid && elem->last_gtid->seq_no >= seq_no) { - element *e= (element *)my_hash_element(&hash, i); - for (j= 0; j < e->hash.records; ++j) - { - const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&e->hash, j); - if (gtid->seq_no > seq_no) - seq_no= gtid->seq_no; - } + my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), domain_id, server_id, seq_no, + elem->last_gtid->domain_id, elem->last_gtid->server_id, + elem->last_gtid->seq_no); + return 1; } - return seq_no; + return 0; +} + + +/* + When we see a new GTID that will not be binlogged (eg. slave thread + with --log-slave-updates=0), then we need to remember to allocate any + GTID seq_no of our own within that domain starting from there. + + Returns 0 if ok, non-zero if out-of-memory. +*/ +int +rpl_binlog_state::bump_seq_no_if_needed(uint32 domain_id, uint64 seq_no) +{ + element *elem; + + if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0))) + { + if (elem->seq_no_counter < seq_no) + elem->seq_no_counter= seq_no; + return 0; + } + + /* We need to allocate a new, empty element to remember the next seq_no. 
*/ + if (!(elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME)))) + return 1; + + elem->domain_id= domain_id; + my_hash_init(&elem->hash, &my_charset_bin, 32, + offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free, + HASH_UNIQUE); + elem->last_gtid= NULL; + elem->seq_no_counter= seq_no; + if (0 == my_hash_insert(&hash, (const uchar *)elem)) + return 0; + + my_hash_free(&elem->hash); + my_free(elem); + return 1; } @@ -849,6 +957,11 @@ rpl_binlog_state::write_to_iocache(IO_CACHE *dest) { size_t res; element *e= (element *)my_hash_element(&hash, i); + if (!e->last_gtid) + { + DBUG_ASSERT(e->hash.records == 0); + continue; + } for (j= 0; j <= e->hash.records; ++j) { const rpl_gtid *gtid; @@ -890,7 +1003,7 @@ rpl_binlog_state::read_from_iocache(IO_CACHE *src) end= buf + res; if (gtid_parser_helper(&p, end, >id)) return 1; - if (update(>id)) + if (update(>id, false)) return 1; } return 0; @@ -906,6 +1019,17 @@ rpl_binlog_state::find(uint32 domain_id, uint32 server_id) return (rpl_gtid *)my_hash_search(&elem->hash, (const uchar *)&server_id, 0); } +rpl_gtid * +rpl_binlog_state::find_most_recent(uint32 domain_id) +{ + element *elem; + + elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0); + if (elem && elem->last_gtid) + return elem->last_gtid; + return NULL; +} + uint32 rpl_binlog_state::count() @@ -929,6 +1053,11 @@ rpl_binlog_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size) for (i= 0; i < hash.records; ++i) { element *e= (element *)my_hash_element(&hash, i); + if (!e->last_gtid) + { + DBUG_ASSERT(e->hash.records==0); + continue; + } for (j= 0; j <= e->hash.records; ++j) { const rpl_gtid *gtid; @@ -965,16 +1094,22 @@ int rpl_binlog_state::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size) { uint32 i; + uint32 alloc_size, out_size; - *size= hash.records; - if (!(*list= (rpl_gtid *)my_malloc(*size * sizeof(rpl_gtid), MYF(MY_WME)))) + alloc_size= hash.records; + if (!(*list= (rpl_gtid *)my_malloc(alloc_size * sizeof(rpl_gtid), + 
MYF(MY_WME)))) return 1; - for (i= 0; i < *size; ++i) + out_size= 0; + for (i= 0; i < alloc_size; ++i) { element *e= (element *)my_hash_element(&hash, i); - memcpy(&((*list)[i]), e->last_gtid, sizeof(rpl_gtid)); + if (!e->last_gtid) + continue; + memcpy(&((*list)[out_size++]), e->last_gtid, sizeof(rpl_gtid)); } + *size= out_size; return 0; } @@ -988,7 +1123,8 @@ rpl_binlog_state::append_pos(String *str) for (i= 0; i < hash.records; ++i) { element *e= (element *)my_hash_element(&hash, i); - if (rpl_slave_state_tostring_helper(str, e->last_gtid, &first)) + if (e->last_gtid && + rpl_slave_state_tostring_helper(str, e->last_gtid, &first)) return true; } diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h index cc3c99f40b7..fefce684c2c 100644 --- a/sql/rpl_gtid.h +++ b/sql/rpl_gtid.h @@ -131,6 +131,10 @@ struct rpl_binlog_state HASH hash; /* Containing all server_id for one domain_id */ /* The most recent entry in the hash. */ rpl_gtid *last_gtid; + /* Counter to allocate next seq_no for this domain. */ + uint64 seq_no_counter; + + int update_element(const rpl_gtid *gtid); }; /* Mapping from domain_id to collection of elements. 
*/ HASH hash; @@ -144,8 +148,12 @@ struct rpl_binlog_state void reset(); void free(); bool load(struct rpl_gtid *list, uint32 count); - int update(const struct rpl_gtid *gtid); - uint64 seq_no_from_state(); + int update(const struct rpl_gtid *gtid, bool strict); + int update_with_next_gtid(uint32 domain_id, uint32 server_id, + rpl_gtid *gtid); + int alloc_element(const rpl_gtid *gtid); + bool check_strict_sequence(uint32 domain_id, uint32 server_id, uint64 seq_no); + int bump_seq_no_if_needed(uint32 domain_id, uint64 seq_no); int write_to_iocache(IO_CACHE *dest); int read_from_iocache(IO_CACHE *src); uint32 count(); @@ -153,6 +161,7 @@ struct rpl_binlog_state int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size); bool append_pos(String *str); rpl_gtid *find(uint32 domain_id, uint32 server_id); + rpl_gtid *find_most_recent(uint32 domain_id); }; diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index d27a80313ac..03ec77e1433 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -1397,7 +1397,6 @@ rpl_load_gtid_slave_state(THD *thd) HASH hash; int err= 0; uint32 i; - uint64 highest_seq_no= 0; DBUG_ENTER("rpl_load_gtid_slave_state"); rpl_global_gtid_slave_state.lock(); @@ -1450,8 +1449,6 @@ rpl_load_gtid_slave_state(THD *thd) DBUG_PRINT("info", ("Read slave state row: %u-%u-%lu sub_id=%lu\n", (unsigned)domain_id, (unsigned)server_id, (ulong)seq_no, (ulong)sub_id)); - if (seq_no > highest_seq_no) - highest_seq_no= seq_no; if ((rec= my_hash_search(&hash, (const uchar *)&domain_id, 0))) { @@ -1495,6 +1492,14 @@ rpl_load_gtid_slave_state(THD *thd) rpl_global_gtid_slave_state.unlock(); goto end; } + if (opt_bin_log && + mysql_bin_log.bump_seq_no_counter_if_needed(entry->gtid.domain_id, + entry->gtid.seq_no)) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + rpl_global_gtid_slave_state.unlock(); + goto end; + } } rpl_global_gtid_slave_state.loaded= true; rpl_global_gtid_slave_state.unlock(); @@ -1514,7 +1519,6 @@ end: thd->mdl_context.release_transactional_locks(); } 
my_hash_free(&hash); - mysql_bin_log.bump_seq_no_counter_if_needed(highest_seq_no); DBUG_RETURN(err); } diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt index 539e90d7cb5..f08ba26fa4d 100644 --- a/sql/share/errmsg-utf8.txt +++ b/sql/share/errmsg-utf8.txt @@ -6545,3 +6545,7 @@ ER_MASTER_GTID_POS_MISSING_DOMAIN eng "Specified value for @@gtid_slave_pos contains no value for replication domain %u. This conflicts with the binary log which contains GTID %u-%u-%llu. If MASTER_GTID_POS=CURRENT_POS is used, the binlog position will override the new value of @@gtid_slave_pos." ER_UNTIL_REQUIRES_USING_GTID eng "START SLAVE UNTIL master_gtid_pos requires that slave is using GTID" +ER_GTID_STRICT_OUT_OF_ORDER + eng "An attempt was made to binlog GTID %u-%u-%llu which would create an out-of-order sequence number with existing GTID %u-%u-%llu, and gtid strict mode is enabled." +ER_GTID_START_FROM_BINLOG_HOLE + eng "The binlog on the master is missing the GTID %u-%u-%llu requested by the slave (even though both a prior and a subsequent sequence number does exist), and GTID strict mode is enabled" diff --git a/sql/slave.cc b/sql/slave.cc index bf7e81edf71..10411f9a5a5 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -1824,8 +1824,8 @@ after_set_capability: { int rc; char str_buf[256]; - String connect_state(str_buf, sizeof(str_buf), system_charset_info); - connect_state.length(0); + String query_str(str_buf, sizeof(str_buf), system_charset_info); + query_str.length(0); /* Read the master @@GLOBAL.gtid_domain_id variable. 
@@ -1848,9 +1848,9 @@ after_set_capability: mysql_free_result(master_res); master_res= NULL; - connect_state.append(STRING_WITH_LEN("SET @slave_connect_state='"), - system_charset_info); - if (rpl_append_gtid_state(&connect_state, + query_str.append(STRING_WITH_LEN("SET @slave_connect_state='"), + system_charset_info); + if (rpl_append_gtid_state(&query_str, mi->using_gtid == Master_info::USE_GTID_CURRENT_POS)) { @@ -1860,9 +1860,9 @@ after_set_capability: sprintf(err_buff, "%s Error: Out of memory", errmsg); goto err; } - connect_state.append(STRING_WITH_LEN("'"), system_charset_info); + query_str.append(STRING_WITH_LEN("'"), system_charset_info); - rc= mysql_real_query(mysql, connect_state.ptr(), connect_state.length()); + rc= mysql_real_query(mysql, query_str.ptr(), query_str.length()); if (rc) { err_code= mysql_errno(mysql); @@ -1883,12 +1883,45 @@ after_set_capability: } } + query_str.length(0); + if (query_str.append(STRING_WITH_LEN("SET @slave_gtid_strict_mode="), + system_charset_info) || + query_str.append_ulonglong(opt_gtid_strict_mode != false)) + { + err_code= ER_OUTOFMEMORY; + errmsg= "The slave I/O thread stops because a fatal out-of-memory " + "error is encountered when it tries to set @slave_gtid_strict_mode."; + sprintf(err_buff, "%s Error: Out of memory", errmsg); + goto err; + } + + rc= mysql_real_query(mysql, query_str.ptr(), query_str.length()); + if (rc) + { + err_code= mysql_errno(mysql); + if (is_network_error(err_code)) + { + mi->report(ERROR_LEVEL, err_code, + "Setting @slave_gtid_strict_mode failed with error: %s", + mysql_error(mysql)); + goto network_err; + } + else + { + /* Fatal error */ + errmsg= "The slave I/O thread stops because a fatal error is " + "encountered when it tries to set @slave_gtid_strict_mode."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + } + if (mi->rli.until_condition == Relay_log_info::UNTIL_GTID) { - connect_state.length(0); - connect_state.append(STRING_WITH_LEN("SET 
@slave_until_gtid='"), - system_charset_info); - if (mi->rli.until_gtid_pos.append_to_string(&connect_state)) + query_str.length(0); + query_str.append(STRING_WITH_LEN("SET @slave_until_gtid='"), + system_charset_info); + if (mi->rli.until_gtid_pos.append_to_string(&query_str)) { err_code= ER_OUTOFMEMORY; errmsg= "The slave I/O thread stops because a fatal out-of-memory " @@ -1896,9 +1929,9 @@ after_set_capability: sprintf(err_buff, "%s Error: Out of memory", errmsg); goto err; } - connect_state.append(STRING_WITH_LEN("'"), system_charset_info); + query_str.append(STRING_WITH_LEN("'"), system_charset_info); - rc= mysql_real_query(mysql, connect_state.ptr(), connect_state.length()); + rc= mysql_real_query(mysql, query_str.ptr(), query_str.length()); if (rc) { err_code= mysql_errno(mysql); diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 82f471d5ddf..5ed73bf81f3 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -614,6 +614,19 @@ get_slave_connect_state(THD *thd, String *out_str) } +static bool +get_slave_gtid_strict_mode(THD *thd) +{ + bool null_value; + + const LEX_STRING name= { C_STRING_WITH_LEN("slave_gtid_strict_mode") }; + user_var_entry *entry= + (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str, + name.length); + return entry && entry->val_int(&null_value) && !null_value; +} + + /* Get the value of the @slave_until_gtid user variable into the supplied String (this is the GTID position specified for START SLAVE UNTIL @@ -876,8 +889,6 @@ contains_all_slave_gtid(slave_connection_state *st, Gtid_list_log_event *glev) Give an error if the slave requests something that we do not have in our binlog. 
- - T */ static int @@ -1465,13 +1476,13 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, enum_gtid_skip_type *gtid_skip_group, slave_connection_state *until_gtid_state, enum_gtid_until_state *gtid_until_group, - rpl_binlog_state *until_binlog_state) + rpl_binlog_state *until_binlog_state, + bool slave_gtid_strict_mode, rpl_gtid *error_gtid) { my_off_t pos; size_t len= packet->length(); - if (event_type == GTID_LIST_EVENT && using_gtid_state && - (gtid_state->count() > 0 || until_gtid_state)) + if (event_type == GTID_LIST_EVENT && using_gtid_state && until_gtid_state) { rpl_gtid *gtid_list; uint32 list_len; @@ -1481,11 +1492,17 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, Gtid_list_log_event::peek(packet->ptr()+ev_offset, len - ev_offset, current_checksum_alg, &gtid_list, &list_len)) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; return "Failed to read Gtid_list_log_event: corrupt binlog"; + } err= until_binlog_state->load(gtid_list, list_len); my_free(gtid_list); if (err) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; return "Failed in internal GTID book-keeping: Out of memory"; + } } /* Skip GTID event groups until we reach slave position within a domain_id. 
*/ @@ -1503,10 +1520,16 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, current_checksum_alg, &event_gtid.domain_id, &event_gtid.server_id, &event_gtid.seq_no, &flags2)) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; return "Failed to read Gtid_log_event: corrupt binlog"; + } - if (until_binlog_state->update(&event_gtid)) + if (until_binlog_state->update(&event_gtid, false)) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; return "Failed in internal GTID book-keeping: Out of memory"; + } if (gtid_state->count() > 0) { @@ -1518,13 +1541,30 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, event_gtid.seq_no <= gtid->seq_no) *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ? GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION); - /* - Delete this entry if we have reached slave start position (so we will - not skip subsequent events and won't have to look them up and check). - */ if (event_gtid.server_id == gtid->server_id && event_gtid.seq_no >= gtid->seq_no) + { + /* + In strict mode, it is an error if the slave requests to start in + a "hole" in the master's binlog: a GTID that does not exist, even + though both the prior and subsequent seq_no exists for same + domain_id and server_id. + */ + if (slave_gtid_strict_mode && event_gtid.seq_no > gtid->seq_no) + { + my_errno= ER_GTID_START_FROM_BINLOG_HOLE; + *error_gtid= *gtid; + return "The binlog on the master is missing the GTID requested " + "by the slave (even though both a prior and a subsequent " + "sequence number does exist), and GTID strict mode is enabled."; + } + /* + Delete this entry if we have reached slave start position (so we + will not skip subsequent events and won't have to look them up + and check). + */ gtid_state->remove(gtid); + } } } @@ -1626,7 +1666,10 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, a no-operation on the slave. 
*/ if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg)) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; return "Failed to replace row annotate event with dummy: too small event."; + } } } @@ -1645,8 +1688,11 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, ev_offset, current_checksum_alg); if (err) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; return "Failed to replace GTID event with backwards-compatible event: " "currupt event."; + } if (!need_dummy) return NULL; } @@ -1673,8 +1719,11 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, binlog positions. */ if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg)) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; return "Failed to replace binlog checkpoint or gtid list event with " "dummy: too small event."; + } } } @@ -1698,20 +1747,32 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, pos= my_b_tell(log); if (RUN_HOOK(binlog_transmit, before_send_event, (thd, flags, packet, log_file_name, pos))) + { + my_errno= ER_UNKNOWN_ERROR; return "run 'before_send_event' hook failed"; + } if (my_net_write(net, (uchar*) packet->ptr(), len)) + { + my_errno= ER_UNKNOWN_ERROR; return "Failed on my_net_write()"; + } DBUG_PRINT("info", ("log event code %d", (*packet)[LOG_EVENT_OFFSET+1] )); if (event_type == LOAD_EVENT) { if (send_file(thd)) + { + my_errno= ER_UNKNOWN_ERROR; return "failed in send_file()"; + } } if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet))) + { + my_errno= ER_UNKNOWN_ERROR; return "Failed to run hook 'after_send_event'"; + } return NULL; /* Success */ } @@ -1747,6 +1808,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, enum_gtid_skip_type gtid_skip_group= GTID_SKIP_NOT; enum_gtid_until_state gtid_until_group= GTID_UNTIL_NOT_DONE; rpl_binlog_state until_binlog_state; + bool slave_gtid_strict_mode; uint8 current_checksum_alg= 
BINLOG_CHECKSUM_ALG_UNDEF; int old_max_allowed_packet= thd->variables.max_allowed_packet; @@ -1778,9 +1840,12 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, connect_gtid_state.length(0); using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state); DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", using_gtid_state= false;); - if (using_gtid_state && - get_slave_until_gtid(thd, &slave_until_gtid_str)) - until_gtid_state= &until_gtid_state_obj; + if (using_gtid_state) + { + slave_gtid_strict_mode= get_slave_gtid_strict_mode(thd); + if(get_slave_until_gtid(thd, &slave_until_gtid_str)) + until_gtid_state= &until_gtid_state_obj; + } /* We want to corrupt the first event, in Log_event::read_log_event(). @@ -2134,10 +2199,10 @@ impossible position"; current_checksum_alg, using_gtid_state, &gtid_state, &gtid_skip_group, until_gtid_state, &gtid_until_group, - &until_binlog_state))) + &until_binlog_state, + slave_gtid_strict_mode, &error_gtid))) { errmsg= tmp_msg; - my_errno= ER_UNKNOWN_ERROR; goto err; } if (until_gtid_state && @@ -2315,10 +2380,10 @@ impossible position"; current_checksum_alg, using_gtid_state, &gtid_state, &gtid_skip_group, until_gtid_state, - &gtid_until_group, &until_binlog_state))) + &gtid_until_group, &until_binlog_state, + slave_gtid_strict_mode, &error_gtid))) { errmsg= tmp_msg; - my_errno= ER_UNKNOWN_ERROR; goto err; } if ( @@ -2430,6 +2495,17 @@ err: /* Use this error code so slave will know not to try reconnect. */ my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG; } + else if (my_errno == ER_GTID_START_FROM_BINLOG_HOLE) + { + my_snprintf(error_text, sizeof(error_text), + "The binlog on the master is missing the GTID %u-%u-%llu " + "requested by the slave (even though both a prior and a " + "subsequent sequence number does exist), and GTID strict mode " + "is enabled", + error_gtid.domain_id, error_gtid.server_id, error_gtid.seq_no); + /* Use this error code so slave will know not to try reconnect. 
*/ + my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG; + } else if (my_errno == ER_CANNOT_LOAD_SLAVE_GTID_STATE) { my_snprintf(error_text, sizeof(error_text), @@ -3721,8 +3797,6 @@ rpl_append_gtid_state(String *dest, bool use_binlog) bool rpl_gtid_pos_check(THD *thd, char *str, size_t len) { - /* ToDo: Use gtid_strict_mode sysvar, when implemented. */ - static const bool gtid_strict_mode= false; slave_connection_state tmp_slave_state; bool gave_conflict_warning= false, gave_missing_warning= false; @@ -3759,7 +3833,7 @@ rpl_gtid_pos_check(THD *thd, char *str, size_t len) continue; if (!(slave_gtid= tmp_slave_state.find(binlog_gtid->domain_id))) { - if (gtid_strict_mode) + if (opt_gtid_strict_mode) { my_error(ER_MASTER_GTID_POS_MISSING_DOMAIN, MYF(0), binlog_gtid->domain_id, binlog_gtid->domain_id, @@ -3778,7 +3852,7 @@ rpl_gtid_pos_check(THD *thd, char *str, size_t len) } else if (slave_gtid->seq_no < binlog_gtid->seq_no) { - if (gtid_strict_mode) + if (opt_gtid_strict_mode) { my_error(ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG, MYF(0), slave_gtid->domain_id, slave_gtid->server_id, diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 9edbe789196..f520c43e10c 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -1215,6 +1215,26 @@ static Sys_var_uint Sys_gtid_domain_id( BLOCK_SIZE(1), NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(check_has_super)); + +static bool check_gtid_seq_no(sys_var *self, THD *thd, set_var *var) +{ + uint32 domain_id, server_id; + uint64_t seq_no; + + if (check_has_super(self, thd, var)) + return true; + domain_id= thd->variables.gtid_domain_id; + server_id= thd->variables.server_id; + seq_no= (uint64)var->value->val_uint(); + DBUG_EXECUTE_IF("ignore_set_gtid_seq_no_check", return 0;); + if (opt_gtid_strict_mode && opt_bin_log && + mysql_bin_log.check_strict_gtid_sequence(domain_id, server_id, seq_no)) + return true; + + return false; +} + + static Sys_var_ulonglong Sys_gtid_seq_no( "gtid_seq_no", "Internal server usage, for replication with global 
transaction id. " @@ -1224,7 +1244,7 @@ static Sys_var_ulonglong Sys_gtid_seq_no( SESSION_ONLY(gtid_seq_no), NO_CMD_LINE, VALID_RANGE(0, ULONGLONG_MAX), DEFAULT(0), BLOCK_SIZE(1), NO_MUTEX_GUARD, NOT_IN_BINLOG, - ON_CHECK(check_has_super)); + ON_CHECK(check_gtid_seq_no)); #ifdef HAVE_REPLICATION @@ -1364,6 +1384,15 @@ static Sys_var_gtid_slave_pos Sys_gtid_slave_pos( "The list of global transaction IDs that were last replicated on the " "server, one for each replication domain.", GLOBAL_VAR(opt_gtid_slave_pos_dummy), NO_CMD_LINE); + + +static Sys_var_mybool Sys_gtid_strict_mode( + "gtid_strict_mode", + "Enforce strict seq_no ordering of events in the binary log. Slave " + "stops with an error if it encounters an event that would cause it to " + "generate an out-of-order binlog if executed.", + GLOBAL_VAR(opt_gtid_strict_mode), + CMD_LINE(OPT_ARG), DEFAULT(FALSE)); #endif |