summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/rpl_gtid.cc402
-rw-r--r--sql/rpl_gtid.h224
-rw-r--r--sql/sql_parse.cc36
-rw-r--r--sql/sql_repl.cc44
-rw-r--r--sql/sql_repl.h3
5 files changed, 696 insertions, 13 deletions
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index 8b10703fdc2..52e14714a9b 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -17,6 +17,7 @@
/* Definitions for MariaDB global transaction ID (GTID). */
+#ifndef MYSQL_CLIENT
#include "mariadb.h"
#include "sql_priv.h"
#include "unireg.h"
@@ -1268,6 +1269,7 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
return true;
}
+#endif
/*
Parse a GTID at the start of a string, and update the pointer to point
@@ -1305,9 +1307,32 @@ gtid_parser_helper(const char **ptr, const char *end, rpl_gtid *out_gtid)
return 0;
}
+/*
+ Unpack a GTID at the start of a string, and update the pointer to point
+ at the first character after the unpacked GTID.
+
+ Returns 0 on ok, non-zero on parse error.
+*/
+static int
+gtid_unpack_helper(const char **ptr, const char *end, rpl_gtid *out_gtid)
+{
+ const char *p= *ptr;
+
+ if (p[4] != '-' || p[9] != '-')
+ return 1;
+
+ out_gtid->domain_id= (uint32)uint4korr(p);
+ out_gtid->server_id= (uint32)uint4korr(&p[5]);
+ out_gtid->seq_no= (uint64)uint8korr(&p[10]);
+
+ *ptr= p + 18;
+ return 0;
+}
rpl_gtid *
-gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
+gtid_read_to_list(const char *str, size_t str_len, uint32 *out_len,
+ int reader_f(const char **ptr, const char *end,
+ rpl_gtid *out_gtid))
{
const char *p= const_cast<char *>(str);
const char *end= p + str_len;
@@ -1318,7 +1343,7 @@ gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
{
rpl_gtid gtid;
- if (len >= (((uint32)1 << 28)-1) || gtid_parser_helper(&p, end, &gtid))
+ if (len >= (((uint32)1 << 28)-1) || reader_f(&p, end, &gtid))
{
my_free(list);
return NULL;
@@ -1345,6 +1370,20 @@ gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
}
+rpl_gtid *
+gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len)
+{
+ return gtid_read_to_list(str, str_len, out_len, gtid_parser_helper);
+}
+
+rpl_gtid *
+gtid_unpack_string_to_list(const char *str, size_t str_len, uint32 *out_len)
+{
+ return gtid_read_to_list(str, str_len, out_len, gtid_unpack_helper);
+}
+
+#ifndef MYSQL_CLIENT
+
/*
Update the slave replication state with the GTID position obtained from
master when connecting with old-style (filename,offset) position.
@@ -2952,3 +2991,362 @@ gtid_waiting::remove_from_wait_queue(gtid_waiting::hash_element *he,
queue_remove(&he->queue, elem->queue_idx);
}
+
+#endif
+
+Window_gtid_event_filter::Window_gtid_event_filter() :
+ m_has_start(FALSE),
+ m_has_stop(FALSE),
+ m_is_active(FALSE),
+ m_has_passed(FALSE)
+ {
+ // m_start and m_stop do not need initial values if unused
+ }
+
+Window_gtid_event_filter::Window_gtid_event_filter(rpl_gtid *start, rpl_gtid *stop) :
+ m_is_active(FALSE),
+ m_has_passed(FALSE)
+{
+ DBUG_ASSERT(start->domain_id == stop->domain_id);
+
+ m_is_active= FALSE;
+ m_has_passed= FALSE;
+
+ m_has_start= TRUE;
+ m_start.domain_id= start->domain_id;
+ m_start.server_id= start->server_id;
+ m_start.seq_no= start->seq_no;
+
+ m_has_stop= TRUE;
+ m_stop.domain_id= stop->domain_id;
+ m_stop.server_id= stop->server_id;
+ m_stop.seq_no= stop->seq_no;
+}
+
+my_bool Window_gtid_event_filter::set_start_gtid(rpl_gtid *start)
+{
+ if (m_has_start)
+ return FALSE;
+
+ // Copy values
+ m_has_start= TRUE;
+ m_start.domain_id= start->domain_id;
+ m_start.server_id= start->server_id;
+ m_start.seq_no= start->seq_no;
+
+ return TRUE;
+}
+
+my_bool Window_gtid_event_filter::set_stop_gtid(rpl_gtid *stop)
+{
+ if (m_has_stop)
+ return FALSE;
+
+ // Copy values
+ m_has_stop= TRUE;
+ m_stop.domain_id= stop->domain_id;
+ m_stop.server_id= stop->server_id;
+ m_stop.seq_no= stop->seq_no;
+
+ return TRUE;
+}
+
+gtid_filter_identifier Window_gtid_event_filter::get_filter_identifier()
+{
+ DBUG_ASSERT(m_has_start || m_has_stop);
+ if (m_has_start)
+ return m_start.domain_id;
+ else
+ return m_stop.domain_id;
+}
+
+static inline my_bool is_gtid_at_or_after(rpl_gtid *boundary,
+ rpl_gtid *test_gtid)
+{
+ return test_gtid->domain_id == boundary->domain_id &&
+ test_gtid->server_id == boundary->server_id &&
+ test_gtid->seq_no >= boundary->seq_no;
+}
+
+static inline my_bool is_gtid_before(rpl_gtid *boundary,
+ rpl_gtid *test_gtid)
+{
+ return test_gtid->domain_id == boundary->domain_id &&
+ test_gtid->server_id == boundary->server_id &&
+ test_gtid->seq_no <= boundary->seq_no;
+}
+
+my_bool Window_gtid_event_filter::exclude(rpl_gtid *gtid)
+{
+ /* Assume result should be excluded to start */
+ my_bool should_exclude= TRUE;
+
+ DBUG_ASSERT((m_has_start || m_has_stop) &&
+ (gtid->domain_id == m_start.domain_id ||
+ gtid->domain_id == m_stop.domain_id));
+
+ if (!m_is_active && !m_has_passed)
+ {
+ /*
+ This filter has not yet been activated. Check if the gtid is within the
+ bounds of this window.
+ */
+
+ if (!m_has_start)
+ {
+ /*
+ Start GTID was not provided, so we want to include everything up to m_stop
+ */
+ m_is_active= TRUE;
+ should_exclude= FALSE;
+ }
+ else if (is_gtid_at_or_after(&m_start, gtid))
+ {
+ m_is_active= TRUE;
+
+ DBUG_PRINT("gtid-event-filter",
+ ("Window: Begin (%d-%d-%d, %d-%d-%llu]", m_start.domain_id,
+ m_start.server_id, m_start.seq_no, m_stop.domain_id,
+ m_stop.server_id, m_stop.seq_no));
+
+ /*
+ As the start of the range is exclusive, if this gtid is the start of
+ the range, exclude it
+ */
+ if (gtid->seq_no == m_start.seq_no &&
+ gtid->server_id == m_start.server_id)
+ should_exclude= TRUE;
+ else
+ should_exclude= FALSE;
+ }
+ } /* if (!m_is_active && !m_has_passed) */
+ else if (m_is_active && !m_has_passed)
+ {
+ /*
+ This window is currently active so we want the event group to be included
+ in the results. Additionally check if we are at the end of the window.
+ If no end of the window is provided, go indefinitely
+ */
+ should_exclude= FALSE;
+
+ if (m_has_stop && is_gtid_at_or_after(&m_stop, gtid))
+ {
+ DBUG_PRINT("gtid-event-filter",
+ ("Window: End (%d-%d-%d, %d-%d-%llu]", m_start.domain_id,
+ m_start.server_id, m_start.seq_no, m_stop.domain_id,
+ m_stop.server_id, m_stop.seq_no));
+ m_is_active= FALSE;
+ m_has_passed= TRUE;
+
+ if (gtid->server_id == m_stop.server_id && gtid->seq_no > m_stop.seq_no)
+ {
+ /*
+ The GTID is after the finite stop of the window, don't let it pass
+ through
+ */
+ should_exclude= TRUE;
+ }
+ }
+ else if (m_has_start && is_gtid_before(&m_start, gtid))
+ {
+ /*
+ Out of order check, the window is active but this GTID takes place
+ before the window begins. keep the window active, but exclude it from
+ passing through.
+ */
+ should_exclude= TRUE;
+ }
+ }
+ else if (m_has_passed && m_has_stop && is_gtid_before(&m_stop, gtid))
+ {
+ /* Test if events are out of order */
+ if (!m_has_start || (m_has_start && is_gtid_at_or_after(&m_start, gtid)))
+ {
+ /*
+ The filter window has closed because it has seen a GTID higher than its
+ end boundary; however, this GTID is out of order and should be passed
+ through.
+ */
+ should_exclude= TRUE;
+ }
+ }
+
+ return should_exclude;
+}
+
+Delegating_gtid_event_filter::Delegating_gtid_event_filter()
+{
+ uint32 i;
+
+ m_filter_id_mask= 0xf;
+
+ m_filters_by_id= (gtid_filter_element **) my_malloc(
+ PSI_NOT_INSTRUMENTED,
+ (m_filter_id_mask + 1) * sizeof(gtid_filter_element *),
+ MYF(MY_WME)
+ );
+
+ DBUG_ASSERT(m_filters_by_id != NULL);
+
+ for (i = 0; i <= m_filter_id_mask; i++)
+ {
+ m_filters_by_id[i]= NULL;
+ }
+
+ m_default_filter= new Reject_all_gtid_filter();
+}
+
+/*
+ Deconstructor deletes:
+ 1) All Identifiable_gtid_event_filters added
+ 2) All gtid_filter_element allocations
+*/
+Delegating_gtid_event_filter::~Delegating_gtid_event_filter()
+{
+ uint32 i;
+ for (i = 0; i <= m_filter_id_mask; i++)
+ {
+ gtid_filter_element *filter_element= m_filters_by_id[i],
+ *filter_element_to_del= NULL;
+ while(filter_element)
+ {
+ filter_element_to_del= filter_element;
+ filter_element= filter_element->next;
+ delete filter_element_to_del->filter;
+ my_free(filter_element_to_del);
+ }
+ }
+ my_free(m_filters_by_id);
+
+ delete m_default_filter;
+}
+
+void Delegating_gtid_event_filter::set_default_filter(Gtid_event_filter *filter)
+{
+ if (m_default_filter)
+ delete m_default_filter;
+
+ m_default_filter= filter;
+}
+
+gtid_filter_element *
+Delegating_gtid_event_filter::try_find_filter_element_for_id(
+ gtid_filter_identifier filter_id)
+{
+ // Add this into the domain id list
+ uint32 map_idx= filter_id & m_filter_id_mask;
+ gtid_filter_element *filter_idx= m_filters_by_id[map_idx];
+
+ /* Find list index to add this filter */
+ while (filter_idx)
+ {
+ if (filter_idx->identifier == filter_id)
+ break;
+ filter_idx= filter_idx->next;
+ }
+
+ return filter_idx;
+}
+
+gtid_filter_element *
+Delegating_gtid_event_filter::find_or_create_filter_element_for_id(
+ gtid_filter_identifier filter_id)
+{
+ // Add this into the domain id list
+ uint32 map_idx= filter_id & m_filter_id_mask;
+ gtid_filter_element *filter_idx= m_filters_by_id[map_idx],
+ *prev_idx= NULL;
+
+ /* Find list index to add this filter */
+ while (filter_idx)
+ {
+ prev_idx= filter_idx;
+ if (filter_idx->identifier == filter_id)
+ {
+ break;
+ }
+ prev_idx= filter_idx;
+ filter_idx= filter_idx->next;
+ }
+
+ if (filter_idx == NULL)
+ {
+ // No other domain ids have filters that index here, create this one
+ filter_idx= (gtid_filter_element *) my_malloc(
+ PSI_NOT_INSTRUMENTED, sizeof(gtid_filter_element), MYF(MY_WME));
+ filter_idx->identifier= filter_id;
+ filter_idx->next= NULL;
+ filter_idx->filter= NULL;
+
+ if (prev_idx == NULL)
+ {
+ // This is the first filter in the bucket
+ m_filters_by_id[map_idx]= filter_idx;
+ }
+ else
+ {
+ // End of list, append filter list to tail
+ prev_idx->next= filter_idx;
+ }
+ }
+
+ return filter_idx;
+}
+
+my_bool Delegating_gtid_event_filter::exclude(rpl_gtid *gtid)
+{
+ Gtid_event_filter *filter;
+ gtid_filter_identifier filter_id= get_id_from_gtid(gtid);
+ gtid_filter_element *filter_element= try_find_filter_element_for_id(filter_id);
+ if (filter_element)
+ {
+ filter= filter_element->filter;
+ }
+ else
+ {
+ filter= m_default_filter;
+ }
+
+ return filter->exclude(gtid);
+}
+
+Window_gtid_event_filter *
+Domain_gtid_event_filter::find_or_create_window_filter_for_id(
+ uint32 domain_id)
+{
+ gtid_filter_element *filter_element=
+ find_or_create_filter_element_for_id(domain_id);
+ Window_gtid_event_filter *wgef= NULL;
+
+ if (filter_element->filter == NULL)
+ {
+ // New filter
+ wgef= new Window_gtid_event_filter();
+ filter_element->filter= wgef;
+ }
+ else if (filter_element->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE)
+ {
+ // We have an existing window filter here
+ wgef= (Window_gtid_event_filter *) filter_element->filter;
+ }
+ /*
+ Else: We have an existing filter but it is not of window type so propogate
+ NULL filter
+ */
+
+ return wgef;
+}
+
+my_bool Domain_gtid_event_filter::add_start_gtid(rpl_gtid *gtid)
+{
+ Window_gtid_event_filter *filter_to_update=
+ find_or_create_window_filter_for_id(gtid->domain_id);
+ return filter_to_update->set_start_gtid(gtid);
+}
+
+my_bool Domain_gtid_event_filter::add_stop_gtid(rpl_gtid *gtid)
+{
+ Window_gtid_event_filter *filter_to_update=
+ find_or_create_window_filter_for_id(gtid->domain_id);
+ return filter_to_update->set_stop_gtid(gtid);
+} \ No newline at end of file
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
index 11541c8000c..9f5058196b8 100644
--- a/sql/rpl_gtid.h
+++ b/sql/rpl_gtid.h
@@ -380,5 +380,229 @@ extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid,
extern int gtid_check_rpl_slave_state_table(TABLE *table);
extern rpl_gtid *gtid_parse_string_to_list(const char *p, size_t len,
uint32 *out_len);
+extern rpl_gtid *gtid_unpack_string_to_list(const char *p, size_t len,
+ uint32 *out_len);
+
+/*
+ Interface to support different methods of filtering log events by GTID
+*/
+class Gtid_event_filter
+{
+public:
+ Gtid_event_filter() {};
+ virtual ~Gtid_event_filter() {};
+
+ enum gtid_event_filter_type
+ {
+ DELEGATING_GTID_FILTER_TYPE = 1,
+ WINDOW_GTID_FILTER_TYPE = 2,
+ REJECT_ALL_GTID_FILTER_TYPE = 3
+ };
+
+ /*
+ Run the filter on an input gtid to test if the corresponding log events
+ should be excluded from a result
+
+ Returns TRUE when the event group corresponding to the input GTID should be
+ excluded.
+ Returns FALSE when the event group should be included.
+ */
+ virtual my_bool exclude(rpl_gtid *) = 0;
+
+ /*
+ The gtid_event_filter_type that corresponds to the underlying filter
+ implementation
+ */
+ virtual uint32 get_filter_type() = 0;
+};
+
+/*
+ Filter implementation which will exclude any and all input GTIDs. This is
+ used to set default behavior for GTIDs that do not have explicit filters
+ set on their domain_id, e.g. when a Window_gtid_event_filter is used for
+ a specific domain, then all other domain_ids will be rejected using this
+ filter implementation.
+*/
+class Reject_all_gtid_filter : public Gtid_event_filter
+{
+public:
+ Reject_all_gtid_filter() {}
+ ~Reject_all_gtid_filter() {}
+ my_bool exclude(rpl_gtid *gtid) { return TRUE; }
+ uint32 get_filter_type() { return REJECT_ALL_GTID_FILTER_TYPE; }
+};
+
+/*
+ A virtual sub-class of Gtid_event_filter which allows for quick identification
+ of potentially applicable filters for arbitrary GTIDs.
+*/
+typedef uint32 gtid_filter_identifier;
+class Identifiable_gtid_event_filter : public Gtid_event_filter
+{
+
+public:
+ Identifiable_gtid_event_filter() {};
+ virtual ~Identifiable_gtid_event_filter() {};
+
+ enum gtid_filter_lookup_flags
+ {
+ BY_DOMAIN_ID= 0x1
+ };
+
+ virtual my_bool exclude(rpl_gtid *) = 0;
+ virtual uint32 get_filter_type() = 0;
+ virtual int get_lookup_flags() = 0;
+ virtual gtid_filter_identifier get_filter_identifier() = 0;
+};
+
+/*
+ A filter implementation that passes through events between two GTIDs, m_start
+ (exclusive) and m_stop (inclusive).
+
+ This filter is stateful, such that it expects GTIDs to be a sequential
+ stream, and internally, the window will activate/deactivate when the start
+ and stop positions of the event stream have passed through, respectively.
+
+ Window activation is used to pass through events from arbitrary servers that
+ were not mentioned within m_start or m_stop, yet still fall within the
+ boundary.
+*/
+class Window_gtid_event_filter : public Identifiable_gtid_event_filter
+{
+public:
+ Window_gtid_event_filter();
+ Window_gtid_event_filter(rpl_gtid *start, rpl_gtid *stop);
+ ~Window_gtid_event_filter() {}
+
+ my_bool exclude(rpl_gtid*);
+ gtid_filter_identifier get_filter_identifier();
+
+ my_bool set_start_gtid(rpl_gtid *start);
+ my_bool set_stop_gtid(rpl_gtid *stop);
+
+ /*
+ Windows are indexed by the domain_id of a GTID
+ */
+ int get_lookup_flags()
+ {
+ return BY_DOMAIN_ID;
+ }
+
+ uint32 get_filter_type() { return WINDOW_GTID_FILTER_TYPE; }
+
+private:
+ /*
+ m_has_start : Indicates if a start to this window has been explicitly
+ provided. A window starts immediately if not provided.
+ */
+ my_bool m_has_start;
+
+ /*
+ m_has_stop : Indicates if a stop to this window has been explicitly
+ provided. A window continues indefinitely if not provided.
+ */
+ my_bool m_has_stop;
+
+ /*
+ m_is_active : Indicates whether or not the program is currently reading
+ events from within this window. When TRUE, events with
+ different server ids than those specified by m_start or
+ m_stop will be passed through.
+ */
+ my_bool m_is_active;
+
+ /*
+ m_has_passed : Indicates whether or not the program is currently reading
+ events from within this window.
+ */
+ my_bool m_has_passed;
+
+ /* m_start : marks the GTID that begins the window (exclusive). */
+ rpl_gtid m_start;
+
+ /* m_stop : marks the GTID that ends the range (inclusive). */
+ rpl_gtid m_stop;
+
+ /* last_gtid_seen: saves the last */
+ rpl_gtid last_gtid_seen;
+};
+
+/*
+ Data structure to help with quick lookup for filters. More specifically,
+ if two filters have identifiers that lead to the same hash, they will be
+ put into a linked list.
+*/
+typedef struct _gtid_filter_element
+{
+ gtid_filter_identifier identifier;
+ Identifiable_gtid_event_filter *filter;
+ struct _gtid_filter_element *next;
+} gtid_filter_element;
+
+/*
+ Gtid_event_filter subclass which has no specific implementation, but rather
+ delegates the filtering to specific identifiable/mapped implementations.
+
+ A default filter is used for GTIDs that are passed through which no explicit
+ filter can be identified.
+
+ This class should be subclassed, where the get_id_from_gtid function
+ specifies how to extract the filter identifier from a GTID.
+*/
+class Delegating_gtid_event_filter : public Gtid_event_filter
+{
+public:
+ Delegating_gtid_event_filter();
+ ~Delegating_gtid_event_filter();
+
+ my_bool exclude(rpl_gtid *gtid);
+ void set_default_filter(Gtid_event_filter *default_filter);
+
+ uint32 get_filter_type() { return DELEGATING_GTID_FILTER_TYPE; }
+
+ virtual gtid_filter_identifier get_id_from_gtid(rpl_gtid *) = 0;
+
+protected:
+
+ uint32 m_filter_id_mask;
+ Gtid_event_filter *m_default_filter;
+
+ /*
+ To reduce time to find a gtid window, they are indexed by domain_id. More
+ specifically, domain_ids are arranged into m_filter_id_mask+1 buckets, and
+ each bucket is a linked list of gtid_filter_elements that share the same
+ index. The index itself is found by a bitwise and, i.e.
+ some_rpl_gtid.domain_id & m_filter_id_mask
+ */
+ gtid_filter_element **m_filters_by_id;
+
+ gtid_filter_element *try_find_filter_element_for_id(gtid_filter_identifier);
+ gtid_filter_element *find_or_create_filter_element_for_id(gtid_filter_identifier);
+};
+
+/*
+ A subclass of Delegating_gtid_event_filter which identifies filters using the
+ domain id of a GTID.
+
+ Additional helper functions include:
+ add_start_gtid(GTID) : adds a start GTID position to this filter, to be
+ identified by its domain id
+ add_stop_gtid(GTID) : adds a stop GTID position to this filter, to be
+ identified by its domain id
+*/
+class Domain_gtid_event_filter : public Delegating_gtid_event_filter
+{
+public:
+ gtid_filter_identifier get_id_from_gtid(rpl_gtid *gtid)
+ {
+ return gtid->domain_id;
+ }
+
+ my_bool add_start_gtid(rpl_gtid *gtid);
+ my_bool add_stop_gtid(rpl_gtid *gtid);
+
+private:
+ Window_gtid_event_filter *find_or_create_window_filter_for_id(gtid_filter_identifier);
+};
#endif /* RPL_GTID_H */
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index e46e46f803c..9f23f537cb1 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -2123,8 +2123,11 @@ dispatch_command_return dispatch_command(enum enum_server_command command, THD *
case COM_BINLOG_DUMP:
{
ulong pos;
+ uint32 n_start_gtids;
+ rpl_gtid *start_gtids= NULL;
ushort flags;
uint32 slave_server_id;
+ uint32 unpack_idx= 0;
status_var_increment(thd->status_var.com_other);
@@ -2133,19 +2136,42 @@ dispatch_command_return dispatch_command(enum enum_server_command command, THD *
break;
/* TODO: The following has to be changed to an 8 byte integer */
- pos = uint4korr(packet);
- flags = uint2korr(packet + 4);
+ if (packet[4] == '-' && packet[9] == '-')
+ {
+ unpack_idx= 18;
+ while (packet[unpack_idx] == ',')
+ unpack_idx += 19; // 18 chars for gtid + 1 for comma
+ start_gtids= gtid_unpack_string_to_list(packet, unpack_idx, &n_start_gtids);
+
+ /*
+ Set pos to the start of the binlog file for scanning
+
+ TODO: When GTID indexing is complete (MDEV-4991), update pos by
+ looking it up in the index
+ */
+ pos= 4;
+ } /* if (packet[4] == '-' && packet[9] == '-') */
+ else
+ {
+ /* Single numeric log position case */
+ pos = uint4korr(packet);
+ unpack_idx += 4;
+ }
+ flags = uint2korr(packet + unpack_idx);
+ unpack_idx += 2;
thd->variables.server_id=0; /* avoid suicide */
- if ((slave_server_id= uint4korr(packet+6))) // mysqlbinlog.server_id==0
+ if ((slave_server_id= uint4korr(packet+unpack_idx))) // mysqlbinlog.server_id==0
kill_zombie_dump_threads(slave_server_id);
thd->variables.server_id = slave_server_id;
+ unpack_idx += 4;
- const char *name= packet + 10;
+ const char *name= packet + unpack_idx;
size_t nlen= strlen(name);
general_log_print(thd, command, "Log: '%s' Pos: %lu", name, pos);
if (nlen < FN_REFLEN)
- mysql_binlog_send(thd, thd->strmake(name, nlen), (my_off_t)pos, flags);
+ mysql_binlog_send(thd, thd->strmake(name, nlen), (my_off_t)pos, flags,
+ start_gtids, n_start_gtids);
thd->unregister_slave(); // todo: can be extraneous
/* fake COM_QUIT -- if we get here, the thread needs to terminate */
error = TRUE;
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 7bcff12a735..00fc65cf82f 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -163,6 +163,8 @@ struct binlog_send_info {
bool should_stop;
size_t dirlen;
+ Gtid_event_filter *gtid_event_filter;
+
binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg,
char *lfn)
: thd(thd_arg), net(&thd_arg->net), packet(packet_arg),
@@ -185,6 +187,8 @@ struct binlog_send_info {
error_text[0] = 0;
bzero(&error_gtid, sizeof(error_gtid));
until_binlog_state.init();
+
+ gtid_event_filter= NULL;
}
};
@@ -1751,6 +1755,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
}
}
+
/* Skip GTID event groups until we reach slave position within a domain_id. */
if (event_type == GTID_EVENT && info->using_gtid_state)
{
@@ -1758,7 +1763,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
slave_connection_state::entry *gtid_entry;
rpl_gtid *gtid;
- if (gtid_state->count() > 0 || until_gtid_state)
+ if (gtid_state->count() > 0 || until_gtid_state || info->gtid_event_filter)
{
rpl_gtid event_gtid;
@@ -1899,6 +1904,17 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
}
}
}
+
+ /*
+ Should this result be excluded from the output?
+ */
+ if (info->gtid_event_filter &&
+ info->gtid_event_filter->exclude(&event_gtid))
+ {
+ info->gtid_skip_group=
+ (flags2 & Gtid_log_event::FL_STANDALONE ? GTID_SKIP_STANDALONE
+ : GTID_SKIP_TRANSACTION);
+ }
}
}
@@ -2110,7 +2126,9 @@ err:
static int init_binlog_sender(binlog_send_info *info,
LOG_INFO *linfo,
const char *log_ident,
- my_off_t *pos)
+ my_off_t *pos,
+ rpl_gtid *start_gtids,
+ size_t n_start_gtids)
{
THD *thd= info->thd;
int error;
@@ -2130,7 +2148,8 @@ static int init_binlog_sender(binlog_send_info *info,
info->current_checksum_alg= get_binlog_checksum_value_at_connect(thd);
info->mariadb_slave_capability= get_mariadb_slave_capability(thd);
- info->using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
+ info->using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state) ||
+ start_gtids != NULL;
DBUG_EXECUTE_IF("simulate_non_gtid_aware_master",
info->using_gtid_state= false;);
@@ -2247,6 +2266,17 @@ static int init_binlog_sender(binlog_send_info *info,
info->clear_initial_log_pos= true;
}
+ if (start_gtids != NULL)
+ {
+ Domain_gtid_event_filter *filter= new Domain_gtid_event_filter();
+ my_off_t i;
+ for(i = 0; i < n_start_gtids; i++)
+ {
+ filter->add_start_gtid(&start_gtids[i]);
+ }
+ info->gtid_event_filter= filter;
+ }
+
return 0;
}
@@ -2840,7 +2870,8 @@ static int send_one_binlog_file(binlog_send_info *info,
}
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
- ushort flags)
+ ushort flags, rpl_gtid *start_gtids,
+ uint32 n_start_gtids)
{
LOG_INFO linfo;
@@ -2860,7 +2891,8 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
bzero((char*) &log,sizeof(log));
- if (init_binlog_sender(info, &linfo, log_ident, &pos))
+ if (init_binlog_sender(info, &linfo, log_ident, &pos, start_gtids,
+ n_start_gtids))
goto err;
has_transmit_started= true;
@@ -3022,6 +3054,8 @@ err:
thd->reset_current_linfo();
thd->variables.max_allowed_packet= old_max_allowed_packet;
delete info->fdev;
+ delete info->gtid_event_filter;
+ my_free(start_gtids);
if (likely(info->error == 0))
{
diff --git a/sql/sql_repl.h b/sql/sql_repl.h
index 95916e31abf..bfc35ea5456 100644
--- a/sql/sql_repl.h
+++ b/sql/sql_repl.h
@@ -57,7 +57,8 @@ struct LOAD_FILE_IO_CACHE : public IO_CACHE
int log_loaded_block(IO_CACHE* file, uchar *Buffer, size_t Count);
int init_replication_sys_vars();
-void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags);
+void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags,
+ rpl_gtid *start_gtids, uint32 n_start_gtids);
#ifdef HAVE_PSI_INTERFACE
extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;