diff options
Diffstat (limited to 'sql/rpl_gtid.h')
-rw-r--r-- | sql/rpl_gtid.h | 448 |
1 files changed, 448 insertions, 0 deletions
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h index 11541c8000c..8b8c3bb9c92 100644 --- a/sql/rpl_gtid.h +++ b/sql/rpl_gtid.h @@ -28,6 +28,7 @@ extern const LEX_CSTRING rpl_gtid_slave_state_table_name; class String; #define GTID_MAX_STR_LENGTH (10+1+10+1+20) +#define PARAM_GTID(G) G.domain_id, G.server_id, G.seq_no struct rpl_gtid { @@ -36,6 +37,9 @@ struct rpl_gtid uint64 seq_no; }; +/* Data structure to help with quick lookup for filters. */ +typedef decltype(rpl_gtid::domain_id) gtid_filter_identifier; + inline bool operator==(const rpl_gtid& lhs, const rpl_gtid& rhs) { return @@ -380,5 +384,449 @@ extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, extern int gtid_check_rpl_slave_state_table(TABLE *table); extern rpl_gtid *gtid_parse_string_to_list(const char *p, size_t len, uint32 *out_len); +extern rpl_gtid *gtid_unpack_string_to_list(const char *p, size_t len, + uint32 *out_len); + + + +/* + This class ensures that the GTID state of an event stream is consistent with + the set of provided binary log files. In particular, it has two concerns: + + 1) Ensuring that GTID events are monotonically increasing within each + domain + 2) Ensuring that the GTID state of the specified binary logs is consistent + both with the initial state that a user provides, and between + binary logs (if multiple are specified) +*/ +class Binlog_gtid_state_validator +{ +public: + + struct audit_elem + { + uint32 domain_id; + + + /* + Holds the largest GTID received, and is indexed by domain_id + */ + rpl_gtid last_gtid; + + /* + Holds the largest GTID received, and is indexed by domain_id + */ + rpl_gtid start_gtid; + + /* + List of the problematic GTIDs received which were out of order + */ + DYNAMIC_ARRAY late_gtids_real; + + /* + For each problematic GTID in late_gtids_real, this list contains the last + GTID of the domain at the time of receiving the out of order GTID. + */ + DYNAMIC_ARRAY late_gtids_previous; + }; + + Binlog_gtid_state_validator(); + ~Binlog_gtid_state_validator(); + + /* + Initialize where we should start monitoring for invalid GTID entries + in the event stream. Note that these start positions must occur at or after + a given binary logs GTID state (from Gtid_list_log_event) + */ + void initialize_start_gtids(rpl_gtid *start_gtids, size_t n_gtids); + + /* + Initialize our current state so we know where to expect GTIDs to start + increasing from. Error if the state exists after our expected start_gtid + positions, because we know we will be missing event data (possibly from + a purged log). + */ + my_bool initialize_gtid_state(FILE *out, rpl_gtid *gtids, size_t n_gtids); + + /* + Ensures that the expected stop GTID positions exist within the specified + binary logs. + */ + my_bool verify_stop_state(FILE *out, rpl_gtid *stop_gtids, size_t n_stop_gtids); + + /* + Ensure a GTID state (e.g., from a Gtid_list_log_event) is consistent with + the current state of our auditing. For example, if we see a GTID from a + Gtid_list_log_event that is ahead of our current state for that domain, we + have missed events (perhaps from a missing log). + */ + my_bool verify_gtid_state(FILE *out, rpl_gtid *gtid_state_cur); + + /* + Take note of a new GTID being processed. + + returns TRUE if the GTID is invalid, FALSE on success + */ + my_bool record(rpl_gtid *gtid); + + /* + Writes warnings/errors (if any) during GTID processing + + Returns TRUE if any findings were reported, FALSE otherwise + */ + my_bool report(FILE *out, my_bool is_strict_mode); + + static void report_details(FILE *out, const char *format, va_list args) + { + vfprintf(out, format, args); + fprintf(out, "\n"); + } + + static void warn(FILE *out, const char *format,...) + { + va_list args; + va_start(args, format); + fprintf(out, "WARNING: "); + report_details(out, format, args); + } + + static void error(FILE *out, const char *format,...) + { + va_list args; + va_start(args, format); + fprintf(out, "ERROR: "); + report_details(out, format, args); + } + +private: + + /* + Holds the records for each domain id we are monitoring. Elements are of type + `struct audit_elem` and indexed by domian_id. + */ + HASH m_audit_elem_domain_lookup; +}; + +/* + Interface to support different methods of filtering log events by GTID +*/ +class Gtid_event_filter +{ +public: + Gtid_event_filter() {}; + virtual ~Gtid_event_filter() {}; + + enum gtid_event_filter_type + { + DELEGATING_GTID_FILTER_TYPE = 1, + WINDOW_GTID_FILTER_TYPE = 2, + ACCEPT_ALL_GTID_FILTER_TYPE = 3, + REJECT_ALL_GTID_FILTER_TYPE = 4 + }; + + /* + Run the filter on an input gtid to test if the corresponding log events + should be excluded from a result + + Returns TRUE when the event group corresponding to the input GTID should be + excluded. + Returns FALSE when the event group should be included. + */ + virtual my_bool exclude(rpl_gtid *) = 0; + + /* + The gtid_event_filter_type that corresponds to the underlying filter + implementation + */ + virtual uint32 get_filter_type() = 0; + + /* + For filters that can maintain their own state, this tests if the filter + implementation has completed. + + Returns TRUE when completed, and FALSE when the filter has not finished. + */ + virtual my_bool has_finished() = 0; +}; + +/* + Filter implementation which will include any and all input GTIDs. This is + used to set default behavior for GTIDs that do not have explicit filters + set on their domain_id, e.g. when a Window_gtid_event_filter is used for + a specific domain, then all other domain_ids will be accepted using this + filter implementation. +*/ +class Accept_all_gtid_filter : public Gtid_event_filter +{ +public: + Accept_all_gtid_filter() {} + ~Accept_all_gtid_filter() {} + my_bool exclude(rpl_gtid *gtid) { return FALSE; } + uint32 get_filter_type() { return ACCEPT_ALL_GTID_FILTER_TYPE; } + my_bool has_finished() { return FALSE; } +}; + +/* + Filter implementation to exclude all tested GTIDs. +*/ +class Reject_all_gtid_filter : public Gtid_event_filter +{ +public: + Reject_all_gtid_filter() {} + ~Reject_all_gtid_filter() {} + my_bool exclude(rpl_gtid *gtid) { return TRUE; } + uint32 get_filter_type() { return REJECT_ALL_GTID_FILTER_TYPE; } + my_bool has_finished() { return FALSE; } +}; + +/* + A filter implementation that includes events that exist between two GTID + positions, m_start (exclusive) and m_stop (inclusive), within a domain. + + This filter is stateful, such that it expects GTIDs to be an increasing + stream, and internally, the window will activate and deactivate when the start + and stop positions of the event stream have passed through, respectively. +*/ +class Window_gtid_event_filter : public Gtid_event_filter +{ +public: + Window_gtid_event_filter(); + ~Window_gtid_event_filter() {} + + my_bool exclude(rpl_gtid*); + my_bool has_finished(); + + /* + Set the GTID that begins this window (exclusive) + + Returns 0 on ok, non-zero on error + */ + int set_start_gtid(rpl_gtid *start); + + /* + Set the GTID that ends this window (inclusive) + + Returns 0 on ok, non-zero on error + */ + int set_stop_gtid(rpl_gtid *stop); + + uint32 get_filter_type() { return WINDOW_GTID_FILTER_TYPE; } + + /* + Validates the underlying range is correct, and writes an error if not, i.e. + m_start >= m_stop. + + Returns FALSE on ok, TRUE if range is invalid + */ + my_bool is_range_invalid(); + + /* + Getter/setter methods + */ + my_bool has_start() { return m_has_start; } + my_bool has_stop() { return m_has_stop; } + rpl_gtid get_start_gtid() { return m_start; } + rpl_gtid get_stop_gtid() { return m_stop; } + + void clear_start_pos() + { + m_has_start= FALSE; + m_start= {0, 0, 0}; + } + + void clear_stop_pos() + { + m_has_stop= FALSE; + m_stop= {0, 0, 0}; + } + +protected: + + /* + When processing GTID streams, the order in which they are processed should + be sequential with no gaps between events. If a gap is found within a + window, warn the user. + */ + void verify_gtid_is_expected(rpl_gtid *gtid); + +private: + + enum warning_flags + { + WARN_GTID_SEQUENCE_NUMBER_OUT_OF_ORDER= 0x1 + }; + + /* + m_has_start : Indicates if a start to this window has been explicitly + provided. A window starts immediately if not provided. + */ + my_bool m_has_start; + + /* + m_has_stop : Indicates if a stop to this window has been explicitly + provided. A window continues indefinitely if not provided. + */ + my_bool m_has_stop; + + /* + m_is_active : Indicates whether or not the program is currently reading + events from within this window. When TRUE, events with + different server ids than those specified by m_start or + m_stop will be passed through. + */ + my_bool m_is_active; + + /* + m_has_passed : Indicates whether or not the program is currently reading + events from within this window. + */ + my_bool m_has_passed; + + /* m_start : marks the GTID that begins the window (exclusive). */ + rpl_gtid m_start; + + /* m_stop : marks the GTID that ends the range (inclusive). */ + rpl_gtid m_stop; +}; + +typedef struct _gtid_filter_element +{ + Gtid_event_filter *filter; + gtid_filter_identifier identifier; /* Used for HASH lookup */ +} gtid_filter_element; + +/* + Gtid_event_filter subclass which has no specific implementation, but rather + delegates the filtering to specific identifiable/mapped implementations. + + A default filter is used for GTIDs that are passed through which no explicit + filter can be identified. + + This class should be subclassed, where the get_id_from_gtid function + specifies how to extract the filter identifier from a GTID. +*/ +class Id_delegating_gtid_event_filter : public Gtid_event_filter +{ +public: + Id_delegating_gtid_event_filter(); + ~Id_delegating_gtid_event_filter(); + + my_bool exclude(rpl_gtid *gtid); + my_bool has_finished(); + void set_default_filter(Gtid_event_filter *default_filter); + + uint32 get_filter_type() { return DELEGATING_GTID_FILTER_TYPE; } + + virtual gtid_filter_identifier get_id_from_gtid(rpl_gtid *) = 0; + +protected: + + uint32 m_num_stateful_filters; + uint32 m_num_completed_filters; + Gtid_event_filter *m_default_filter; + + HASH m_filters_by_id_hash; + + gtid_filter_element *find_or_create_filter_element_for_id(gtid_filter_identifier); +}; + +/* + A subclass of Id_delegating_gtid_event_filter which identifies filters using the + domain id of a GTID. + + Additional helper functions include: + add_start_gtid(GTID) : adds a start GTID position to this filter, to be + identified by its domain id + add_stop_gtid(GTID) : adds a stop GTID position to this filter, to be + identified by its domain id + clear_start_gtids() : removes existing GTID start positions + clear_stop_gtids() : removes existing GTID stop positions + get_start_gtids() : gets all added GTID start positions + get_stop_gtids() : gets all added GTID stop positions + get_num_start_gtids() : gets the count of added GTID start positions + get_num_stop_gtids() : gets the count of added GTID stop positions +*/ +class Domain_gtid_event_filter : public Id_delegating_gtid_event_filter +{ +public: + Domain_gtid_event_filter() + { + my_init_dynamic_array(PSI_INSTRUMENT_ME, &m_start_filters, + sizeof(gtid_filter_element*), 8, 8, MYF(0)); + my_init_dynamic_array(PSI_INSTRUMENT_ME, &m_stop_filters, + sizeof(gtid_filter_element*), 8, 8, MYF(0)); + } + ~Domain_gtid_event_filter() + { + delete_dynamic(&m_start_filters); + delete_dynamic(&m_stop_filters); + } + + /* + Returns the domain id of from the input GTID + */ + gtid_filter_identifier get_id_from_gtid(rpl_gtid *gtid) + { + return gtid->domain_id; + } + + /* + Override Id_delegating_gtid_event_filter to extend with domain specific + filtering logic + */ + my_bool exclude(rpl_gtid*); + + /* + Validates that window filters with both a start and stop GTID satisfy + stop_gtid > start_gtid + + Returns 0 on ok, non-zero if any windows are invalid. + */ + int validate_window_filters(); + + /* + Helper function to start a GTID window filter at the given GTID + + Returns 0 on ok, non-zero on error + */ + int add_start_gtid(rpl_gtid *gtid); + + /* + Helper function to end a GTID window filter at the given GTID + + Returns 0 on ok, non-zero on error + */ + int add_stop_gtid(rpl_gtid *gtid); + + /* + If start or stop position is respecified, we remove all existing values + and start over with the new specification. + */ + void clear_start_gtids(); + void clear_stop_gtids(); + + /* + Return list of all GTIDs used as start position. + + Note that this list is allocated and it is up to the user to free it + */ + rpl_gtid *get_start_gtids(); + + /* + Return list of all GTIDs used as stop position. + + Note that this list is allocated and it is up to the user to free it + */ + rpl_gtid *get_stop_gtids(); + + size_t get_num_start_gtids() { return m_start_filters.elements; } + size_t get_num_stop_gtids() { return m_stop_filters.elements; } + +private: + DYNAMIC_ARRAY m_start_filters; + DYNAMIC_ARRAY m_stop_filters; + + Window_gtid_event_filter *find_or_create_window_filter_for_id(gtid_filter_identifier); +}; #endif /* RPL_GTID_H */ |