diff options
-rw-r--r-- | sql/item_create.cc | 24 | ||||
-rw-r--r-- | sql/item_strfunc.cc | 35 | ||||
-rw-r--r-- | sql/item_strfunc.h | 11 | ||||
-rw-r--r-- | sql/log_event.cc | 101 | ||||
-rw-r--r-- | sql/log_event.h | 5 | ||||
-rw-r--r-- | sql/sql_repl.cc | 181 | ||||
-rw-r--r-- | sql/sql_repl.h | 1 |
7 files changed, 355 insertions, 3 deletions
diff --git a/sql/item_create.cc b/sql/item_create.cc index fc31b074055..bfa0e36005a 100644 --- a/sql/item_create.cc +++ b/sql/item_create.cc @@ -447,6 +447,19 @@ protected: }; +class Create_func_binlog_gtid_pos : public Create_func_arg2 +{ +public: + virtual Item *create_2_arg(THD *thd, Item *arg1, Item *arg2); + + static Create_func_binlog_gtid_pos s_singleton; + +protected: + Create_func_binlog_gtid_pos() {} + virtual ~Create_func_binlog_gtid_pos() {} +}; + + class Create_func_bit_count : public Create_func_arg1 { public: @@ -3100,6 +3113,16 @@ Create_func_bin::create_1_arg(THD *thd, Item *arg1) } +Create_func_binlog_gtid_pos Create_func_binlog_gtid_pos::s_singleton; + +Item* +Create_func_binlog_gtid_pos::create_2_arg(THD *thd, Item *arg1, Item *arg2) +{ + thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); + return new (thd->mem_root) Item_func_binlog_gtid_pos(arg1, arg2); +} + + Create_func_bit_count Create_func_bit_count::s_singleton; Item* @@ -5323,6 +5346,7 @@ static Native_func_registry func_array[] = { { C_STRING_WITH_LEN("ATAN2") }, BUILDER(Create_func_atan)}, { { C_STRING_WITH_LEN("BENCHMARK") }, BUILDER(Create_func_benchmark)}, { { C_STRING_WITH_LEN("BIN") }, BUILDER(Create_func_bin)}, + { { C_STRING_WITH_LEN("BINLOG_GTID_POS") }, BUILDER(Create_func_binlog_gtid_pos)}, { { C_STRING_WITH_LEN("BIT_COUNT") }, BUILDER(Create_func_bit_count)}, { { C_STRING_WITH_LEN("BIT_LENGTH") }, BUILDER(Create_func_bit_length)}, { { C_STRING_WITH_LEN("BUFFER") }, GEOM_BUILDER(Create_func_buffer)}, diff --git a/sql/item_strfunc.cc b/sql/item_strfunc.cc index 48de0c4bc5c..79ed795fe3e 100644 --- a/sql/item_strfunc.cc +++ b/sql/item_strfunc.cc @@ -59,6 +59,7 @@ C_MODE_START #include "../mysys/my_static.h" // For soundex_map C_MODE_END #include "sql_show.h" // append_identifier +#include <sql_repl.h> /** @todo Remove this. It is not safe to use a shared String object. @@ -2725,6 +2726,40 @@ err: } +void Item_func_binlog_gtid_pos::fix_length_and_dec() +{ + collation.set(system_charset_info); + max_length= MAX_BLOB_WIDTH; + maybe_null= 1; +} + + +String *Item_func_binlog_gtid_pos::val_str(String *str) +{ + DBUG_ASSERT(fixed == 1); + String name_str, *name; + longlong pos; + + if (args[0]->null_value || args[1]->null_value) + goto err; + + name= args[0]->val_str(&name_str); + pos= args[1]->val_int(); + + if (pos < 0 || pos > UINT_MAX32) + goto err; + + if (gtid_state_from_binlog_pos(name->c_ptr_safe(), (uint32)pos, str)) + goto err; + null_value= 0; + return str; + +err: + null_value= 1; + return NULL; +} + + void Item_func_rpad::fix_length_and_dec() { // Handle character set for args[0] and args[2]. diff --git a/sql/item_strfunc.h b/sql/item_strfunc.h index 486b7cf36ef..ddad7121325 100644 --- a/sql/item_strfunc.h +++ b/sql/item_strfunc.h @@ -616,6 +616,17 @@ public: }; +class Item_func_binlog_gtid_pos :public Item_str_func +{ + String tmp_value; +public: + Item_func_binlog_gtid_pos(Item *arg1,Item *arg2) :Item_str_func(arg1,arg2) {} + String *val_str(String *); + void fix_length_and_dec(); + const char *func_name() const { return "binlog_gtid_pos"; } +}; + + class Item_func_rpad :public Item_str_func { String tmp_value, rpad_str; diff --git a/sql/log_event.cc b/sql/log_event.cc index 56fff03d411..649481a721c 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -6701,6 +6701,19 @@ slave_connection_state::load(char *slave_request, size_t len) } +int +slave_connection_state::load(const rpl_gtid *gtid_list, uint32 count) +{ + uint32 i; + + my_hash_reset(&hash); + for (i= 0; i < count; ++i) + if (update(>id_list[i])) + return 1; + return 0; +} + + rpl_gtid * slave_connection_state::find(uint32 domain_id) { @@ -6708,6 +6721,30 @@ slave_connection_state::find(uint32 domain_id) } +int +slave_connection_state::update(const rpl_gtid *in_gtid) +{ + rpl_gtid *new_gtid; + uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0); + if (rec) + { + memcpy(rec, in_gtid, sizeof(*in_gtid)); + return 0; + } + + if (!(new_gtid= (rpl_gtid *)my_malloc(sizeof(*new_gtid), MYF(MY_WME)))) + return 1; + memcpy(new_gtid, in_gtid, sizeof(*new_gtid)); + if (my_hash_insert(&hash, (uchar *)new_gtid)) + { + my_free(new_gtid); + return 1; + } + + return 0; +} + + void slave_connection_state::remove(const rpl_gtid *in_gtid) { @@ -6725,6 +6762,30 @@ slave_connection_state::remove(const rpl_gtid *in_gtid) } +int +slave_connection_state::to_string(String *out_str) +{ + uint32 i; + + out_str->length(0); + for (i= 0; i < hash.records; ++i) + { + const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&hash, i); + if (i && out_str->append(",")) + return 1; + if (gtid->domain_id && + (out_str->append_ulonglong(gtid->domain_id) || + out_str->append("-"))) + return 1; + if (out_str->append_ulonglong(gtid->server_id) || + out_str->append("-") || + out_str->append_ulonglong(gtid->seq_no)) + return 1; + } + return 0; +} + + #endif /* MYSQL_SERVER */ @@ -7155,6 +7216,46 @@ Gtid_list_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) #endif /* MYSQL_SERVER */ +/* + Used to record gtid_list event while sending binlog to slave, without having to + fully contruct the event object. +*/ +bool +Gtid_list_log_event::peek(const char *event_start, uint32 event_len, + rpl_gtid **out_gtid_list, uint32 *out_list_len) +{ + const char *p; + uint32 count_field, count; + rpl_gtid *gtid_list; + + if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN) + return true; + p= event_start + LOG_EVENT_HEADER_LEN; + count_field= uint4korr(p); + p+= 4; + count= count_field & ((1<<28)-1); + if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN + + 16 * count) + return true; + if (!(gtid_list= (rpl_gtid *)my_malloc(sizeof(rpl_gtid)*count, MYF(MY_WME)))) + return true; + *out_gtid_list= gtid_list; + *out_list_len= count; + while (count--) + { + gtid_list->domain_id= uint4korr(p); + p+= 4; + gtid_list->server_id= uint4korr(p); + p+= 4; + gtid_list->seq_no= uint8korr(p); + p+= 8; + ++gtid_list; + } + + return false; +} + + /************************************************************************** Intvar_log_event methods **************************************************************************/ diff --git a/sql/log_event.h b/sql/log_event.h index ff7460e7ef6..8c43f0465dc 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -3073,9 +3073,12 @@ struct slave_connection_state ~slave_connection_state(); int load(char *slave_request, size_t len); + int load(const rpl_gtid *gtid_list, uint32 count); rpl_gtid *find(uint32 domain_id); + int update(const rpl_gtid *in_gtid); void remove(const rpl_gtid *gtid); ulong count() const { return hash.records; } + int to_string(String *out_str); }; @@ -3261,6 +3264,8 @@ public: #ifdef MYSQL_SERVER bool write(IO_CACHE *file); #endif + static bool peek(const char *event_start, uint32 event_len, + rpl_gtid **out_gtid_list, uint32 *out_list_len); }; diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 886ef79067b..b92ca2720ed 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -656,8 +656,12 @@ get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list) if (!(ev= Log_event::read_log_event(cache, 0, &init_fdle, opt_master_verify_checksum)) || ev->get_type_code() != FORMAT_DESCRIPTION_EVENT) + { + if (ev) + delete ev; return "Could not read format description log event while looking for " "GTID position in binlog"; + } fdle= static_cast<Format_description_log_event *>(ev); @@ -795,6 +799,177 @@ end: } +/* + Given an old-style binlog position with file name and file offset, find the + corresponding gtid position. If the offset is not at an event boundary, give + an error. + + Return NULL on ok, error message string on error. + + ToDo: Improve the performance of this by using binlog index files. +*/ +static const char * +gtid_state_from_pos(const char *name, uint32 offset, + slave_connection_state *gtid_state) +{ + IO_CACHE cache; + File file; + const char *errormsg= NULL; + bool found_gtid_list_event= false; + bool found_format_description_event= false; + bool valid_pos= false; + uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF; + int err; + String packet; + + if (gtid_state->load((const rpl_gtid *)NULL, 0)) + { + errormsg= "Internal error (out of memory?) initializing slave state " + "while scanning binlog to find start position"; + return errormsg; + } + + if ((file= open_binlog(&cache, name, &errormsg)) == (File)-1) + return errormsg; + + /* + First we need to find the initial GTID_LIST_EVENT. We need this even + if the offset is at the very start of the binlog file. + + But if we do not find any GTID_LIST_EVENT, then this is an old binlog + with no GTID information, so we return empty GTID state. + */ + for (;;) + { + Log_event_type typ; + uint32 cur_pos; + + cur_pos= (uint32)my_b_tell(&cache); + if (cur_pos == offset) + valid_pos= true; + if (found_format_description_event && found_gtid_list_event && + cur_pos >= offset) + break; + + packet.length(0); + err= Log_event::read_log_event(&cache, &packet, NULL, + current_checksum_alg); + if (err) + { + errormsg= "Could not read binlog while searching for slave start " + "position on master"; + goto end; + } + /* + The cast to uchar is needed to avoid a signed char being converted to a + negative number. + */ + typ= (Log_event_type)(uchar)packet[EVENT_TYPE_OFFSET]; + if (typ == FORMAT_DESCRIPTION_EVENT) + { + if (found_format_description_event) + { + errormsg= "Duplicate format description log event found while " + "searching for old-style position in binlog"; + goto end; + } + + current_checksum_alg= get_checksum_alg(packet.ptr(), packet.length()); + found_format_description_event= true; + } + else if (typ != FORMAT_DESCRIPTION_EVENT && !found_format_description_event) + { + errormsg= "Did not find format description log event while searching " + "for old-style position in binlog"; + goto end; + } + else if (typ == ROTATE_EVENT || typ == STOP_EVENT || + typ == BINLOG_CHECKPOINT_EVENT) + continue; /* Continue looking */ + else if (typ == GTID_LIST_EVENT) + { + rpl_gtid *gtid_list; + bool status; + uint32 list_len; + + if (found_gtid_list_event) + { + errormsg= "Found duplicate Gtid_list_log_event while scanning binlog " + "to find slave start position"; + goto end; + } + status= Gtid_list_log_event::peek(packet.ptr(), packet.length(), + >id_list, &list_len); + if (status) + { + errormsg= "Error reading Gtid_list_log_event while searching " + "for old-style position in binlog"; + goto end; + } + err= gtid_state->load(gtid_list, list_len); + my_free(gtid_list); + if (err) + { + errormsg= "Internal error (out of memory?) initialising slave state " + "while scanning binlog to find start position"; + goto end; + } + found_gtid_list_event= true; + } + else if (!found_gtid_list_event) + { + /* We did not find any Gtid_list_log_event, must be old binlog. */ + goto end; + } + else if (typ == GTID_EVENT) + { + rpl_gtid gtid; + uchar flags2; + if (Gtid_log_event::peek(packet.ptr(), packet.length(), >id.domain_id, + >id.server_id, >id.seq_no, &flags2)) + { + errormsg= "Corrupt gtid_log_event found while scanning binlog to find " + "initial slave position"; + goto end; + } + if (gtid_state->update(>id)) + { + errormsg= "Internal error (out of memory?) updating slave state while " + "scanning binlog to find start position"; + goto end; + } + } + } + + if (!valid_pos) + { + errormsg= "Slave requested incorrect position in master binlog. " + "Requested position %u in file '%s', but this position does not " + "correspond to the location of any binlog event."; + } + +end: + end_io_cache(&cache); + mysql_file_close(file, MYF(MY_WME)); + + return errormsg; +} + + +int +gtid_state_from_binlog_pos(const char *name, uint32 pos, String *out_str) +{ + slave_connection_state gtid_state; + + if (pos < 4) + pos= 4; + if (gtid_state_from_pos(name, pos, >id_state) || + gtid_state.to_string(out_str)) + return 1; + return 0; +} + + enum enum_gtid_skip_type { GTID_SKIP_NOT, GTID_SKIP_STANDALONE, GTID_SKIP_TRANSACTION }; @@ -945,8 +1120,8 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, binlog positions. */ if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg)) - return "Failed to replace binlog checkpoint event with dummy: " - "too small event."; + return "Failed to replace binlog checkpoint or gtid list event with " + "dummy: too small event."; } } @@ -1010,7 +1185,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, char str_buf[256]; String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info); bool using_gtid_state; - slave_connection_state gtid_state; + slave_connection_state gtid_state, return_gtid_state; enum_gtid_skip_type gtid_skip_group= GTID_SKIP_NOT; uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF; diff --git a/sql/sql_repl.h b/sql/sql_repl.h index 89fa0cf25be..73046dd8667 100644 --- a/sql/sql_repl.h +++ b/sql/sql_repl.h @@ -69,6 +69,7 @@ extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state; void rpl_init_gtid_slave_state(); void rpl_deinit_gtid_slave_state(); int rpl_load_gtid_slave_state(THD *thd); +int gtid_state_from_binlog_pos(const char *name, uint32 pos, String *out_str); #endif /* HAVE_REPLICATION */ |