diff options
author | Kristian Nielsen <knielsen@knielsen-hq.org> | 2017-03-09 13:27:27 +0100 |
---|---|---|
committer | Kristian Nielsen <knielsen@knielsen-hq.org> | 2017-04-21 08:00:06 +0200 |
commit | c995ecbe9834bae31912e00cc98f7c872b63e1fb (patch) | |
tree | 9009a8beff3fabb6b3a67ce4de318d4eacd7cfd8 | |
parent | 087cf0232864b60ce62550598f5903b766fe6c90 (diff) | |
download | mariadb-git-c995ecbe9834bae31912e00cc98f7c872b63e1fb.tar.gz |
MDEV-12179: Per-engine mysql.gtid_slave_pos table
Intermediate commit.
For each GTID recorded in mysq.gtid_slave_pos, keep track of which
engine the update was made in.
This will be later used to know which rows can be deleted in the table
of a given engine.
-rw-r--r-- | sql/log_event.cc | 18 | ||||
-rw-r--r-- | sql/rpl_gtid.cc | 26 | ||||
-rw-r--r-- | sql/rpl_gtid.h | 18 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 18 |
4 files changed, 52 insertions, 28 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index 3aea3eaf2f1..d50bd5becdb 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -5027,6 +5027,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, int expected_error,actual_error= 0; Schema_specification_st db_options; uint64 sub_id= 0; + void *hton= NULL; rpl_gtid gtid; Relay_log_info const *rli= rgi->rli; Rpl_filter *rpl_filter= rli->mi->rpl_filter; @@ -5197,7 +5198,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, gtid= rgi->current_gtid; if (rpl_global_gtid_slave_state->record_gtid(thd, >id, sub_id, - true, false)) + true, false, &hton)) { int errcode= thd->get_stmt_da()->sql_errno(); if (!is_parallel_retry_error(rgi, errcode)) @@ -5417,7 +5418,7 @@ compare_errors: end: if (sub_id && !thd->is_slave_error) - rpl_global_gtid_slave_state->update_state_hash(sub_id, >id, rgi); + rpl_global_gtid_slave_state->update_state_hash(sub_id, >id, hton, rgi); /* Probably we have set thd->query, thd->db, thd->catalog to point to places @@ -7899,15 +7900,17 @@ Gtid_list_log_event::do_apply_event(rpl_group_info *rgi) int ret; if (gl_flags & FLAG_IGN_GTIDS) { + void *hton= NULL; uint32 i; + for (i= 0; i < count; ++i) { if ((ret= rpl_global_gtid_slave_state->record_gtid(thd, &list[i], - sub_id_list[i], - false, false))) + sub_id_list[i], + false, false, &hton))) return ret; rpl_global_gtid_slave_state->update_state_hash(sub_id_list[i], &list[i], - NULL); + hton, NULL); } } ret= Log_event::do_apply_event(rgi); @@ -8388,6 +8391,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi) rpl_gtid gtid; uint64 sub_id= 0; Relay_log_info const *rli= rgi->rli; + void *hton= NULL; /* XID_EVENT works like a COMMIT statement. And it also updates the @@ -8408,7 +8412,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi) gtid= rgi->current_gtid; err= rpl_global_gtid_slave_state->record_gtid(thd, >id, sub_id, true, - false); + false, &hton); if (err) { int ec= thd->get_stmt_da()->sql_errno(); @@ -8441,7 +8445,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi) thd->mdl_context.release_transactional_locks(); if (!res && sub_id) - rpl_global_gtid_slave_state->update_state_hash(sub_id, >id, rgi); + rpl_global_gtid_slave_state->update_state_hash(sub_id, >id, hton, rgi); /* Increment the global status commit count variable diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index 4daa8b9a53b..2d12aafdd02 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -33,7 +33,7 @@ const LEX_STRING rpl_gtid_slave_state_table_name= void -rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, +rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, void *hton, rpl_group_info *rgi) { int err; @@ -45,7 +45,7 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, it is even committed. */ mysql_mutex_lock(&LOCK_slave_state); - err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, rgi); + err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, hton, rgi); mysql_mutex_unlock(&LOCK_slave_state); if (err) { @@ -74,12 +74,14 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi) if (rgi->gtid_pending) { uint64 sub_id= rgi->gtid_sub_id; + void *hton= NULL; + rgi->gtid_pending= false; if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE) { - if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false)) + if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false, &hton)) DBUG_RETURN(1); - update_state_hash(sub_id, &rgi->current_gtid, rgi); + update_state_hash(sub_id, &rgi->current_gtid, hton, rgi); } rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL; } @@ -287,11 +289,12 @@ rpl_slave_state::truncate_hash() int rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, - uint64 seq_no, rpl_group_info *rgi) + uint64 seq_no, void *hton, rpl_group_info *rgi) { element *elem= NULL; list_element *list_elem= NULL; + DBUG_ASSERT(hton); if (!(elem= get_element(domain_id))) return 1; @@ -336,6 +339,7 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, list_elem->server_id= server_id; list_elem->sub_id= sub_id; list_elem->seq_no= seq_no; + list_elem->hton= hton; elem->add(list_elem); if (last_sub_id < sub_id) @@ -482,7 +486,8 @@ gtid_check_rpl_slave_state_table(TABLE *table) */ int rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, - bool in_transaction, bool in_statement) + bool in_transaction, bool in_statement, + void **out_hton) { TABLE_LIST tlist; int err= 0; @@ -495,6 +500,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, wait_for_commit* suspended_wfc; DBUG_ENTER("record_gtid"); + *out_hton= NULL; if (unlikely(!loaded)) { /* @@ -582,6 +588,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, table->file->print_error(err, MYF(0)); goto end; } + *out_hton= table->s->db_type(); if(opt_bin_log && (err= mysql_bin_log.bump_seq_no_counter_if_needed(gtid->domain_id, @@ -1078,11 +1085,12 @@ rpl_slave_state::load(THD *thd, char *state_from_master, size_t len, { rpl_gtid gtid; uint64 sub_id; + void *hton= NULL; if (gtid_parser_helper(&state_from_master, end, >id) || !(sub_id= next_sub_id(gtid.domain_id)) || - record_gtid(thd, >id, sub_id, false, in_statement) || - update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, NULL)) + record_gtid(thd, >id, sub_id, false, in_statement, &hton) || + update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, hton, NULL)) return 1; if (state_from_master == end) break; @@ -1144,7 +1152,7 @@ rpl_slave_state::set_gtid_pos_tables_list(struct rpl_slave_state::gtid_pos_table struct rpl_slave_state::gtid_pos_table * -rpl_slave_state::alloc_gtid_pos_table(LEX_STRING *table_name, handlerton *hton) +rpl_slave_state::alloc_gtid_pos_table(LEX_STRING *table_name, void *hton) { struct gtid_pos_table *p; char *allocated_str; diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h index c8af2c4946a..ea278427061 100644 --- a/sql/rpl_gtid.h +++ b/sql/rpl_gtid.h @@ -112,6 +112,7 @@ struct rpl_slave_state uint64 sub_id; uint64 seq_no; uint32 server_id; + void *hton; }; /* Elements in the HASH that hold the state for one domain_id. */ @@ -158,7 +159,13 @@ struct rpl_slave_state /* Descriptor for mysql.gtid_slave_posXXX table in specific engine. */ struct gtid_pos_table { struct gtid_pos_table *next; - handlerton *table_hton; + /* + Use a void * here, rather than handlerton *, to make explicit that we + are not using the value to access any functionality in the engine. It + is just used as an opaque value to identify which engine we are using + for each GTID row. + */ + void *table_hton; LEX_STRING table_name; }; @@ -179,10 +186,10 @@ struct rpl_slave_state void truncate_hash(); ulong count() const { return hash.records; } int update(uint32 domain_id, uint32 server_id, uint64 sub_id, - uint64 seq_no, rpl_group_info *rgi); + uint64 seq_no, void *hton, rpl_group_info *rgi); int truncate_state_table(THD *thd); int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, - bool in_transaction, bool in_statement); + bool in_transaction, bool in_statement, void **out_hton); uint64 next_sub_id(uint32 domain_id); int iterate(int (*cb)(rpl_gtid *, void *), void *data, rpl_gtid *extra_gtids, uint32 num_extra, @@ -196,12 +203,13 @@ struct rpl_slave_state element *get_element(uint32 domain_id); int put_back_list(uint32 domain_id, list_element *list); - void update_state_hash(uint64 sub_id, rpl_gtid *gtid, rpl_group_info *rgi); + void update_state_hash(uint64 sub_id, rpl_gtid *gtid, void *hton, + rpl_group_info *rgi); int record_and_update_gtid(THD *thd, struct rpl_group_info *rgi); int check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi); void release_domain_owner(rpl_group_info *rgi); void set_gtid_pos_tables_list(struct gtid_pos_table *new_list); - struct gtid_pos_table *alloc_gtid_pos_table(LEX_STRING *table_name, handlerton *hton); + struct gtid_pos_table *alloc_gtid_pos_table(LEX_STRING *table_name, void *hton); void free_gtid_pos_tables(struct gtid_pos_table *list); }; diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 1be1e663d86..74d831db6dc 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -1468,11 +1468,11 @@ Relay_log_info::update_relay_log_state(rpl_gtid *gtid_list, uint32 count) #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) -struct gtid_pos_element { uint64 sub_id; rpl_gtid gtid; }; +struct gtid_pos_element { uint64 sub_id; rpl_gtid gtid; void *hton; }; static int scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array, - LEX_STRING *tablename, handlerton **out_hton) + LEX_STRING *tablename, void **out_hton) { TABLE_LIST tlist; TABLE *table; @@ -1529,6 +1529,7 @@ scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array, tmp_entry.gtid.domain_id= domain_id; tmp_entry.gtid.server_id= server_id; tmp_entry.gtid.seq_no= seq_no; + tmp_entry.hton= table->s->db_type(); if ((err= insert_dynamic(array, (uchar *)&tmp_entry))) { my_error(ER_OUT_OF_RESOURCES, MYF(0)); @@ -1544,6 +1545,7 @@ scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array, DBUG_ASSERT(entry->gtid.domain_id == domain_id); entry->gtid.server_id= server_id; entry->gtid.seq_no= seq_no; + entry->hton= table->s->db_type(); } else { @@ -1558,6 +1560,7 @@ scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array, entry->gtid.domain_id= domain_id; entry->gtid.server_id= server_id; entry->gtid.seq_no= seq_no; + entry->hton= table->s->db_type(); if ((err= my_hash_insert(hash, (uchar *)entry))) { my_free(entry); @@ -1652,7 +1655,7 @@ load_gtid_state_cb(THD *thd, LEX_STRING *table_name, void *arg) int err; load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg); struct rpl_slave_state::gtid_pos_table *p; - handlerton *hton; + void *hton; if ((err= scan_one_gtid_slave_pos_table(thd, data->hash, data->array, table_name, &hton))) @@ -1707,10 +1710,11 @@ rpl_load_gtid_slave_state(THD *thd) { get_dynamic(&array, (uchar *)&tmp_entry, i); if ((err= rpl_global_gtid_slave_state->update(tmp_entry.gtid.domain_id, - tmp_entry.gtid.server_id, - tmp_entry.sub_id, - tmp_entry.gtid.seq_no, - NULL))) + tmp_entry.gtid.server_id, + tmp_entry.sub_id, + tmp_entry.gtid.seq_no, + tmp_entry.hton, + NULL))) { mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); my_error(ER_OUT_OF_RESOURCES, MYF(0)); |