diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/rpl_gtid.cc | 402 | ||||
-rw-r--r-- | sql/rpl_gtid.h | 224 | ||||
-rw-r--r-- | sql/sql_parse.cc | 36 | ||||
-rw-r--r-- | sql/sql_repl.cc | 44 | ||||
-rw-r--r-- | sql/sql_repl.h | 3 |
5 files changed, 696 insertions, 13 deletions
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index 8b10703fdc2..52e14714a9b 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -17,6 +17,7 @@ /* Definitions for MariaDB global transaction ID (GTID). */ +#ifndef MYSQL_CLIENT #include "mariadb.h" #include "sql_priv.h" #include "unireg.h" @@ -1268,6 +1269,7 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid) return true; } +#endif /* Parse a GTID at the start of a string, and update the pointer to point @@ -1305,9 +1307,32 @@ gtid_parser_helper(const char **ptr, const char *end, rpl_gtid *out_gtid) return 0; } +/* + Unpack a GTID at the start of a string, and update the pointer to point + at the first character after the unpacked GTID. + + Returns 0 on ok, non-zero on parse error. +*/ +static int +gtid_unpack_helper(const char **ptr, const char *end, rpl_gtid *out_gtid) +{ + const char *p= *ptr; + + if (p[4] != '-' || p[9] != '-') + return 1; + + out_gtid->domain_id= (uint32)uint4korr(p); + out_gtid->server_id= (uint32)uint4korr(&p[5]); + out_gtid->seq_no= (uint64)uint8korr(&p[10]); + + *ptr= p + 18; + return 0; +} rpl_gtid * -gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len) +gtid_read_to_list(const char *str, size_t str_len, uint32 *out_len, + int reader_f(const char **ptr, const char *end, + rpl_gtid *out_gtid)) { const char *p= const_cast<char *>(str); const char *end= p + str_len; @@ -1318,7 +1343,7 @@ gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len) { rpl_gtid gtid; - if (len >= (((uint32)1 << 28)-1) || gtid_parser_helper(&p, end, >id)) + if (len >= (((uint32)1 << 28)-1) || reader_f(&p, end, >id)) { my_free(list); return NULL; @@ -1345,6 +1370,20 @@ gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len) } +rpl_gtid * +gtid_parse_string_to_list(const char *str, size_t str_len, uint32 *out_len) +{ + return gtid_read_to_list(str, str_len, out_len, gtid_parser_helper); +} + +rpl_gtid * +gtid_unpack_string_to_list(const char *str, size_t str_len, uint32 *out_len) +{ + return gtid_read_to_list(str, str_len, out_len, gtid_unpack_helper); +} + +#ifndef MYSQL_CLIENT + /* Update the slave replication state with the GTID position obtained from master when connecting with old-style (filename,offset) position. @@ -2952,3 +2991,362 @@ gtid_waiting::remove_from_wait_queue(gtid_waiting::hash_element *he, queue_remove(&he->queue, elem->queue_idx); } + +#endif + +Window_gtid_event_filter::Window_gtid_event_filter() : + m_has_start(FALSE), + m_has_stop(FALSE), + m_is_active(FALSE), + m_has_passed(FALSE) + { + // m_start and m_stop do not need initial values if unused + } + +Window_gtid_event_filter::Window_gtid_event_filter(rpl_gtid *start, rpl_gtid *stop) : + m_is_active(FALSE), + m_has_passed(FALSE) +{ + DBUG_ASSERT(start->domain_id == stop->domain_id); + + m_is_active= FALSE; + m_has_passed= FALSE; + + m_has_start= TRUE; + m_start.domain_id= start->domain_id; + m_start.server_id= start->server_id; + m_start.seq_no= start->seq_no; + + m_has_stop= TRUE; + m_stop.domain_id= stop->domain_id; + m_stop.server_id= stop->server_id; + m_stop.seq_no= stop->seq_no; +} + +my_bool Window_gtid_event_filter::set_start_gtid(rpl_gtid *start) +{ + if (m_has_start) + return FALSE; + + // Copy values + m_has_start= TRUE; + m_start.domain_id= start->domain_id; + m_start.server_id= start->server_id; + m_start.seq_no= start->seq_no; + + return TRUE; +} + +my_bool Window_gtid_event_filter::set_stop_gtid(rpl_gtid *stop) +{ + if (m_has_stop) + return FALSE; + + // Copy values + m_has_stop= TRUE; + m_stop.domain_id= stop->domain_id; + m_stop.server_id= stop->server_id; + m_stop.seq_no= stop->seq_no; + + return TRUE; +} + +gtid_filter_identifier Window_gtid_event_filter::get_filter_identifier() +{ + DBUG_ASSERT(m_has_start || m_has_stop); + if (m_has_start) + return m_start.domain_id; + else + return m_stop.domain_id; +} + +static inline my_bool is_gtid_at_or_after(rpl_gtid *boundary, + rpl_gtid *test_gtid) +{ + return test_gtid->domain_id == boundary->domain_id && + test_gtid->server_id == boundary->server_id && + test_gtid->seq_no >= boundary->seq_no; +} + +static inline my_bool is_gtid_before(rpl_gtid *boundary, + rpl_gtid *test_gtid) +{ + return test_gtid->domain_id == boundary->domain_id && + test_gtid->server_id == boundary->server_id && + test_gtid->seq_no <= boundary->seq_no; +} + +my_bool Window_gtid_event_filter::exclude(rpl_gtid *gtid) +{ + /* Assume result should be excluded to start */ + my_bool should_exclude= TRUE; + + DBUG_ASSERT((m_has_start || m_has_stop) && + (gtid->domain_id == m_start.domain_id || + gtid->domain_id == m_stop.domain_id)); + + if (!m_is_active && !m_has_passed) + { + /* + This filter has not yet been activated. Check if the gtid is within the + bounds of this window. + */ + + if (!m_has_start) + { + /* + Start GTID was not provided, so we want to include everything up to m_stop + */ + m_is_active= TRUE; + should_exclude= FALSE; + } + else if (is_gtid_at_or_after(&m_start, gtid)) + { + m_is_active= TRUE; + + DBUG_PRINT("gtid-event-filter", + ("Window: Begin (%d-%d-%d, %d-%d-%llu]", m_start.domain_id, + m_start.server_id, m_start.seq_no, m_stop.domain_id, + m_stop.server_id, m_stop.seq_no)); + + /* + As the start of the range is exclusive, if this gtid is the start of + the range, exclude it + */ + if (gtid->seq_no == m_start.seq_no && + gtid->server_id == m_start.server_id) + should_exclude= TRUE; + else + should_exclude= FALSE; + } + } /* if (!m_is_active && !m_has_passed) */ + else if (m_is_active && !m_has_passed) + { + /* + This window is currently active so we want the event group to be included + in the results. Additionally check if we are at the end of the window. + If no end of the window is provided, go indefinitely + */ + should_exclude= FALSE; + + if (m_has_stop && is_gtid_at_or_after(&m_stop, gtid)) + { + DBUG_PRINT("gtid-event-filter", + ("Window: End (%d-%d-%d, %d-%d-%llu]", m_start.domain_id, + m_start.server_id, m_start.seq_no, m_stop.domain_id, + m_stop.server_id, m_stop.seq_no)); + m_is_active= FALSE; + m_has_passed= TRUE; + + if (gtid->server_id == m_stop.server_id && gtid->seq_no > m_stop.seq_no) + { + /* + The GTID is after the finite stop of the window, don't let it pass + through + */ + should_exclude= TRUE; + } + } + else if (m_has_start && is_gtid_before(&m_start, gtid)) + { + /* + Out of order check, the window is active but this GTID takes place + before the window begins. keep the window active, but exclude it from + passing through. + */ + should_exclude= TRUE; + } + } + else if (m_has_passed && m_has_stop && is_gtid_before(&m_stop, gtid)) + { + /* Test if events are out of order */ + if (!m_has_start || (m_has_start && is_gtid_at_or_after(&m_start, gtid))) + { + /* + The filter window has closed because it has seen a GTID higher than its + end boundary; however, this GTID is out of order and should be passed + through. + */ + should_exclude= TRUE; + } + } + + return should_exclude; +} + +Delegating_gtid_event_filter::Delegating_gtid_event_filter() +{ + uint32 i; + + m_filter_id_mask= 0xf; + + m_filters_by_id= (gtid_filter_element **) my_malloc( + PSI_NOT_INSTRUMENTED, + (m_filter_id_mask + 1) * sizeof(gtid_filter_element *), + MYF(MY_WME) + ); + + DBUG_ASSERT(m_filters_by_id != NULL); + + for (i = 0; i <= m_filter_id_mask; i++) + { + m_filters_by_id[i]= NULL; + } + + m_default_filter= new Reject_all_gtid_filter(); +} + +/* + Deconstructor deletes: + 1) All Identifiable_gtid_event_filters added + 2) All gtid_filter_element allocations +*/ +Delegating_gtid_event_filter::~Delegating_gtid_event_filter() +{ + uint32 i; + for (i = 0; i <= m_filter_id_mask; i++) + { + gtid_filter_element *filter_element= m_filters_by_id[i], + *filter_element_to_del= NULL; + while(filter_element) + { + filter_element_to_del= filter_element; + filter_element= filter_element->next; + delete filter_element_to_del->filter; + my_free(filter_element_to_del); + } + } + my_free(m_filters_by_id); + + delete m_default_filter; +} + +void Delegating_gtid_event_filter::set_default_filter(Gtid_event_filter *filter) +{ + if (m_default_filter) + delete m_default_filter; + + m_default_filter= filter; +} + +gtid_filter_element * +Delegating_gtid_event_filter::try_find_filter_element_for_id( + gtid_filter_identifier filter_id) +{ + // Add this into the domain id list + uint32 map_idx= filter_id & m_filter_id_mask; + gtid_filter_element *filter_idx= m_filters_by_id[map_idx]; + + /* Find list index to add this filter */ + while (filter_idx) + { + if (filter_idx->identifier == filter_id) + break; + filter_idx= filter_idx->next; + } + + return filter_idx; +} + +gtid_filter_element * +Delegating_gtid_event_filter::find_or_create_filter_element_for_id( + gtid_filter_identifier filter_id) +{ + // Add this into the domain id list + uint32 map_idx= filter_id & m_filter_id_mask; + gtid_filter_element *filter_idx= m_filters_by_id[map_idx], + *prev_idx= NULL; + + /* Find list index to add this filter */ + while (filter_idx) + { + prev_idx= filter_idx; + if (filter_idx->identifier == filter_id) + { + break; + } + prev_idx= filter_idx; + filter_idx= filter_idx->next; + } + + if (filter_idx == NULL) + { + // No other domain ids have filters that index here, create this one + filter_idx= (gtid_filter_element *) my_malloc( + PSI_NOT_INSTRUMENTED, sizeof(gtid_filter_element), MYF(MY_WME)); + filter_idx->identifier= filter_id; + filter_idx->next= NULL; + filter_idx->filter= NULL; + + if (prev_idx == NULL) + { + // This is the first filter in the bucket + m_filters_by_id[map_idx]= filter_idx; + } + else + { + // End of list, append filter list to tail + prev_idx->next= filter_idx; + } + } + + return filter_idx; +} + +my_bool Delegating_gtid_event_filter::exclude(rpl_gtid *gtid) +{ + Gtid_event_filter *filter; + gtid_filter_identifier filter_id= get_id_from_gtid(gtid); + gtid_filter_element *filter_element= try_find_filter_element_for_id(filter_id); + if (filter_element) + { + filter= filter_element->filter; + } + else + { + filter= m_default_filter; + } + + return filter->exclude(gtid); +} + +Window_gtid_event_filter * +Domain_gtid_event_filter::find_or_create_window_filter_for_id( + uint32 domain_id) +{ + gtid_filter_element *filter_element= + find_or_create_filter_element_for_id(domain_id); + Window_gtid_event_filter *wgef= NULL; + + if (filter_element->filter == NULL) + { + // New filter + wgef= new Window_gtid_event_filter(); + filter_element->filter= wgef; + } + else if (filter_element->filter->get_filter_type() == WINDOW_GTID_FILTER_TYPE) + { + // We have an existing window filter here + wgef= (Window_gtid_event_filter *) filter_element->filter; + } + /* + Else: We have an existing filter but it is not of window type so propogate + NULL filter + */ + + return wgef; +} + +my_bool Domain_gtid_event_filter::add_start_gtid(rpl_gtid *gtid) +{ + Window_gtid_event_filter *filter_to_update= + find_or_create_window_filter_for_id(gtid->domain_id); + return filter_to_update->set_start_gtid(gtid); +} + +my_bool Domain_gtid_event_filter::add_stop_gtid(rpl_gtid *gtid) +{ + Window_gtid_event_filter *filter_to_update= + find_or_create_window_filter_for_id(gtid->domain_id); + return filter_to_update->set_stop_gtid(gtid); +}
\ No newline at end of file diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h index 11541c8000c..9f5058196b8 100644 --- a/sql/rpl_gtid.h +++ b/sql/rpl_gtid.h @@ -380,5 +380,229 @@ extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, extern int gtid_check_rpl_slave_state_table(TABLE *table); extern rpl_gtid *gtid_parse_string_to_list(const char *p, size_t len, uint32 *out_len); +extern rpl_gtid *gtid_unpack_string_to_list(const char *p, size_t len, + uint32 *out_len); + +/* + Interface to support different methods of filtering log events by GTID +*/ +class Gtid_event_filter +{ +public: + Gtid_event_filter() {}; + virtual ~Gtid_event_filter() {}; + + enum gtid_event_filter_type + { + DELEGATING_GTID_FILTER_TYPE = 1, + WINDOW_GTID_FILTER_TYPE = 2, + REJECT_ALL_GTID_FILTER_TYPE = 3 + }; + + /* + Run the filter on an input gtid to test if the corresponding log events + should be excluded from a result + + Returns TRUE when the event group corresponding to the input GTID should be + excluded. + Returns FALSE when the event group should be included. + */ + virtual my_bool exclude(rpl_gtid *) = 0; + + /* + The gtid_event_filter_type that corresponds to the underlying filter + implementation + */ + virtual uint32 get_filter_type() = 0; +}; + +/* + Filter implementation which will exclude any and all input GTIDs. This is + used to set default behavior for GTIDs that do not have explicit filters + set on their domain_id, e.g. when a Window_gtid_event_filter is used for + a specific domain, then all other domain_ids will be rejected using this + filter implementation. +*/ +class Reject_all_gtid_filter : public Gtid_event_filter +{ +public: + Reject_all_gtid_filter() {} + ~Reject_all_gtid_filter() {} + my_bool exclude(rpl_gtid *gtid) { return TRUE; } + uint32 get_filter_type() { return REJECT_ALL_GTID_FILTER_TYPE; } +}; + +/* + A virtual sub-class of Gtid_event_filter which allows for quick identification + of potentially applicable filters for arbitrary GTIDs. +*/ +typedef uint32 gtid_filter_identifier; +class Identifiable_gtid_event_filter : public Gtid_event_filter +{ + +public: + Identifiable_gtid_event_filter() {}; + virtual ~Identifiable_gtid_event_filter() {}; + + enum gtid_filter_lookup_flags + { + BY_DOMAIN_ID= 0x1 + }; + + virtual my_bool exclude(rpl_gtid *) = 0; + virtual uint32 get_filter_type() = 0; + virtual int get_lookup_flags() = 0; + virtual gtid_filter_identifier get_filter_identifier() = 0; +}; + +/* + A filter implementation that passes through events between two GTIDs, m_start + (exclusive) and m_stop (inclusive). + + This filter is stateful, such that it expects GTIDs to be a sequential + stream, and internally, the window will activate/deactivate when the start + and stop positions of the event stream have passed through, respectively. + + Window activation is used to pass through events from arbitrary servers that + were not mentioned within m_start or m_stop, yet still fall within the + boundary. +*/ +class Window_gtid_event_filter : public Identifiable_gtid_event_filter +{ +public: + Window_gtid_event_filter(); + Window_gtid_event_filter(rpl_gtid *start, rpl_gtid *stop); + ~Window_gtid_event_filter() {} + + my_bool exclude(rpl_gtid*); + gtid_filter_identifier get_filter_identifier(); + + my_bool set_start_gtid(rpl_gtid *start); + my_bool set_stop_gtid(rpl_gtid *stop); + + /* + Windows are indexed by the domain_id of a GTID + */ + int get_lookup_flags() + { + return BY_DOMAIN_ID; + } + + uint32 get_filter_type() { return WINDOW_GTID_FILTER_TYPE; } + +private: + /* + m_has_start : Indicates if a start to this window has been explicitly + provided. A window starts immediately if not provided. + */ + my_bool m_has_start; + + /* + m_has_stop : Indicates if a stop to this window has been explicitly + provided. A window continues indefinitely if not provided. + */ + my_bool m_has_stop; + + /* + m_is_active : Indicates whether or not the program is currently reading + events from within this window. When TRUE, events with + different server ids than those specified by m_start or + m_stop will be passed through. + */ + my_bool m_is_active; + + /* + m_has_passed : Indicates whether or not the program is currently reading + events from within this window. + */ + my_bool m_has_passed; + + /* m_start : marks the GTID that begins the window (exclusive). */ + rpl_gtid m_start; + + /* m_stop : marks the GTID that ends the range (inclusive). */ + rpl_gtid m_stop; + + /* last_gtid_seen: saves the last */ + rpl_gtid last_gtid_seen; +}; + +/* + Data structure to help with quick lookup for filters. More specifically, + if two filters have identifiers that lead to the same hash, they will be + put into a linked list. +*/ +typedef struct _gtid_filter_element +{ + gtid_filter_identifier identifier; + Identifiable_gtid_event_filter *filter; + struct _gtid_filter_element *next; +} gtid_filter_element; + +/* + Gtid_event_filter subclass which has no specific implementation, but rather + delegates the filtering to specific identifiable/mapped implementations. + + A default filter is used for GTIDs that are passed through which no explicit + filter can be identified. + + This class should be subclassed, where the get_id_from_gtid function + specifies how to extract the filter identifier from a GTID. +*/ +class Delegating_gtid_event_filter : public Gtid_event_filter +{ +public: + Delegating_gtid_event_filter(); + ~Delegating_gtid_event_filter(); + + my_bool exclude(rpl_gtid *gtid); + void set_default_filter(Gtid_event_filter *default_filter); + + uint32 get_filter_type() { return DELEGATING_GTID_FILTER_TYPE; } + + virtual gtid_filter_identifier get_id_from_gtid(rpl_gtid *) = 0; + +protected: + + uint32 m_filter_id_mask; + Gtid_event_filter *m_default_filter; + + /* + To reduce time to find a gtid window, they are indexed by domain_id. More + specifically, domain_ids are arranged into m_filter_id_mask+1 buckets, and + each bucket is a linked list of gtid_filter_elements that share the same + index. The index itself is found by a bitwise and, i.e. + some_rpl_gtid.domain_id & m_filter_id_mask + */ + gtid_filter_element **m_filters_by_id; + + gtid_filter_element *try_find_filter_element_for_id(gtid_filter_identifier); + gtid_filter_element *find_or_create_filter_element_for_id(gtid_filter_identifier); +}; + +/* + A subclass of Delegating_gtid_event_filter which identifies filters using the + domain id of a GTID. + + Additional helper functions include: + add_start_gtid(GTID) : adds a start GTID position to this filter, to be + identified by its domain id + add_stop_gtid(GTID) : adds a stop GTID position to this filter, to be + identified by its domain id +*/ +class Domain_gtid_event_filter : public Delegating_gtid_event_filter +{ +public: + gtid_filter_identifier get_id_from_gtid(rpl_gtid *gtid) + { + return gtid->domain_id; + } + + my_bool add_start_gtid(rpl_gtid *gtid); + my_bool add_stop_gtid(rpl_gtid *gtid); + +private: + Window_gtid_event_filter *find_or_create_window_filter_for_id(gtid_filter_identifier); +}; #endif /* RPL_GTID_H */ diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index e46e46f803c..9f23f537cb1 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -2123,8 +2123,11 @@ dispatch_command_return dispatch_command(enum enum_server_command command, THD * case COM_BINLOG_DUMP: { ulong pos; + uint32 n_start_gtids; + rpl_gtid *start_gtids= NULL; ushort flags; uint32 slave_server_id; + uint32 unpack_idx= 0; status_var_increment(thd->status_var.com_other); @@ -2133,19 +2136,42 @@ dispatch_command_return dispatch_command(enum enum_server_command command, THD * break; /* TODO: The following has to be changed to an 8 byte integer */ - pos = uint4korr(packet); - flags = uint2korr(packet + 4); + if (packet[4] == '-' && packet[9] == '-') + { + unpack_idx= 18; + while (packet[unpack_idx] == ',') + unpack_idx += 19; // 18 chars for gtid + 1 for comma + start_gtids= gtid_unpack_string_to_list(packet, unpack_idx, &n_start_gtids); + + /* + Set pos to the start of the binlog file for scanning + + TODO: When GTID indexing is complete (MDEV-4991), update pos by + looking it up in the index + */ + pos= 4; + } /* if (packet[4] == '-' && packet[9] == '-') */ + else + { + /* Single numeric log position case */ + pos = uint4korr(packet); + unpack_idx += 4; + } + flags = uint2korr(packet + unpack_idx); + unpack_idx += 2; thd->variables.server_id=0; /* avoid suicide */ - if ((slave_server_id= uint4korr(packet+6))) // mysqlbinlog.server_id==0 + if ((slave_server_id= uint4korr(packet+unpack_idx))) // mysqlbinlog.server_id==0 kill_zombie_dump_threads(slave_server_id); thd->variables.server_id = slave_server_id; + unpack_idx += 4; - const char *name= packet + 10; + const char *name= packet + unpack_idx; size_t nlen= strlen(name); general_log_print(thd, command, "Log: '%s' Pos: %lu", name, pos); if (nlen < FN_REFLEN) - mysql_binlog_send(thd, thd->strmake(name, nlen), (my_off_t)pos, flags); + mysql_binlog_send(thd, thd->strmake(name, nlen), (my_off_t)pos, flags, + start_gtids, n_start_gtids); thd->unregister_slave(); // todo: can be extraneous /* fake COM_QUIT -- if we get here, the thread needs to terminate */ error = TRUE; diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 7bcff12a735..00fc65cf82f 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -163,6 +163,8 @@ struct binlog_send_info { bool should_stop; size_t dirlen; + Gtid_event_filter *gtid_event_filter; + binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg, char *lfn) : thd(thd_arg), net(&thd_arg->net), packet(packet_arg), @@ -185,6 +187,8 @@ struct binlog_send_info { error_text[0] = 0; bzero(&error_gtid, sizeof(error_gtid)); until_binlog_state.init(); + + gtid_event_filter= NULL; } }; @@ -1751,6 +1755,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, } } + /* Skip GTID event groups until we reach slave position within a domain_id. */ if (event_type == GTID_EVENT && info->using_gtid_state) { @@ -1758,7 +1763,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, slave_connection_state::entry *gtid_entry; rpl_gtid *gtid; - if (gtid_state->count() > 0 || until_gtid_state) + if (gtid_state->count() > 0 || until_gtid_state || info->gtid_event_filter) { rpl_gtid event_gtid; @@ -1899,6 +1904,17 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, } } } + + /* + Should this result be excluded from the output? + */ + if (info->gtid_event_filter && + info->gtid_event_filter->exclude(&event_gtid)) + { + info->gtid_skip_group= + (flags2 & Gtid_log_event::FL_STANDALONE ? GTID_SKIP_STANDALONE + : GTID_SKIP_TRANSACTION); + } } } @@ -2110,7 +2126,9 @@ err: static int init_binlog_sender(binlog_send_info *info, LOG_INFO *linfo, const char *log_ident, - my_off_t *pos) + my_off_t *pos, + rpl_gtid *start_gtids, + size_t n_start_gtids) { THD *thd= info->thd; int error; @@ -2130,7 +2148,8 @@ static int init_binlog_sender(binlog_send_info *info, info->current_checksum_alg= get_binlog_checksum_value_at_connect(thd); info->mariadb_slave_capability= get_mariadb_slave_capability(thd); - info->using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state); + info->using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state) || + start_gtids != NULL; DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", info->using_gtid_state= false;); @@ -2247,6 +2266,17 @@ static int init_binlog_sender(binlog_send_info *info, info->clear_initial_log_pos= true; } + if (start_gtids != NULL) + { + Domain_gtid_event_filter *filter= new Domain_gtid_event_filter(); + my_off_t i; + for(i = 0; i < n_start_gtids; i++) + { + filter->add_start_gtid(&start_gtids[i]); + } + info->gtid_event_filter= filter; + } + return 0; } @@ -2840,7 +2870,8 @@ static int send_one_binlog_file(binlog_send_info *info, } void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, - ushort flags) + ushort flags, rpl_gtid *start_gtids, + uint32 n_start_gtids) { LOG_INFO linfo; @@ -2860,7 +2891,8 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, bzero((char*) &log,sizeof(log)); - if (init_binlog_sender(info, &linfo, log_ident, &pos)) + if (init_binlog_sender(info, &linfo, log_ident, &pos, start_gtids, + n_start_gtids)) goto err; has_transmit_started= true; @@ -3022,6 +3054,8 @@ err: thd->reset_current_linfo(); thd->variables.max_allowed_packet= old_max_allowed_packet; delete info->fdev; + delete info->gtid_event_filter; + my_free(start_gtids); if (likely(info->error == 0)) { diff --git a/sql/sql_repl.h b/sql/sql_repl.h index 95916e31abf..bfc35ea5456 100644 --- a/sql/sql_repl.h +++ b/sql/sql_repl.h @@ -57,7 +57,8 @@ struct LOAD_FILE_IO_CACHE : public IO_CACHE int log_loaded_block(IO_CACHE* file, uchar *Buffer, size_t Count); int init_replication_sys_vars(); -void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags); +void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags, + rpl_gtid *start_gtids, uint32 n_start_gtids); #ifdef HAVE_PSI_INTERFACE extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state; |