summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/item_create.cc24
-rw-r--r--sql/item_strfunc.cc35
-rw-r--r--sql/item_strfunc.h11
-rw-r--r--sql/log_event.cc101
-rw-r--r--sql/log_event.h5
-rw-r--r--sql/sql_repl.cc181
-rw-r--r--sql/sql_repl.h1
7 files changed, 355 insertions, 3 deletions
diff --git a/sql/item_create.cc b/sql/item_create.cc
index fc31b074055..bfa0e36005a 100644
--- a/sql/item_create.cc
+++ b/sql/item_create.cc
@@ -447,6 +447,19 @@ protected:
};
+class Create_func_binlog_gtid_pos : public Create_func_arg2
+{
+public:
+ virtual Item *create_2_arg(THD *thd, Item *arg1, Item *arg2);
+
+ static Create_func_binlog_gtid_pos s_singleton;
+
+protected:
+ Create_func_binlog_gtid_pos() {}
+ virtual ~Create_func_binlog_gtid_pos() {}
+};
+
+
class Create_func_bit_count : public Create_func_arg1
{
public:
@@ -3100,6 +3113,16 @@ Create_func_bin::create_1_arg(THD *thd, Item *arg1)
}
+Create_func_binlog_gtid_pos Create_func_binlog_gtid_pos::s_singleton;
+
+Item*
+Create_func_binlog_gtid_pos::create_2_arg(THD *thd, Item *arg1, Item *arg2)
+{
+ thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION);
+ return new (thd->mem_root) Item_func_binlog_gtid_pos(arg1, arg2);
+}
+
+
Create_func_bit_count Create_func_bit_count::s_singleton;
Item*
@@ -5323,6 +5346,7 @@ static Native_func_registry func_array[] =
{ { C_STRING_WITH_LEN("ATAN2") }, BUILDER(Create_func_atan)},
{ { C_STRING_WITH_LEN("BENCHMARK") }, BUILDER(Create_func_benchmark)},
{ { C_STRING_WITH_LEN("BIN") }, BUILDER(Create_func_bin)},
+ { { C_STRING_WITH_LEN("BINLOG_GTID_POS") }, BUILDER(Create_func_binlog_gtid_pos)},
{ { C_STRING_WITH_LEN("BIT_COUNT") }, BUILDER(Create_func_bit_count)},
{ { C_STRING_WITH_LEN("BIT_LENGTH") }, BUILDER(Create_func_bit_length)},
{ { C_STRING_WITH_LEN("BUFFER") }, GEOM_BUILDER(Create_func_buffer)},
diff --git a/sql/item_strfunc.cc b/sql/item_strfunc.cc
index 48de0c4bc5c..79ed795fe3e 100644
--- a/sql/item_strfunc.cc
+++ b/sql/item_strfunc.cc
@@ -59,6 +59,7 @@ C_MODE_START
#include "../mysys/my_static.h" // For soundex_map
C_MODE_END
#include "sql_show.h" // append_identifier
+#include <sql_repl.h>
/**
@todo Remove this. It is not safe to use a shared String object.
@@ -2725,6 +2726,40 @@ err:
}
+void Item_func_binlog_gtid_pos::fix_length_and_dec()
+{
+ collation.set(system_charset_info);
+ max_length= MAX_BLOB_WIDTH;
+ maybe_null= 1;
+}
+
+
+String *Item_func_binlog_gtid_pos::val_str(String *str)
+{
+ DBUG_ASSERT(fixed == 1);
+ String name_str, *name;
+ longlong pos;
+
+ if (args[0]->null_value || args[1]->null_value)
+ goto err;
+
+ name= args[0]->val_str(&name_str);
+ pos= args[1]->val_int();
+
+ if (pos < 0 || pos > UINT_MAX32)
+ goto err;
+
+ if (gtid_state_from_binlog_pos(name->c_ptr_safe(), (uint32)pos, str))
+ goto err;
+ null_value= 0;
+ return str;
+
+err:
+ null_value= 1;
+ return NULL;
+}
+
+
void Item_func_rpad::fix_length_and_dec()
{
// Handle character set for args[0] and args[2].
diff --git a/sql/item_strfunc.h b/sql/item_strfunc.h
index 486b7cf36ef..ddad7121325 100644
--- a/sql/item_strfunc.h
+++ b/sql/item_strfunc.h
@@ -616,6 +616,17 @@ public:
};
+class Item_func_binlog_gtid_pos :public Item_str_func
+{
+ String tmp_value;
+public:
+ Item_func_binlog_gtid_pos(Item *arg1,Item *arg2) :Item_str_func(arg1,arg2) {}
+ String *val_str(String *);
+ void fix_length_and_dec();
+ const char *func_name() const { return "binlog_gtid_pos"; }
+};
+
+
class Item_func_rpad :public Item_str_func
{
String tmp_value, rpad_str;
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 56fff03d411..649481a721c 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -6701,6 +6701,19 @@ slave_connection_state::load(char *slave_request, size_t len)
}
+int
+slave_connection_state::load(const rpl_gtid *gtid_list, uint32 count)
+{
+ uint32 i;
+
+ my_hash_reset(&hash);
+ for (i= 0; i < count; ++i)
+ if (update(&gtid_list[i]))
+ return 1;
+ return 0;
+}
+
+
rpl_gtid *
slave_connection_state::find(uint32 domain_id)
{
@@ -6708,6 +6721,30 @@ slave_connection_state::find(uint32 domain_id)
}
+int
+slave_connection_state::update(const rpl_gtid *in_gtid)
+{
+ rpl_gtid *new_gtid;
+ uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
+ if (rec)
+ {
+ memcpy(rec, in_gtid, sizeof(*in_gtid));
+ return 0;
+ }
+
+ if (!(new_gtid= (rpl_gtid *)my_malloc(sizeof(*new_gtid), MYF(MY_WME))))
+ return 1;
+ memcpy(new_gtid, in_gtid, sizeof(*new_gtid));
+ if (my_hash_insert(&hash, (uchar *)new_gtid))
+ {
+ my_free(new_gtid);
+ return 1;
+ }
+
+ return 0;
+}
+
+
void
slave_connection_state::remove(const rpl_gtid *in_gtid)
{
@@ -6725,6 +6762,30 @@ slave_connection_state::remove(const rpl_gtid *in_gtid)
}
+int
+slave_connection_state::to_string(String *out_str)
+{
+ uint32 i;
+
+ out_str->length(0);
+ for (i= 0; i < hash.records; ++i)
+ {
+ const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&hash, i);
+ if (i && out_str->append(","))
+ return 1;
+ if (gtid->domain_id &&
+ (out_str->append_ulonglong(gtid->domain_id) ||
+ out_str->append("-")))
+ return 1;
+ if (out_str->append_ulonglong(gtid->server_id) ||
+ out_str->append("-") ||
+ out_str->append_ulonglong(gtid->seq_no))
+ return 1;
+ }
+ return 0;
+}
+
+
#endif /* MYSQL_SERVER */
@@ -7155,6 +7216,46 @@ Gtid_list_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
#endif /* MYSQL_SERVER */
+/*
+ Used to record gtid_list event while sending binlog to slave, without having to
+ fully contruct the event object.
+*/
+bool
+Gtid_list_log_event::peek(const char *event_start, uint32 event_len,
+ rpl_gtid **out_gtid_list, uint32 *out_list_len)
+{
+ const char *p;
+ uint32 count_field, count;
+ rpl_gtid *gtid_list;
+
+ if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN)
+ return true;
+ p= event_start + LOG_EVENT_HEADER_LEN;
+ count_field= uint4korr(p);
+ p+= 4;
+ count= count_field & ((1<<28)-1);
+ if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN +
+ 16 * count)
+ return true;
+ if (!(gtid_list= (rpl_gtid *)my_malloc(sizeof(rpl_gtid)*count, MYF(MY_WME))))
+ return true;
+ *out_gtid_list= gtid_list;
+ *out_list_len= count;
+ while (count--)
+ {
+ gtid_list->domain_id= uint4korr(p);
+ p+= 4;
+ gtid_list->server_id= uint4korr(p);
+ p+= 4;
+ gtid_list->seq_no= uint8korr(p);
+ p+= 8;
+ ++gtid_list;
+ }
+
+ return false;
+}
+
+
/**************************************************************************
Intvar_log_event methods
**************************************************************************/
diff --git a/sql/log_event.h b/sql/log_event.h
index ff7460e7ef6..8c43f0465dc 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -3073,9 +3073,12 @@ struct slave_connection_state
~slave_connection_state();
int load(char *slave_request, size_t len);
+ int load(const rpl_gtid *gtid_list, uint32 count);
rpl_gtid *find(uint32 domain_id);
+ int update(const rpl_gtid *in_gtid);
void remove(const rpl_gtid *gtid);
ulong count() const { return hash.records; }
+ int to_string(String *out_str);
};
@@ -3261,6 +3264,8 @@ public:
#ifdef MYSQL_SERVER
bool write(IO_CACHE *file);
#endif
+ static bool peek(const char *event_start, uint32 event_len,
+ rpl_gtid **out_gtid_list, uint32 *out_list_len);
};
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 886ef79067b..b92ca2720ed 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -656,8 +656,12 @@ get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list)
if (!(ev= Log_event::read_log_event(cache, 0, &init_fdle,
opt_master_verify_checksum)) ||
ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
+ {
+ if (ev)
+ delete ev;
return "Could not read format description log event while looking for "
"GTID position in binlog";
+ }
fdle= static_cast<Format_description_log_event *>(ev);
@@ -795,6 +799,177 @@ end:
}
+/*
+ Given an old-style binlog position with file name and file offset, find the
+ corresponding gtid position. If the offset is not at an event boundary, give
+ an error.
+
+ Return NULL on ok, error message string on error.
+
+ ToDo: Improve the performance of this by using binlog index files.
+*/
+static const char *
+gtid_state_from_pos(const char *name, uint32 offset,
+ slave_connection_state *gtid_state)
+{
+ IO_CACHE cache;
+ File file;
+ const char *errormsg= NULL;
+ bool found_gtid_list_event= false;
+ bool found_format_description_event= false;
+ bool valid_pos= false;
+ uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
+ int err;
+ String packet;
+
+ if (gtid_state->load((const rpl_gtid *)NULL, 0))
+ {
+ errormsg= "Internal error (out of memory?) initializing slave state "
+ "while scanning binlog to find start position";
+ return errormsg;
+ }
+
+ if ((file= open_binlog(&cache, name, &errormsg)) == (File)-1)
+ return errormsg;
+
+ /*
+ First we need to find the initial GTID_LIST_EVENT. We need this even
+ if the offset is at the very start of the binlog file.
+
+ But if we do not find any GTID_LIST_EVENT, then this is an old binlog
+ with no GTID information, so we return empty GTID state.
+ */
+ for (;;)
+ {
+ Log_event_type typ;
+ uint32 cur_pos;
+
+ cur_pos= (uint32)my_b_tell(&cache);
+ if (cur_pos == offset)
+ valid_pos= true;
+ if (found_format_description_event && found_gtid_list_event &&
+ cur_pos >= offset)
+ break;
+
+ packet.length(0);
+ err= Log_event::read_log_event(&cache, &packet, NULL,
+ current_checksum_alg);
+ if (err)
+ {
+ errormsg= "Could not read binlog while searching for slave start "
+ "position on master";
+ goto end;
+ }
+ /*
+ The cast to uchar is needed to avoid a signed char being converted to a
+ negative number.
+ */
+ typ= (Log_event_type)(uchar)packet[EVENT_TYPE_OFFSET];
+ if (typ == FORMAT_DESCRIPTION_EVENT)
+ {
+ if (found_format_description_event)
+ {
+ errormsg= "Duplicate format description log event found while "
+ "searching for old-style position in binlog";
+ goto end;
+ }
+
+ current_checksum_alg= get_checksum_alg(packet.ptr(), packet.length());
+ found_format_description_event= true;
+ }
+ else if (typ != FORMAT_DESCRIPTION_EVENT && !found_format_description_event)
+ {
+ errormsg= "Did not find format description log event while searching "
+ "for old-style position in binlog";
+ goto end;
+ }
+ else if (typ == ROTATE_EVENT || typ == STOP_EVENT ||
+ typ == BINLOG_CHECKPOINT_EVENT)
+ continue; /* Continue looking */
+ else if (typ == GTID_LIST_EVENT)
+ {
+ rpl_gtid *gtid_list;
+ bool status;
+ uint32 list_len;
+
+ if (found_gtid_list_event)
+ {
+ errormsg= "Found duplicate Gtid_list_log_event while scanning binlog "
+ "to find slave start position";
+ goto end;
+ }
+ status= Gtid_list_log_event::peek(packet.ptr(), packet.length(),
+ &gtid_list, &list_len);
+ if (status)
+ {
+ errormsg= "Error reading Gtid_list_log_event while searching "
+ "for old-style position in binlog";
+ goto end;
+ }
+ err= gtid_state->load(gtid_list, list_len);
+ my_free(gtid_list);
+ if (err)
+ {
+ errormsg= "Internal error (out of memory?) initialising slave state "
+ "while scanning binlog to find start position";
+ goto end;
+ }
+ found_gtid_list_event= true;
+ }
+ else if (!found_gtid_list_event)
+ {
+ /* We did not find any Gtid_list_log_event, must be old binlog. */
+ goto end;
+ }
+ else if (typ == GTID_EVENT)
+ {
+ rpl_gtid gtid;
+ uchar flags2;
+ if (Gtid_log_event::peek(packet.ptr(), packet.length(), &gtid.domain_id,
+ &gtid.server_id, &gtid.seq_no, &flags2))
+ {
+ errormsg= "Corrupt gtid_log_event found while scanning binlog to find "
+ "initial slave position";
+ goto end;
+ }
+ if (gtid_state->update(&gtid))
+ {
+ errormsg= "Internal error (out of memory?) updating slave state while "
+ "scanning binlog to find start position";
+ goto end;
+ }
+ }
+ }
+
+ if (!valid_pos)
+ {
+ errormsg= "Slave requested incorrect position in master binlog. "
+ "Requested position %u in file '%s', but this position does not "
+ "correspond to the location of any binlog event.";
+ }
+
+end:
+ end_io_cache(&cache);
+ mysql_file_close(file, MYF(MY_WME));
+
+ return errormsg;
+}
+
+
+int
+gtid_state_from_binlog_pos(const char *name, uint32 pos, String *out_str)
+{
+ slave_connection_state gtid_state;
+
+ if (pos < 4)
+ pos= 4;
+ if (gtid_state_from_pos(name, pos, &gtid_state) ||
+ gtid_state.to_string(out_str))
+ return 1;
+ return 0;
+}
+
+
enum enum_gtid_skip_type {
GTID_SKIP_NOT, GTID_SKIP_STANDALONE, GTID_SKIP_TRANSACTION
};
@@ -945,8 +1120,8 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
binlog positions.
*/
if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg))
- return "Failed to replace binlog checkpoint event with dummy: "
- "too small event.";
+ return "Failed to replace binlog checkpoint or gtid list event with "
+ "dummy: too small event.";
}
}
@@ -1010,7 +1185,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
char str_buf[256];
String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info);
bool using_gtid_state;
- slave_connection_state gtid_state;
+ slave_connection_state gtid_state, return_gtid_state;
enum_gtid_skip_type gtid_skip_group= GTID_SKIP_NOT;
uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
diff --git a/sql/sql_repl.h b/sql/sql_repl.h
index 89fa0cf25be..73046dd8667 100644
--- a/sql/sql_repl.h
+++ b/sql/sql_repl.h
@@ -69,6 +69,7 @@ extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;
void rpl_init_gtid_slave_state();
void rpl_deinit_gtid_slave_state();
int rpl_load_gtid_slave_state(THD *thd);
+int gtid_state_from_binlog_pos(const char *name, uint32 pos, String *out_str);
#endif /* HAVE_REPLICATION */