summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKristian Nielsen <knielsen@knielsen-hq.org>2017-03-09 13:27:27 +0100
committerKristian Nielsen <knielsen@knielsen-hq.org>2017-04-21 08:00:06 +0200
commitc995ecbe9834bae31912e00cc98f7c872b63e1fb (patch)
tree9009a8beff3fabb6b3a67ce4de318d4eacd7cfd8
parent087cf0232864b60ce62550598f5903b766fe6c90 (diff)
downloadmariadb-git-c995ecbe9834bae31912e00cc98f7c872b63e1fb.tar.gz
MDEV-12179: Per-engine mysql.gtid_slave_pos table
Intermediate commit. For each GTID recorded in mysq.gtid_slave_pos, keep track of which engine the update was made in. This will be later used to know which rows can be deleted in the table of a given engine.
-rw-r--r--sql/log_event.cc18
-rw-r--r--sql/rpl_gtid.cc26
-rw-r--r--sql/rpl_gtid.h18
-rw-r--r--sql/rpl_rli.cc18
4 files changed, 52 insertions, 28 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 3aea3eaf2f1..d50bd5becdb 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -5027,6 +5027,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
int expected_error,actual_error= 0;
Schema_specification_st db_options;
uint64 sub_id= 0;
+ void *hton= NULL;
rpl_gtid gtid;
Relay_log_info const *rli= rgi->rli;
Rpl_filter *rpl_filter= rli->mi->rpl_filter;
@@ -5197,7 +5198,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
gtid= rgi->current_gtid;
if (rpl_global_gtid_slave_state->record_gtid(thd, &gtid, sub_id,
- true, false))
+ true, false, &hton))
{
int errcode= thd->get_stmt_da()->sql_errno();
if (!is_parallel_retry_error(rgi, errcode))
@@ -5417,7 +5418,7 @@ compare_errors:
end:
if (sub_id && !thd->is_slave_error)
- rpl_global_gtid_slave_state->update_state_hash(sub_id, &gtid, rgi);
+ rpl_global_gtid_slave_state->update_state_hash(sub_id, &gtid, hton, rgi);
/*
Probably we have set thd->query, thd->db, thd->catalog to point to places
@@ -7899,15 +7900,17 @@ Gtid_list_log_event::do_apply_event(rpl_group_info *rgi)
int ret;
if (gl_flags & FLAG_IGN_GTIDS)
{
+ void *hton= NULL;
uint32 i;
+
for (i= 0; i < count; ++i)
{
if ((ret= rpl_global_gtid_slave_state->record_gtid(thd, &list[i],
- sub_id_list[i],
- false, false)))
+ sub_id_list[i],
+ false, false, &hton)))
return ret;
rpl_global_gtid_slave_state->update_state_hash(sub_id_list[i], &list[i],
- NULL);
+ hton, NULL);
}
}
ret= Log_event::do_apply_event(rgi);
@@ -8388,6 +8391,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
rpl_gtid gtid;
uint64 sub_id= 0;
Relay_log_info const *rli= rgi->rli;
+ void *hton= NULL;
/*
XID_EVENT works like a COMMIT statement. And it also updates the
@@ -8408,7 +8412,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
gtid= rgi->current_gtid;
err= rpl_global_gtid_slave_state->record_gtid(thd, &gtid, sub_id, true,
- false);
+ false, &hton);
if (err)
{
int ec= thd->get_stmt_da()->sql_errno();
@@ -8441,7 +8445,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
thd->mdl_context.release_transactional_locks();
if (!res && sub_id)
- rpl_global_gtid_slave_state->update_state_hash(sub_id, &gtid, rgi);
+ rpl_global_gtid_slave_state->update_state_hash(sub_id, &gtid, hton, rgi);
/*
Increment the global status commit count variable
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index 4daa8b9a53b..2d12aafdd02 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -33,7 +33,7 @@ const LEX_STRING rpl_gtid_slave_state_table_name=
void
-rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid,
+rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, void *hton,
rpl_group_info *rgi)
{
int err;
@@ -45,7 +45,7 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid,
it is even committed.
*/
mysql_mutex_lock(&LOCK_slave_state);
- err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, rgi);
+ err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, hton, rgi);
mysql_mutex_unlock(&LOCK_slave_state);
if (err)
{
@@ -74,12 +74,14 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
if (rgi->gtid_pending)
{
uint64 sub_id= rgi->gtid_sub_id;
+ void *hton= NULL;
+
rgi->gtid_pending= false;
if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE)
{
- if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false))
+ if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false, &hton))
DBUG_RETURN(1);
- update_state_hash(sub_id, &rgi->current_gtid, rgi);
+ update_state_hash(sub_id, &rgi->current_gtid, hton, rgi);
}
rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
}
@@ -287,11 +289,12 @@ rpl_slave_state::truncate_hash()
int
rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
- uint64 seq_no, rpl_group_info *rgi)
+ uint64 seq_no, void *hton, rpl_group_info *rgi)
{
element *elem= NULL;
list_element *list_elem= NULL;
+ DBUG_ASSERT(hton);
if (!(elem= get_element(domain_id)))
return 1;
@@ -336,6 +339,7 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
list_elem->server_id= server_id;
list_elem->sub_id= sub_id;
list_elem->seq_no= seq_no;
+ list_elem->hton= hton;
elem->add(list_elem);
if (last_sub_id < sub_id)
@@ -482,7 +486,8 @@ gtid_check_rpl_slave_state_table(TABLE *table)
*/
int
rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
- bool in_transaction, bool in_statement)
+ bool in_transaction, bool in_statement,
+ void **out_hton)
{
TABLE_LIST tlist;
int err= 0;
@@ -495,6 +500,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
wait_for_commit* suspended_wfc;
DBUG_ENTER("record_gtid");
+ *out_hton= NULL;
if (unlikely(!loaded))
{
/*
@@ -582,6 +588,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
table->file->print_error(err, MYF(0));
goto end;
}
+ *out_hton= table->s->db_type();
if(opt_bin_log &&
(err= mysql_bin_log.bump_seq_no_counter_if_needed(gtid->domain_id,
@@ -1078,11 +1085,12 @@ rpl_slave_state::load(THD *thd, char *state_from_master, size_t len,
{
rpl_gtid gtid;
uint64 sub_id;
+ void *hton= NULL;
if (gtid_parser_helper(&state_from_master, end, &gtid) ||
!(sub_id= next_sub_id(gtid.domain_id)) ||
- record_gtid(thd, &gtid, sub_id, false, in_statement) ||
- update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, NULL))
+ record_gtid(thd, &gtid, sub_id, false, in_statement, &hton) ||
+ update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, hton, NULL))
return 1;
if (state_from_master == end)
break;
@@ -1144,7 +1152,7 @@ rpl_slave_state::set_gtid_pos_tables_list(struct rpl_slave_state::gtid_pos_table
struct rpl_slave_state::gtid_pos_table *
-rpl_slave_state::alloc_gtid_pos_table(LEX_STRING *table_name, handlerton *hton)
+rpl_slave_state::alloc_gtid_pos_table(LEX_STRING *table_name, void *hton)
{
struct gtid_pos_table *p;
char *allocated_str;
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
index c8af2c4946a..ea278427061 100644
--- a/sql/rpl_gtid.h
+++ b/sql/rpl_gtid.h
@@ -112,6 +112,7 @@ struct rpl_slave_state
uint64 sub_id;
uint64 seq_no;
uint32 server_id;
+ void *hton;
};
/* Elements in the HASH that hold the state for one domain_id. */
@@ -158,7 +159,13 @@ struct rpl_slave_state
/* Descriptor for mysql.gtid_slave_posXXX table in specific engine. */
struct gtid_pos_table {
struct gtid_pos_table *next;
- handlerton *table_hton;
+ /*
+ Use a void * here, rather than handlerton *, to make explicit that we
+ are not using the value to access any functionality in the engine. It
+ is just used as an opaque value to identify which engine we are using
+ for each GTID row.
+ */
+ void *table_hton;
LEX_STRING table_name;
};
@@ -179,10 +186,10 @@ struct rpl_slave_state
void truncate_hash();
ulong count() const { return hash.records; }
int update(uint32 domain_id, uint32 server_id, uint64 sub_id,
- uint64 seq_no, rpl_group_info *rgi);
+ uint64 seq_no, void *hton, rpl_group_info *rgi);
int truncate_state_table(THD *thd);
int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
- bool in_transaction, bool in_statement);
+ bool in_transaction, bool in_statement, void **out_hton);
uint64 next_sub_id(uint32 domain_id);
int iterate(int (*cb)(rpl_gtid *, void *), void *data,
rpl_gtid *extra_gtids, uint32 num_extra,
@@ -196,12 +203,13 @@ struct rpl_slave_state
element *get_element(uint32 domain_id);
int put_back_list(uint32 domain_id, list_element *list);
- void update_state_hash(uint64 sub_id, rpl_gtid *gtid, rpl_group_info *rgi);
+ void update_state_hash(uint64 sub_id, rpl_gtid *gtid, void *hton,
+ rpl_group_info *rgi);
int record_and_update_gtid(THD *thd, struct rpl_group_info *rgi);
int check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi);
void release_domain_owner(rpl_group_info *rgi);
void set_gtid_pos_tables_list(struct gtid_pos_table *new_list);
- struct gtid_pos_table *alloc_gtid_pos_table(LEX_STRING *table_name, handlerton *hton);
+ struct gtid_pos_table *alloc_gtid_pos_table(LEX_STRING *table_name, void *hton);
void free_gtid_pos_tables(struct gtid_pos_table *list);
};
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 1be1e663d86..74d831db6dc 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -1468,11 +1468,11 @@ Relay_log_info::update_relay_log_state(rpl_gtid *gtid_list, uint32 count)
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
-struct gtid_pos_element { uint64 sub_id; rpl_gtid gtid; };
+struct gtid_pos_element { uint64 sub_id; rpl_gtid gtid; void *hton; };
static int
scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array,
- LEX_STRING *tablename, handlerton **out_hton)
+ LEX_STRING *tablename, void **out_hton)
{
TABLE_LIST tlist;
TABLE *table;
@@ -1529,6 +1529,7 @@ scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array,
tmp_entry.gtid.domain_id= domain_id;
tmp_entry.gtid.server_id= server_id;
tmp_entry.gtid.seq_no= seq_no;
+ tmp_entry.hton= table->s->db_type();
if ((err= insert_dynamic(array, (uchar *)&tmp_entry)))
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
@@ -1544,6 +1545,7 @@ scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array,
DBUG_ASSERT(entry->gtid.domain_id == domain_id);
entry->gtid.server_id= server_id;
entry->gtid.seq_no= seq_no;
+ entry->hton= table->s->db_type();
}
else
{
@@ -1558,6 +1560,7 @@ scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array,
entry->gtid.domain_id= domain_id;
entry->gtid.server_id= server_id;
entry->gtid.seq_no= seq_no;
+ entry->hton= table->s->db_type();
if ((err= my_hash_insert(hash, (uchar *)entry)))
{
my_free(entry);
@@ -1652,7 +1655,7 @@ load_gtid_state_cb(THD *thd, LEX_STRING *table_name, void *arg)
int err;
load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg);
struct rpl_slave_state::gtid_pos_table *p;
- handlerton *hton;
+ void *hton;
if ((err= scan_one_gtid_slave_pos_table(thd, data->hash, data->array,
table_name, &hton)))
@@ -1707,10 +1710,11 @@ rpl_load_gtid_slave_state(THD *thd)
{
get_dynamic(&array, (uchar *)&tmp_entry, i);
if ((err= rpl_global_gtid_slave_state->update(tmp_entry.gtid.domain_id,
- tmp_entry.gtid.server_id,
- tmp_entry.sub_id,
- tmp_entry.gtid.seq_no,
- NULL)))
+ tmp_entry.gtid.server_id,
+ tmp_entry.sub_id,
+ tmp_entry.gtid.seq_no,
+ tmp_entry.hton,
+ NULL)))
{
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
my_error(ER_OUT_OF_RESOURCES, MYF(0));