summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2012-11-05 15:01:49 +0100
committerunknown <knielsen@knielsen-hq.org>2012-11-05 15:01:49 +0100
commit03f28863e845976f899c8c35dab3add918f4a8f6 (patch)
tree47d17f08d260da4dcf982110415f1b63b1f311a4 /sql
parentab8e8f4b277c23e6989650e0590ec0a5fa03fb3a (diff)
downloadmariadb-git-03f28863e845976f899c8c35dab3add918f4a8f6.tar.gz
MDEV-26: Global transaction commit. Intermediate commit.
Now slave records GTID in mysql.rpl_slave_state when applying XID log event.
Diffstat (limited to 'sql')
-rw-r--r--sql/log.cc4
-rw-r--r--sql/log_event.cc312
-rw-r--r--sql/log_event.h86
-rw-r--r--sql/mysqld.cc10
-rw-r--r--sql/mysqld.h1
-rw-r--r--sql/rpl_rli.cc12
-rw-r--r--sql/rpl_rli.h12
-rw-r--r--sql/slave.cc30
-rw-r--r--sql/sql_repl.cc118
-rw-r--r--sql/sql_repl.h5
10 files changed, 564 insertions, 26 deletions
diff --git a/sql/log.cc b/sql/log.cc
index 683f4c6ce01..acabcd82cce 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -120,6 +120,8 @@ static MYSQL_BIN_LOG::xid_count_per_binlog *
static bool start_binlog_background_thread();
+rpl_binlog_state rpl_global_gtid_binlog_state;
+
/**
purge logs, master and slave sides both, related error code
convertor.
@@ -5334,7 +5336,7 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
/* Update the replication state (last GTID in each replication domain). */
mysql_mutex_lock(&LOCK_rpl_gtid_state);
- global_rpl_gtid_state.update(&gtid);
+ rpl_global_gtid_binlog_state.update(&gtid);
mysql_mutex_unlock(&LOCK_rpl_gtid_state);
return false;
}
diff --git a/sql/log_event.cc b/sql/log_event.cc
index fbbd6309f48..fd03c18f107 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -6055,28 +6055,247 @@ bool Binlog_checkpoint_log_event::write(IO_CACHE *file)
Global transaction ID stuff
**************************************************************************/
-/**
- Current replication state (hash of last GTID executed, per replication
- domain).
+rpl_slave_state::rpl_slave_state()
+ : inited(false), loaded(false)
+{
+ my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
+ sizeof(uint32), NULL, my_free, HASH_UNIQUE);
+}
+
+
+rpl_slave_state::~rpl_slave_state()
+{
+}
+
+#ifdef MYSQL_SERVER
+void
+rpl_slave_state::init()
+{
+ DBUG_ASSERT(!inited);
+ mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state, MY_MUTEX_INIT_SLOW);
+ inited= true;
+}
+
+void
+rpl_slave_state::deinit()
+{
+ uint32 i;
+
+ if (!inited)
+ return;
+ for (i= 0; i < hash.records; ++i)
+ {
+ element *e= (element *)my_hash_element(&hash, i);
+ list_element *l= e->list;
+ list_element *next;
+ while (l)
+ {
+ next= l->next;
+ my_free(l);
+ l= next;
+ }
+ /* The element itself is freed by my_hash_free(). */
+ }
+ my_hash_free(&hash);
+ mysql_mutex_destroy(&LOCK_slave_state);
+}
+#endif
+
+
+int
+rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
+ uint64 seq_no)
+{
+ element *elem= NULL;
+ list_element *list_elem= NULL;
+
+ if (!(elem= get_element(domain_id)))
+ return 1;
+
+ if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))
+ return 1;
+ list_elem->server_id= server_id;
+ list_elem->sub_id= sub_id;
+ list_elem->seq_no= seq_no;
+
+ elem->add(list_elem);
+ return 0;
+}
+
+
+struct rpl_slave_state::element *
+rpl_slave_state::get_element(uint32 domain_id)
+{
+ struct element *elem;
+
+ elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
+ if (elem)
+ return elem;
+
+ if (!(elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME))))
+ return NULL;
+ elem->list= NULL;
+ elem->last_sub_id= 0;
+ elem->domain_id= domain_id;
+ if (my_hash_insert(&hash, (uchar *)elem))
+ {
+ my_free(elem);
+ return NULL;
+ }
+ return elem;
+}
+
+
+#ifdef MYSQL_SERVER
+#ifdef HAVE_REPLICATION
+/*
+ Write a gtid to the replication slave state table.
+
+ Do it as part of the transaction, to get slave crash safety, or as a separate
+ transaction if !in_transaction (eg. MyISAM or DDL).
+
+ gtid The global transaction id for this event group.
+ sub_id Value allocated within the sub_id when the event group was
+ read (sub_id must be consistent with commit order in master binlog).
+
+ Note that caller must later ensure that the new gtid and sub_id is inserted
+ into the appropriate HASH element with rpl_slave_state.add(), so that it can
+ be deleted later. But this must only be done after COMMIT if in transaction.
*/
-rpl_state global_rpl_gtid_state;
+int
+rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
+ bool in_transaction)
+{
+ TABLE_LIST tlist;
+ int err= 0;
+ bool table_opened= false;
+ TABLE *table;
+ list_element *elist= 0, *next;
+ element *elem;
+ DBUG_ASSERT(in_transaction /* ToDo: new transaction for DDL etc. */);
-rpl_state::rpl_state()
+ mysql_reset_thd_for_next_command(thd, 0);
+
+ tlist.init_one_table(STRING_WITH_LEN("mysql"),
+ rpl_gtid_slave_state_table_name.str,
+ rpl_gtid_slave_state_table_name.length,
+ NULL, TL_WRITE);
+ if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
+ goto end;
+ table_opened= true;
+ table= tlist.table;
+
+ /*
+ ToDo: Check the table definition, error if not as expected.
+ We need the correct first 4 columns with correct type, and the primary key.
+ */
+ bitmap_set_bit(table->write_set, table->field[0]->field_index);
+ bitmap_set_bit(table->write_set, table->field[1]->field_index);
+ bitmap_set_bit(table->write_set, table->field[2]->field_index);
+ bitmap_set_bit(table->write_set, table->field[3]->field_index);
+
+ table->field[0]->store((ulonglong)gtid->domain_id, true);
+ table->field[1]->store(sub_id, true);
+ table->field[2]->store((ulonglong)gtid->server_id, true);
+ table->field[3]->store(gtid->seq_no, true);
+ if ((err= table->file->ha_write_row(table->record[0])))
+ goto end;
+
+ lock();
+ if ((elem= get_element(gtid->domain_id)) == NULL)
+ {
+ unlock();
+ err= 1;
+ goto end;
+ }
+ elist= elem->grab_list();
+ unlock();
+
+ if (!elist)
+ goto end;
+
+ /* Now delete any already committed rows. */
+ DBUG_ASSERT
+ ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
+ table->s->primary_key < MAX_KEY /* ToDo support all storage engines */);
+
+ bitmap_set_bit(table->read_set, table->field[0]->field_index);
+ bitmap_set_bit(table->read_set, table->field[1]->field_index);
+ while (elist)
+ {
+ next= elist->next;
+
+ table->field[1]->store(elist->sub_id, true);
+ /* domain_id is already set in table->record[0] from write_row() above. */
+ if ((err= table->file->ha_rnd_pos_by_record(table->record[0])) ||
+ (err= table->file->ha_delete_row(table->record[0])))
+ goto end;
+ my_free(elist);
+ elist= next;
+ }
+
+end:
+
+ if (table_opened)
+ {
+ if (err)
+ {
+ /*
+ ToDo: If error, we need to put any remaining elist back into the HASH so
+ we can do another delete attempt later.
+ */
+ ha_rollback_trans(thd, FALSE);
+ close_thread_tables(thd);
+ if (in_transaction)
+ ha_rollback_trans(thd, TRUE);
+ }
+ else
+ {
+ ha_commit_trans(thd, FALSE);
+ close_thread_tables(thd);
+ if (in_transaction)
+ ha_commit_trans(thd, TRUE);
+ }
+ }
+ return err;
+}
+
+
+uint64
+rpl_slave_state::next_subid(uint32 domain_id)
+{
+ uint32 sub_id= 0;
+ element *elem;
+
+ lock();
+ elem= get_element(domain_id);
+ if (elem)
+ sub_id= ++elem->last_sub_id;
+ unlock();
+
+ return sub_id;
+}
+#endif
+
+
+rpl_binlog_state::rpl_binlog_state()
{
my_hash_init(&hash, &my_charset_bin, 32,
- offsetof(rpl_gtid, domain_id), sizeof(uint32),
- NULL, my_free, HASH_UNIQUE);
+ offsetof(rpl_gtid, domain_id), 2*sizeof(uint32), NULL, my_free,
+ HASH_UNIQUE);
+ mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state,
+ MY_MUTEX_INIT_SLOW);
}
-rpl_state::~rpl_state()
+rpl_binlog_state::~rpl_binlog_state()
{
+ mysql_mutex_destroy(&LOCK_binlog_state);
my_hash_free(&hash);
}
-#ifdef MYSQL_SERVER
/*
Update replication state with a new GTID.
@@ -6086,7 +6305,7 @@ rpl_state::~rpl_state()
Returns 0 for ok, 1 for error.
*/
int
-rpl_state::update(const struct rpl_gtid *gtid)
+rpl_binlog_state::update(const struct rpl_gtid *gtid)
{
uchar *rec;
@@ -6206,20 +6425,20 @@ Gtid_log_event::pack_info(THD *thd, Protocol *protocol)
protocol->store(buf, p-buf, &my_charset_bin);
}
-static char gtid_begin_string[5] = {'B','E','G','I','N'};
+static char gtid_begin_string[] = "BEGIN";
int
Gtid_log_event::do_apply_event(Relay_log_info const *rli)
{
- const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
-
- /* ToDo: record the new GTID. */
+ thd->variables.server_id= this->server_id;
+ thd->variables.gtid_domain_id= this->domain_id;
+ thd->variables.gtid_seq_no= this->seq_no;
if (flags2 & FL_STANDALONE)
return 0;
/* Execute this like a BEGIN query event. */
- thd->set_query_and_id(gtid_begin_string, sizeof(gtid_begin_string),
+ thd->set_query_and_id(gtid_begin_string, sizeof(gtid_begin_string)-1,
&my_charset_bin, next_query_id());
Parser_state parser_state;
if (!parser_state.init(thd, thd->query(), thd->query_length()))
@@ -6350,7 +6569,7 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len,
#ifdef MYSQL_SERVER
-Gtid_list_log_event::Gtid_list_log_event(rpl_state *gtid_set)
+Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set)
: count(gtid_set->count()), list(0)
{
DBUG_ASSERT(count != 0);
@@ -6804,12 +7023,73 @@ void Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
int Xid_log_event::do_apply_event(Relay_log_info const *rli)
{
bool res;
+ int err;
+ rpl_gtid gtid;
+ uint64 sub_id;
+
+ /*
+ Record any GTID in the same transaction, so slave state is transactionally
+ consistent.
+ */
+ if ((sub_id= rli->gtid_sub_id))
+ {
+ /* Clear the GTID from the RLI so we don't accidentally reuse it. */
+ const_cast<Relay_log_info*>(rli)->gtid_sub_id= 0;
+
+ gtid= rli->current_gtid;
+ err= rpl_global_gtid_slave_state.record_gtid(thd, &gtid, sub_id, true);
+ if (err)
+ {
+ trans_rollback(thd);
+ return err;
+ }
+ }
+
/* For a slave Xid_log_event is COMMIT */
general_log_print(thd, COM_QUERY,
"COMMIT /* implicit, from Xid_log_event */");
res= trans_commit(thd); /* Automatically rolls back on error. */
thd->mdl_context.release_transactional_locks();
+ if (sub_id)
+ {
+ /*
+ Add the gtid to the HASH in the replication slave state.
+
+ We must do this only here _after_ commit, so that for parallel
+ replication, there will not be an attempt to delete the corresponding
+ table row before it is even committed.
+
+ Even if commit fails, we still add the entry - in case the table
+ mysql.rpl_slave_state is non-transactional and the row is not removed
+ by rollback.
+ */
+ rpl_slave_state::element *elem=
+ rpl_global_gtid_slave_state.get_element(gtid.domain_id);
+ rpl_slave_state::list_element *lelem=
+ (rpl_slave_state::list_element *)my_malloc(sizeof(*lelem), MYF(MY_WME));
+ if (elem && lelem)
+ {
+ lelem->sub_id= sub_id;
+ lelem->server_id= gtid.server_id;
+ lelem->seq_no= gtid.seq_no;
+ elem->add(lelem);
+ }
+ else
+ {
+ if (lelem)
+ my_free(lelem);
+ sql_print_warning("Slave: Out of memory during slave state maintenance. "
+ "Some no longer necessary rows in table "
+ "mysql.rpl_slave_state may be left undeleted.");
+ }
+ /*
+ Such failure is not fatal. We will fail to delete the row for this GTID,
+ but it will do no harm and will be removed automatically on next server
+ restart.
+ */
+ }
+
/*
Increment the global status commit count variable
*/
diff --git a/sql/log_event.h b/sql/log_event.h
index be809970c6d..04c818313a3 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -2953,18 +2953,92 @@ struct rpl_gtid
};
-struct rpl_state
+/*
+ Replication slave state.
+
+ For every independent replication stream (identified by domain_id), this
+ remembers the last gtid applied on the slave within this domain.
+
+ Since events are always committed in-order within a single domain, this is
+ sufficient to maintain the state of the replication slave.
+*/
+struct rpl_slave_state
{
+ /* Elements in the list of GTIDs kept for each domain_id. */
+ struct list_element
+ {
+ struct list_element *next;
+ uint64 sub_id;
+ uint64 seq_no;
+ uint32 server_id;
+ };
+
+ /* Elements in the HASH that hold the state for one domain_id. */
+ struct element
+ {
+ struct list_element *list;
+ uint64 last_sub_id;
+ uint32 domain_id;
+
+ list_element *grab_list() { list_element *l= list; list= NULL; return l; }
+ void add (list_element *l)
+ {
+ l->next= list;
+ list= l;
+ if (last_sub_id < l->sub_id)
+ last_sub_id= l->sub_id;
+ }
+ };
+
+ /* Mapping from domain_id to its element. */
HASH hash;
+ /* Mutex protecting access to the state. */
+ mysql_mutex_t LOCK_slave_state;
+
+ bool inited;
+ bool loaded;
- rpl_state();
- ~rpl_state();
+ rpl_slave_state();
+ ~rpl_slave_state();
+ void init();
+ void deinit();
ulong count() const { return hash.records; }
- int update(const struct rpl_gtid *gtid);
+ int update(uint32 domain_id, uint32 server_id, uint64 sub_id, uint64 seq_no);
+ int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
+ bool in_transaction);
+ uint64 next_subid(uint32 domain_id);
+
+ void lock() { DBUG_ASSERT(inited); mysql_mutex_lock(&LOCK_slave_state); }
+ void unlock() { DBUG_ASSERT(inited); mysql_mutex_unlock(&LOCK_slave_state); }
+
+ element *get_element(uint32 domain_id);
};
-extern rpl_state global_rpl_gtid_state;
+
+/*
+ Binlog state.
+ This keeps the last GTID written to the binlog for every distinct
+ (domain_id, server_id) pair.
+ This will be logged at the start of the next binlog file as a
+ Gtid_list_log_event; this way, it is easy to find the binlog file
+ containing a gigen GTID, by simply scanning backwards from the newest
+ one until a lower seq_no is found in the Gtid_list_log_event at the
+ start of a binlog for the given domain_id and server_id.
+*/
+struct rpl_binlog_state
+{
+ /* Mapping from (domain_id,server_id) to its GTID. */
+ HASH hash;
+ /* Mutex protecting access to the state. */
+ mysql_mutex_t LOCK_binlog_state;
+
+ rpl_binlog_state();
+ ~rpl_binlog_state();
+
+ ulong count() const { return hash.records; }
+ int update(const struct rpl_gtid *gtid);
+};
/**
@class Gtid_log_event
@@ -3129,7 +3203,7 @@ public:
static const uint element_size= 4+4+8;
#ifdef MYSQL_SERVER
- Gtid_list_log_event(rpl_state *gtid_set);
+ Gtid_list_log_event(rpl_binlog_state *gtid_set);
#ifdef HAVE_REPLICATION
void pack_info(THD *thd, Protocol *protocol);
#endif
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 548230ea48d..042eb8a60bc 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -766,6 +766,7 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_LOCK_error_messages, key_LOG_INFO_lock, key_LOCK_thread_count,
key_PARTITION_LOCK_auto_inc;
PSI_mutex_key key_RELAYLOG_LOCK_index;
+PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;
PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
@@ -838,7 +839,9 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_commit_ordered, "LOCK_commit_ordered", PSI_FLAG_GLOBAL},
{ &key_LOG_INFO_lock, "LOG_INFO::lock", 0},
{ &key_LOCK_thread_count, "LOCK_thread_count", PSI_FLAG_GLOBAL},
- { &key_PARTITION_LOCK_auto_inc, "HA_DATA_PARTITION::LOCK_auto_inc", 0}
+ { &key_PARTITION_LOCK_auto_inc, "HA_DATA_PARTITION::LOCK_auto_inc", 0},
+ { &key_LOCK_slave_state, "key_LOCK_slave_state", 0},
+ { &key_LOCK_binlog_state, "key_LOCK_binlog_state", 0}
};
PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger,
@@ -1783,6 +1786,7 @@ static void mysqld_exit(int exit_code)
but if a kill -15 signal was sent, the signal thread did
spawn the kill_server_thread thread, which is running concurrently.
*/
+ rpl_deinit_gtid_slave_state();
wait_for_signal_thread_to_end();
mysql_audit_finalize();
clean_up_mutexes();
@@ -4064,6 +4068,10 @@ static int init_thread_environment()
PTHREAD_CREATE_DETACHED);
pthread_attr_setscope(&connection_attrib, PTHREAD_SCOPE_SYSTEM);
+#ifdef HAVE_REPLICATION
+ rpl_init_gtid_slave_state();
+#endif
+
DBUG_RETURN(0);
}
diff --git a/sql/mysqld.h b/sql/mysqld.h
index bf4957dba69..4fcef9d3564 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -247,6 +247,7 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data,
key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc;
extern PSI_mutex_key key_RELAYLOG_LOCK_index;
+extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;
extern PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 5fd52599d9c..4cd07ba77de 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -31,6 +31,16 @@
static int count_relay_log_space(Relay_log_info* rli);
+/**
+ Current replication state (hash of last GTID executed, per replication
+ domain).
+*/
+rpl_slave_state rpl_global_gtid_slave_state;
+
+const LEX_STRING rpl_gtid_slave_state_table_name=
+ { STRING_WITH_LEN("rpl_slave_state") };
+
+
// Defined in slave.cc
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
@@ -51,7 +61,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
abort_pos_wait(0), slave_run_id(0), sql_thd(0),
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), executed_entries(0),
- tables_to_lock(0), tables_to_lock_count(0),
+ gtid_sub_id(0), tables_to_lock(0), tables_to_lock_count(0),
last_event_start_time(0), deferred_events(NULL),m_flags(0),
row_stmt_start_timestamp(0), long_find_row_note_printed(false),
m_annotate_event(0)
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 6144d37026b..0bcaaa37a59 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -307,6 +307,14 @@ public:
char slave_patternload_file[FN_REFLEN];
size_t slave_patternload_file_size;
+ /*
+ Current GTID being processed.
+ The sub_id gives the binlog order within one domain_id. A zero sub_id
+ means that there is no active GTID.
+ */
+ uint64 gtid_sub_id;
+ rpl_gtid current_gtid;
+
Relay_log_info(bool is_slave_recovery);
~Relay_log_info();
@@ -584,4 +592,8 @@ private:
int init_relay_log_info(Relay_log_info* rli, const char* info_fname);
+extern const LEX_STRING rpl_gtid_slave_state_table_name;
+extern struct rpl_slave_state rpl_global_gtid_slave_state;
+
+
#endif /* RPL_RLI_H */
diff --git a/sql/slave.cc b/sql/slave.cc
index 1c30a8a8b98..38bf6559b35 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -3724,6 +3724,15 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
goto err;
}
+ /* Load the set of seen GTIDs, if we did not already. */
+ if (rpl_load_gtid_slave_state(thd))
+ {
+ rli->report(ERROR_LEVEL, thd->stmt_da->sql_errno(),
+ "Unable to load replication GTID slave state from mysql.%s: %s",
+ rpl_gtid_slave_state_table_name.str, thd->stmt_da->message());
+ goto err;
+ }
+
/* execute init_slave variable */
if (opt_init_slave.length)
{
@@ -5189,6 +5198,27 @@ static Log_event* next_event(Relay_log_info* rli)
inc_event_relay_log_pos()
*/
rli->future_event_relay_log_pos= my_b_tell(cur_log);
+ /*
+ For GTID, allocate a new sub_id for the given domain_id.
+ The sub_id must be allocated in increasing order of binlog order.
+ */
+ if (ev->get_type_code() == GTID_EVENT)
+ {
+ Gtid_log_event *gev= static_cast<Gtid_log_event *>(ev);
+ uint64 sub_id= rpl_global_gtid_slave_state.next_subid(gev->domain_id);
+ if (!sub_id)
+ {
+ errmsg = "slave SQL thread aborted because of out-of-memory error";
+ if (hot_log)
+ mysql_mutex_unlock(log_lock);
+ goto err;
+ }
+ rli->gtid_sub_id= sub_id;
+ rli->current_gtid.server_id= gev->server_id;
+ rli->current_gtid.domain_id= gev->domain_id;
+ rli->current_gtid.seq_no= gev->seq_no;
+ }
+
if (hot_log)
mysql_mutex_unlock(log_lock);
DBUG_RETURN(ev);
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index f6329092ed0..3c206a8857f 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -16,10 +16,12 @@
#include "sql_priv.h"
#include "unireg.h"
+#include "sql_base.h"
#include "sql_parse.h" // check_access
#ifdef HAVE_REPLICATION
#include "rpl_mi.h"
+#include "rpl_rli.h"
#include "sql_repl.h"
#include "sql_acl.h" // SUPER_ACL
#include "log_event.h"
@@ -748,7 +750,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
mariadb_slave_capability= get_mariadb_slave_capability(thd);
if (global_system_variables.log_warnings > 1)
sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
- thd->variables.server_id, log_ident, (ulong)pos);
+ (int)thd->variables.server_id, log_ident, (ulong)pos);
if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
{
errmsg= "Failed to run hook 'transmit_start'";
@@ -2442,4 +2444,118 @@ int log_loaded_block(IO_CACHE* file)
DBUG_RETURN(0);
}
+
+/**
+ Initialise the slave replication state from the mysql.rpl_slave_state table.
+
+ This is called each time an SQL thread starts, but the data is only actually
+ loaded on the first call.
+
+ The slave state is the last GTID applied on the slave within each
+ replication domain.
+
+ To avoid row lock contention, there are multiple rows for each domain_id.
+ The one containing the current slave state is the one with the maximal
+ sub_id value, within each domain_id.
+
+ CREATE TABLE mysql.rpl_slave_state (
+ domain_id INT UNSIGNED NOT NULL,
+ sub_id BIGINT UNSIGNED NOT NULL,
+ server_id INT UNSIGNED NOT NULL,
+ seq_no BIGINT UNSIGNED NOT NULL,
+ PRIMARY KEY (domain_id, sub_id))
+*/
+
+void
+rpl_init_gtid_slave_state()
+{
+ rpl_global_gtid_slave_state.init();
+}
+
+
+void
+rpl_deinit_gtid_slave_state()
+{
+ rpl_global_gtid_slave_state.deinit();
+}
+
+
+int
+rpl_load_gtid_slave_state(THD *thd)
+{
+ TABLE_LIST tlist;
+ TABLE *table;
+ bool table_opened= false;
+ bool table_scanned= false;
+ DBUG_ENTER("rpl_load_gtid_slave_state");
+
+ int err= 0;
+ rpl_global_gtid_slave_state.lock();
+ if (rpl_global_gtid_slave_state.loaded)
+ goto end;
+
+ mysql_reset_thd_for_next_command(thd, 0);
+
+ tlist.init_one_table(STRING_WITH_LEN("mysql"),
+ rpl_gtid_slave_state_table_name.str,
+ rpl_gtid_slave_state_table_name.length,
+ NULL, TL_READ);
+ if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
+ goto end;
+ table_opened= true;
+ table= tlist.table;
+
+ /*
+ ToDo: Check the table definition, error if not as expected.
+ We need the correct first 4 columns with correct type, and the primary key.
+ */
+
+ bitmap_set_bit(table->read_set, table->field[0]->field_index);
+ bitmap_set_bit(table->read_set, table->field[1]->field_index);
+ bitmap_set_bit(table->read_set, table->field[2]->field_index);
+ bitmap_set_bit(table->read_set, table->field[3]->field_index);
+ if ((err= table->file->ha_rnd_init_with_error(1)))
+ goto end;
+ table_scanned= true;
+ for (;;)
+ {
+ uint32 domain_id, server_id;
+ uint64 sub_id, seq_no;
+ if ((err= table->file->ha_rnd_next(table->record[0])))
+ {
+ if (err == HA_ERR_RECORD_DELETED)
+ continue;
+ else if (err == HA_ERR_END_OF_FILE)
+ break;
+ else
+ goto end;
+ }
+ domain_id= (ulonglong)table->field[0]->val_int();
+ sub_id= (ulonglong)table->field[1]->val_int();
+ server_id= (ulonglong)table->field[2]->val_int();
+ seq_no= (ulonglong)table->field[3]->val_int();
+ DBUG_PRINT("info", ("Read slave state row: %u:%u-%lu sub_id=%lu\n",
+ (unsigned)domain_id, (unsigned)server_id,
+ (ulong)seq_no, (ulong)sub_id));
+ if ((err= rpl_global_gtid_slave_state.update(domain_id, server_id,
+ sub_id, seq_no)))
+ goto end;
+ }
+ err= 0; /* Clear HA_ERR_END_OF_FILE */
+
+ rpl_global_gtid_slave_state.loaded= true;
+
+end:
+ if (table_scanned)
+ {
+ table->file->ha_index_or_rnd_end();
+ ha_commit_trans(thd, FALSE);
+ ha_commit_trans(thd, TRUE);
+ }
+ if (table_opened)
+ close_thread_tables(thd);
+ rpl_global_gtid_slave_state.unlock();
+ DBUG_RETURN(err);
+}
+
#endif /* HAVE_REPLICATION */
diff --git a/sql/sql_repl.h b/sql/sql_repl.h
index 9ca7e6b00b1..89fa0cf25be 100644
--- a/sql/sql_repl.h
+++ b/sql/sql_repl.h
@@ -65,6 +65,11 @@ int log_loaded_block(IO_CACHE* file);
int init_replication_sys_vars();
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags);
+extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;
+void rpl_init_gtid_slave_state();
+void rpl_deinit_gtid_slave_state();
+int rpl_load_gtid_slave_state(THD *thd);
+
#endif /* HAVE_REPLICATION */
#endif /* SQL_REPL_INCLUDED */