diff options
author | unknown <knielsen@knielsen-hq.org> | 2012-11-05 15:01:49 +0100 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2012-11-05 15:01:49 +0100 |
commit | 03f28863e845976f899c8c35dab3add918f4a8f6 (patch) | |
tree | 47d17f08d260da4dcf982110415f1b63b1f311a4 /sql | |
parent | ab8e8f4b277c23e6989650e0590ec0a5fa03fb3a (diff) | |
download | mariadb-git-03f28863e845976f899c8c35dab3add918f4a8f6.tar.gz |
MDEV-26: Global transaction commit. Intermediate commit.
Now slave records GTID in mysql.rpl_slave_state when applying XID log event.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/log.cc | 4 | ||||
-rw-r--r-- | sql/log_event.cc | 312 | ||||
-rw-r--r-- | sql/log_event.h | 86 | ||||
-rw-r--r-- | sql/mysqld.cc | 10 | ||||
-rw-r--r-- | sql/mysqld.h | 1 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 12 | ||||
-rw-r--r-- | sql/rpl_rli.h | 12 | ||||
-rw-r--r-- | sql/slave.cc | 30 | ||||
-rw-r--r-- | sql/sql_repl.cc | 118 | ||||
-rw-r--r-- | sql/sql_repl.h | 5 |
10 files changed, 564 insertions, 26 deletions
diff --git a/sql/log.cc b/sql/log.cc index 683f4c6ce01..acabcd82cce 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -120,6 +120,8 @@ static MYSQL_BIN_LOG::xid_count_per_binlog * static bool start_binlog_background_thread(); +rpl_binlog_state rpl_global_gtid_binlog_state; + /** purge logs, master and slave sides both, related error code convertor. @@ -5334,7 +5336,7 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, /* Update the replication state (last GTID in each replication domain). */ mysql_mutex_lock(&LOCK_rpl_gtid_state); - global_rpl_gtid_state.update(>id); + rpl_global_gtid_binlog_state.update(>id); mysql_mutex_unlock(&LOCK_rpl_gtid_state); return false; } diff --git a/sql/log_event.cc b/sql/log_event.cc index fbbd6309f48..fd03c18f107 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -6055,28 +6055,247 @@ bool Binlog_checkpoint_log_event::write(IO_CACHE *file) Global transaction ID stuff **************************************************************************/ -/** - Current replication state (hash of last GTID executed, per replication - domain). +rpl_slave_state::rpl_slave_state() + : inited(false), loaded(false) +{ + my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id), + sizeof(uint32), NULL, my_free, HASH_UNIQUE); +} + + +rpl_slave_state::~rpl_slave_state() +{ +} + +#ifdef MYSQL_SERVER +void +rpl_slave_state::init() +{ + DBUG_ASSERT(!inited); + mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state, MY_MUTEX_INIT_SLOW); + inited= true; +} + +void +rpl_slave_state::deinit() +{ + uint32 i; + + if (!inited) + return; + for (i= 0; i < hash.records; ++i) + { + element *e= (element *)my_hash_element(&hash, i); + list_element *l= e->list; + list_element *next; + while (l) + { + next= l->next; + my_free(l); + l= next; + } + /* The element itself is freed by my_hash_free(). */ + } + my_hash_free(&hash); + mysql_mutex_destroy(&LOCK_slave_state); +} +#endif + + +int +rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, + uint64 seq_no) +{ + element *elem= NULL; + list_element *list_elem= NULL; + + if (!(elem= get_element(domain_id))) + return 1; + + if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME)))) + return 1; + list_elem->server_id= server_id; + list_elem->sub_id= sub_id; + list_elem->seq_no= seq_no; + + elem->add(list_elem); + return 0; +} + + +struct rpl_slave_state::element * +rpl_slave_state::get_element(uint32 domain_id) +{ + struct element *elem; + + elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0); + if (elem) + return elem; + + if (!(elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME)))) + return NULL; + elem->list= NULL; + elem->last_sub_id= 0; + elem->domain_id= domain_id; + if (my_hash_insert(&hash, (uchar *)elem)) + { + my_free(elem); + return NULL; + } + return elem; +} + + +#ifdef MYSQL_SERVER +#ifdef HAVE_REPLICATION +/* + Write a gtid to the replication slave state table. + + Do it as part of the transaction, to get slave crash safety, or as a separate + transaction if !in_transaction (eg. MyISAM or DDL). + + gtid The global transaction id for this event group. + sub_id Value allocated within the sub_id when the event group was + read (sub_id must be consistent with commit order in master binlog). + + Note that caller must later ensure that the new gtid and sub_id is inserted + into the appropriate HASH element with rpl_slave_state.add(), so that it can + be deleted later. But this must only be done after COMMIT if in transaction. */ -rpl_state global_rpl_gtid_state; +int +rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, + bool in_transaction) +{ + TABLE_LIST tlist; + int err= 0; + bool table_opened= false; + TABLE *table; + list_element *elist= 0, *next; + element *elem; + DBUG_ASSERT(in_transaction /* ToDo: new transaction for DDL etc. */); -rpl_state::rpl_state() + mysql_reset_thd_for_next_command(thd, 0); + + tlist.init_one_table(STRING_WITH_LEN("mysql"), + rpl_gtid_slave_state_table_name.str, + rpl_gtid_slave_state_table_name.length, + NULL, TL_WRITE); + if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0))) + goto end; + table_opened= true; + table= tlist.table; + + /* + ToDo: Check the table definition, error if not as expected. + We need the correct first 4 columns with correct type, and the primary key. + */ + bitmap_set_bit(table->write_set, table->field[0]->field_index); + bitmap_set_bit(table->write_set, table->field[1]->field_index); + bitmap_set_bit(table->write_set, table->field[2]->field_index); + bitmap_set_bit(table->write_set, table->field[3]->field_index); + + table->field[0]->store((ulonglong)gtid->domain_id, true); + table->field[1]->store(sub_id, true); + table->field[2]->store((ulonglong)gtid->server_id, true); + table->field[3]->store(gtid->seq_no, true); + if ((err= table->file->ha_write_row(table->record[0]))) + goto end; + + lock(); + if ((elem= get_element(gtid->domain_id)) == NULL) + { + unlock(); + err= 1; + goto end; + } + elist= elem->grab_list(); + unlock(); + + if (!elist) + goto end; + + /* Now delete any already committed rows. */ + DBUG_ASSERT + ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) && + table->s->primary_key < MAX_KEY /* ToDo support all storage engines */); + + bitmap_set_bit(table->read_set, table->field[0]->field_index); + bitmap_set_bit(table->read_set, table->field[1]->field_index); + while (elist) + { + next= elist->next; + + table->field[1]->store(elist->sub_id, true); + /* domain_id is already set in table->record[0] from write_row() above. */ + if ((err= table->file->ha_rnd_pos_by_record(table->record[0])) || + (err= table->file->ha_delete_row(table->record[0]))) + goto end; + my_free(elist); + elist= next; + } + +end: + + if (table_opened) + { + if (err) + { + /* + ToDo: If error, we need to put any remaining elist back into the HASH so + we can do another delete attempt later. + */ + ha_rollback_trans(thd, FALSE); + close_thread_tables(thd); + if (in_transaction) + ha_rollback_trans(thd, TRUE); + } + else + { + ha_commit_trans(thd, FALSE); + close_thread_tables(thd); + if (in_transaction) + ha_commit_trans(thd, TRUE); + } + } + return err; +} + + +uint64 +rpl_slave_state::next_subid(uint32 domain_id) +{ + uint32 sub_id= 0; + element *elem; + + lock(); + elem= get_element(domain_id); + if (elem) + sub_id= ++elem->last_sub_id; + unlock(); + + return sub_id; +} +#endif + + +rpl_binlog_state::rpl_binlog_state() { my_hash_init(&hash, &my_charset_bin, 32, - offsetof(rpl_gtid, domain_id), sizeof(uint32), - NULL, my_free, HASH_UNIQUE); + offsetof(rpl_gtid, domain_id), 2*sizeof(uint32), NULL, my_free, + HASH_UNIQUE); + mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state, + MY_MUTEX_INIT_SLOW); } -rpl_state::~rpl_state() +rpl_binlog_state::~rpl_binlog_state() { + mysql_mutex_destroy(&LOCK_binlog_state); my_hash_free(&hash); } -#ifdef MYSQL_SERVER /* Update replication state with a new GTID. @@ -6086,7 +6305,7 @@ rpl_state::~rpl_state() Returns 0 for ok, 1 for error. */ int -rpl_state::update(const struct rpl_gtid *gtid) +rpl_binlog_state::update(const struct rpl_gtid *gtid) { uchar *rec; @@ -6206,20 +6425,20 @@ Gtid_log_event::pack_info(THD *thd, Protocol *protocol) protocol->store(buf, p-buf, &my_charset_bin); } -static char gtid_begin_string[5] = {'B','E','G','I','N'}; +static char gtid_begin_string[] = "BEGIN"; int Gtid_log_event::do_apply_event(Relay_log_info const *rli) { - const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd); - - /* ToDo: record the new GTID. */ + thd->variables.server_id= this->server_id; + thd->variables.gtid_domain_id= this->domain_id; + thd->variables.gtid_seq_no= this->seq_no; if (flags2 & FL_STANDALONE) return 0; /* Execute this like a BEGIN query event. */ - thd->set_query_and_id(gtid_begin_string, sizeof(gtid_begin_string), + thd->set_query_and_id(gtid_begin_string, sizeof(gtid_begin_string)-1, &my_charset_bin, next_query_id()); Parser_state parser_state; if (!parser_state.init(thd, thd->query(), thd->query_length())) @@ -6350,7 +6569,7 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len, #ifdef MYSQL_SERVER -Gtid_list_log_event::Gtid_list_log_event(rpl_state *gtid_set) +Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set) : count(gtid_set->count()), list(0) { DBUG_ASSERT(count != 0); @@ -6804,12 +7023,73 @@ void Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) int Xid_log_event::do_apply_event(Relay_log_info const *rli) { bool res; + int err; + rpl_gtid gtid; + uint64 sub_id; + + /* + Record any GTID in the same transaction, so slave state is transactionally + consistent. + */ + if ((sub_id= rli->gtid_sub_id)) + { + /* Clear the GTID from the RLI so we don't accidentally reuse it. */ + const_cast<Relay_log_info*>(rli)->gtid_sub_id= 0; + + gtid= rli->current_gtid; + err= rpl_global_gtid_slave_state.record_gtid(thd, >id, sub_id, true); + if (err) + { + trans_rollback(thd); + return err; + } + } + /* For a slave Xid_log_event is COMMIT */ general_log_print(thd, COM_QUERY, "COMMIT /* implicit, from Xid_log_event */"); res= trans_commit(thd); /* Automatically rolls back on error. */ thd->mdl_context.release_transactional_locks(); + if (sub_id) + { + /* + Add the gtid to the HASH in the replication slave state. + + We must do this only here _after_ commit, so that for parallel + replication, there will not be an attempt to delete the corresponding + table row before it is even committed. + + Even if commit fails, we still add the entry - in case the table + mysql.rpl_slave_state is non-transactional and the row is not removed + by rollback. + */ + rpl_slave_state::element *elem= + rpl_global_gtid_slave_state.get_element(gtid.domain_id); + rpl_slave_state::list_element *lelem= + (rpl_slave_state::list_element *)my_malloc(sizeof(*lelem), MYF(MY_WME)); + if (elem && lelem) + { + lelem->sub_id= sub_id; + lelem->server_id= gtid.server_id; + lelem->seq_no= gtid.seq_no; + elem->add(lelem); + } + else + { + if (lelem) + my_free(lelem); + sql_print_warning("Slave: Out of memory during slave state maintenance. " + "Some no longer necessary rows in table " + "mysql.rpl_slave_state may be left undeleted."); + } + /* + Such failure is not fatal. We will fail to delete the row for this GTID, + but it will do no harm and will be removed automatically on next server + restart. + */ + } + /* Increment the global status commit count variable */ diff --git a/sql/log_event.h b/sql/log_event.h index be809970c6d..04c818313a3 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -2953,18 +2953,92 @@ struct rpl_gtid }; -struct rpl_state +/* + Replication slave state. + + For every independent replication stream (identified by domain_id), this + remembers the last gtid applied on the slave within this domain. + + Since events are always committed in-order within a single domain, this is + sufficient to maintain the state of the replication slave. +*/ +struct rpl_slave_state { + /* Elements in the list of GTIDs kept for each domain_id. */ + struct list_element + { + struct list_element *next; + uint64 sub_id; + uint64 seq_no; + uint32 server_id; + }; + + /* Elements in the HASH that hold the state for one domain_id. */ + struct element + { + struct list_element *list; + uint64 last_sub_id; + uint32 domain_id; + + list_element *grab_list() { list_element *l= list; list= NULL; return l; } + void add (list_element *l) + { + l->next= list; + list= l; + if (last_sub_id < l->sub_id) + last_sub_id= l->sub_id; + } + }; + + /* Mapping from domain_id to its element. */ HASH hash; + /* Mutex protecting access to the state. */ + mysql_mutex_t LOCK_slave_state; + + bool inited; + bool loaded; - rpl_state(); - ~rpl_state(); + rpl_slave_state(); + ~rpl_slave_state(); + void init(); + void deinit(); ulong count() const { return hash.records; } - int update(const struct rpl_gtid *gtid); + int update(uint32 domain_id, uint32 server_id, uint64 sub_id, uint64 seq_no); + int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, + bool in_transaction); + uint64 next_subid(uint32 domain_id); + + void lock() { DBUG_ASSERT(inited); mysql_mutex_lock(&LOCK_slave_state); } + void unlock() { DBUG_ASSERT(inited); mysql_mutex_unlock(&LOCK_slave_state); } + + element *get_element(uint32 domain_id); }; -extern rpl_state global_rpl_gtid_state; + +/* + Binlog state. + This keeps the last GTID written to the binlog for every distinct + (domain_id, server_id) pair. + This will be logged at the start of the next binlog file as a + Gtid_list_log_event; this way, it is easy to find the binlog file + containing a gigen GTID, by simply scanning backwards from the newest + one until a lower seq_no is found in the Gtid_list_log_event at the + start of a binlog for the given domain_id and server_id. +*/ +struct rpl_binlog_state +{ + /* Mapping from (domain_id,server_id) to its GTID. */ + HASH hash; + /* Mutex protecting access to the state. */ + mysql_mutex_t LOCK_binlog_state; + + rpl_binlog_state(); + ~rpl_binlog_state(); + + ulong count() const { return hash.records; } + int update(const struct rpl_gtid *gtid); +}; /** @class Gtid_log_event @@ -3129,7 +3203,7 @@ public: static const uint element_size= 4+4+8; #ifdef MYSQL_SERVER - Gtid_list_log_event(rpl_state *gtid_set); + Gtid_list_log_event(rpl_binlog_state *gtid_set); #ifdef HAVE_REPLICATION void pack_info(THD *thd, Protocol *protocol); #endif diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 548230ea48d..042eb8a60bc 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -766,6 +766,7 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list, key_LOCK_error_messages, key_LOG_INFO_lock, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc; PSI_mutex_key key_RELAYLOG_LOCK_index; +PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state; PSI_mutex_key key_LOCK_stats, key_LOCK_global_user_client_stats, key_LOCK_global_table_stats, @@ -838,7 +839,9 @@ static PSI_mutex_info all_server_mutexes[]= { &key_LOCK_commit_ordered, "LOCK_commit_ordered", PSI_FLAG_GLOBAL}, { &key_LOG_INFO_lock, "LOG_INFO::lock", 0}, { &key_LOCK_thread_count, "LOCK_thread_count", PSI_FLAG_GLOBAL}, - { &key_PARTITION_LOCK_auto_inc, "HA_DATA_PARTITION::LOCK_auto_inc", 0} + { &key_PARTITION_LOCK_auto_inc, "HA_DATA_PARTITION::LOCK_auto_inc", 0}, + { &key_LOCK_slave_state, "key_LOCK_slave_state", 0}, + { &key_LOCK_binlog_state, "key_LOCK_binlog_state", 0} }; PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger, @@ -1783,6 +1786,7 @@ static void mysqld_exit(int exit_code) but if a kill -15 signal was sent, the signal thread did spawn the kill_server_thread thread, which is running concurrently. */ + rpl_deinit_gtid_slave_state(); wait_for_signal_thread_to_end(); mysql_audit_finalize(); clean_up_mutexes(); @@ -4064,6 +4068,10 @@ static int init_thread_environment() PTHREAD_CREATE_DETACHED); pthread_attr_setscope(&connection_attrib, PTHREAD_SCOPE_SYSTEM); +#ifdef HAVE_REPLICATION + rpl_init_gtid_slave_state(); +#endif + DBUG_RETURN(0); } diff --git a/sql/mysqld.h b/sql/mysqld.h index bf4957dba69..4fcef9d3564 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -247,6 +247,7 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list, key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data, key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc; extern PSI_mutex_key key_RELAYLOG_LOCK_index; +extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state; extern PSI_mutex_key key_LOCK_stats, key_LOCK_global_user_client_stats, key_LOCK_global_table_stats, diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 5fd52599d9c..4cd07ba77de 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -31,6 +31,16 @@ static int count_relay_log_space(Relay_log_info* rli); +/** + Current replication state (hash of last GTID executed, per replication + domain). +*/ +rpl_slave_state rpl_global_gtid_slave_state; + +const LEX_STRING rpl_gtid_slave_state_table_name= + { STRING_WITH_LEN("rpl_slave_state") }; + + // Defined in slave.cc int init_intvar_from_file(int* var, IO_CACHE* f, int default_val); int init_strvar_from_file(char *var, int max_size, IO_CACHE *f, @@ -51,7 +61,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) abort_pos_wait(0), slave_run_id(0), sql_thd(0), inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE), until_log_pos(0), retried_trans(0), executed_entries(0), - tables_to_lock(0), tables_to_lock_count(0), + gtid_sub_id(0), tables_to_lock(0), tables_to_lock_count(0), last_event_start_time(0), deferred_events(NULL),m_flags(0), row_stmt_start_timestamp(0), long_find_row_note_printed(false), m_annotate_event(0) diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 6144d37026b..0bcaaa37a59 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -307,6 +307,14 @@ public: char slave_patternload_file[FN_REFLEN]; size_t slave_patternload_file_size; + /* + Current GTID being processed. + The sub_id gives the binlog order within one domain_id. A zero sub_id + means that there is no active GTID. + */ + uint64 gtid_sub_id; + rpl_gtid current_gtid; + Relay_log_info(bool is_slave_recovery); ~Relay_log_info(); @@ -584,4 +592,8 @@ private: int init_relay_log_info(Relay_log_info* rli, const char* info_fname); +extern const LEX_STRING rpl_gtid_slave_state_table_name; +extern struct rpl_slave_state rpl_global_gtid_slave_state; + + #endif /* RPL_RLI_H */ diff --git a/sql/slave.cc b/sql/slave.cc index 1c30a8a8b98..38bf6559b35 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3724,6 +3724,15 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, goto err; } + /* Load the set of seen GTIDs, if we did not already. */ + if (rpl_load_gtid_slave_state(thd)) + { + rli->report(ERROR_LEVEL, thd->stmt_da->sql_errno(), + "Unable to load replication GTID slave state from mysql.%s: %s", + rpl_gtid_slave_state_table_name.str, thd->stmt_da->message()); + goto err; + } + /* execute init_slave variable */ if (opt_init_slave.length) { @@ -5189,6 +5198,27 @@ static Log_event* next_event(Relay_log_info* rli) inc_event_relay_log_pos() */ rli->future_event_relay_log_pos= my_b_tell(cur_log); + /* + For GTID, allocate a new sub_id for the given domain_id. + The sub_id must be allocated in increasing order of binlog order. + */ + if (ev->get_type_code() == GTID_EVENT) + { + Gtid_log_event *gev= static_cast<Gtid_log_event *>(ev); + uint64 sub_id= rpl_global_gtid_slave_state.next_subid(gev->domain_id); + if (!sub_id) + { + errmsg = "slave SQL thread aborted because of out-of-memory error"; + if (hot_log) + mysql_mutex_unlock(log_lock); + goto err; + } + rli->gtid_sub_id= sub_id; + rli->current_gtid.server_id= gev->server_id; + rli->current_gtid.domain_id= gev->domain_id; + rli->current_gtid.seq_no= gev->seq_no; + } + if (hot_log) mysql_mutex_unlock(log_lock); DBUG_RETURN(ev); diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index f6329092ed0..3c206a8857f 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -16,10 +16,12 @@ #include "sql_priv.h" #include "unireg.h" +#include "sql_base.h" #include "sql_parse.h" // check_access #ifdef HAVE_REPLICATION #include "rpl_mi.h" +#include "rpl_rli.h" #include "sql_repl.h" #include "sql_acl.h" // SUPER_ACL #include "log_event.h" @@ -748,7 +750,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, mariadb_slave_capability= get_mariadb_slave_capability(thd); if (global_system_variables.log_warnings > 1) sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)", - thd->variables.server_id, log_ident, (ulong)pos); + (int)thd->variables.server_id, log_ident, (ulong)pos); if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos))) { errmsg= "Failed to run hook 'transmit_start'"; @@ -2442,4 +2444,118 @@ int log_loaded_block(IO_CACHE* file) DBUG_RETURN(0); } + +/** + Initialise the slave replication state from the mysql.rpl_slave_state table. + + This is called each time an SQL thread starts, but the data is only actually + loaded on the first call. + + The slave state is the last GTID applied on the slave within each + replication domain. + + To avoid row lock contention, there are multiple rows for each domain_id. + The one containing the current slave state is the one with the maximal + sub_id value, within each domain_id. + + CREATE TABLE mysql.rpl_slave_state ( + domain_id INT UNSIGNED NOT NULL, + sub_id BIGINT UNSIGNED NOT NULL, + server_id INT UNSIGNED NOT NULL, + seq_no BIGINT UNSIGNED NOT NULL, + PRIMARY KEY (domain_id, sub_id)) +*/ + +void +rpl_init_gtid_slave_state() +{ + rpl_global_gtid_slave_state.init(); +} + + +void +rpl_deinit_gtid_slave_state() +{ + rpl_global_gtid_slave_state.deinit(); +} + + +int +rpl_load_gtid_slave_state(THD *thd) +{ + TABLE_LIST tlist; + TABLE *table; + bool table_opened= false; + bool table_scanned= false; + DBUG_ENTER("rpl_load_gtid_slave_state"); + + int err= 0; + rpl_global_gtid_slave_state.lock(); + if (rpl_global_gtid_slave_state.loaded) + goto end; + + mysql_reset_thd_for_next_command(thd, 0); + + tlist.init_one_table(STRING_WITH_LEN("mysql"), + rpl_gtid_slave_state_table_name.str, + rpl_gtid_slave_state_table_name.length, + NULL, TL_READ); + if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0))) + goto end; + table_opened= true; + table= tlist.table; + + /* + ToDo: Check the table definition, error if not as expected. + We need the correct first 4 columns with correct type, and the primary key. + */ + + bitmap_set_bit(table->read_set, table->field[0]->field_index); + bitmap_set_bit(table->read_set, table->field[1]->field_index); + bitmap_set_bit(table->read_set, table->field[2]->field_index); + bitmap_set_bit(table->read_set, table->field[3]->field_index); + if ((err= table->file->ha_rnd_init_with_error(1))) + goto end; + table_scanned= true; + for (;;) + { + uint32 domain_id, server_id; + uint64 sub_id, seq_no; + if ((err= table->file->ha_rnd_next(table->record[0]))) + { + if (err == HA_ERR_RECORD_DELETED) + continue; + else if (err == HA_ERR_END_OF_FILE) + break; + else + goto end; + } + domain_id= (ulonglong)table->field[0]->val_int(); + sub_id= (ulonglong)table->field[1]->val_int(); + server_id= (ulonglong)table->field[2]->val_int(); + seq_no= (ulonglong)table->field[3]->val_int(); + DBUG_PRINT("info", ("Read slave state row: %u:%u-%lu sub_id=%lu\n", + (unsigned)domain_id, (unsigned)server_id, + (ulong)seq_no, (ulong)sub_id)); + if ((err= rpl_global_gtid_slave_state.update(domain_id, server_id, + sub_id, seq_no))) + goto end; + } + err= 0; /* Clear HA_ERR_END_OF_FILE */ + + rpl_global_gtid_slave_state.loaded= true; + +end: + if (table_scanned) + { + table->file->ha_index_or_rnd_end(); + ha_commit_trans(thd, FALSE); + ha_commit_trans(thd, TRUE); + } + if (table_opened) + close_thread_tables(thd); + rpl_global_gtid_slave_state.unlock(); + DBUG_RETURN(err); +} + #endif /* HAVE_REPLICATION */ diff --git a/sql/sql_repl.h b/sql/sql_repl.h index 9ca7e6b00b1..89fa0cf25be 100644 --- a/sql/sql_repl.h +++ b/sql/sql_repl.h @@ -65,6 +65,11 @@ int log_loaded_block(IO_CACHE* file); int init_replication_sys_vars(); void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags); +extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state; +void rpl_init_gtid_slave_state(); +void rpl_deinit_gtid_slave_state(); +int rpl_load_gtid_slave_state(THD *thd); + #endif /* HAVE_REPLICATION */ #endif /* SQL_REPL_INCLUDED */ |