summaryrefslogtreecommitdiff
path: root/client/mysqlbinlog.cc
diff options
context:
space:
mode:
authorBrandon Nesterenko <brandon.nesterenko@mariadb.com>2021-08-11 11:29:37 -0600
committerBrandon Nesterenko <brandon.nesterenko@mariadb.com>2021-08-23 13:57:38 -0600
commitc622af26fca37fc7864e9526a60dbe059644d120 (patch)
treed9ed6be82f0da06e8b2ac68abea5988180289cd0 /client/mysqlbinlog.cc
parent64f7dffcc7e0e69c31d9a36c2090a26300e57c4c (diff)
downloadmariadb-git-10.7-MDEV-4989.tar.gz
MDEV-4989: Support for GTID in mysqlbinlog10.7-MDEV-4989
New Feature: ============ This commit extends the mariadb-binlog capabilities to allow events to be filtered by GTID ranges. More specifically, the following capabilities are addressed: 1) GTIDs can be used to filter results on local binlog files 2) GTIDs can be used to filter results from remote servers 3) For a given GTID range, its start-position is exclusive and its stop-position is inclusive 4) After the events have been written, the session server id and domain id are reset to their former values 5) Output filtered by GTID ranges can be piped to the MariaDB client To facilitate these features, the --start-position and --stop-position arguments have been extended to additionally accept values formatted as a list of GTID positions, e.g. `--start-position=0-1-0,1-2-55` Reviewed By: ============ <TODO>
Diffstat (limited to 'client/mysqlbinlog.cc')
-rw-r--r--client/mysqlbinlog.cc331
1 files changed, 306 insertions, 25 deletions
diff --git a/client/mysqlbinlog.cc b/client/mysqlbinlog.cc
index d65828ea71c..dcb362e6b3a 100644
--- a/client/mysqlbinlog.cc
+++ b/client/mysqlbinlog.cc
@@ -143,10 +143,18 @@ static char *charset= 0;
static uint verbose= 0;
-static ulonglong start_position, stop_position;
+static char *start_pos_str, *stop_pos_str;
+static ulonglong start_position= BIN_LOG_HEADER_SIZE,
+ stop_position= (longlong)(~(my_off_t)0) ;
#define start_position_mot ((my_off_t)start_position)
#define stop_position_mot ((my_off_t)stop_position)
+static Delegating_gtid_event_filter *gtid_event_filter= NULL;
+static rpl_gtid *start_gtids, *stop_gtids;
+static my_bool is_event_gtid_active= FALSE;
+static uint32 n_start_gtid_ranges= 0;
+static uint32 n_stop_gtid_ranges= 0;
+
static char *start_datetime_str, *stop_datetime_str;
static my_time_t start_datetime= 0, stop_datetime= MY_TIME_T_MAX;
static ulonglong rec_count= 0;
@@ -981,6 +989,40 @@ static bool print_row_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
return result;
}
+static inline my_bool is_gtid_filtering_enabled()
+{
+ return gtid_event_filter != NULL;
+}
+
+/*
+ Where the binlog is processed sequentially, the variable is_event_gtid_active
+ keeps track of the state of active event groups. When a new Gtid_log_event is
+ read, if it should be output, this function will return true. Otherwise, this
+ function will return false.
+*/
+static inline my_bool is_event_group_active()
+{
+ return is_event_gtid_active;
+}
+
+/*
+ When a Gtid_log_event marks a GTID that should be output, this function is
+ invoked to change the program state to start writing binlog events until
+ the event group has ended.
+*/
+static inline void activate_current_event_group()
+{
+ is_event_gtid_active= TRUE;
+}
+
+/*
+ When an active event group has written its last event, this function is
+ invoked to change the program state to stop writing events.
+*/
+static inline void deactivate_current_event_group()
+{
+ is_event_gtid_active= FALSE;
+}
/**
Print the given event, and either delete it or delegate the deletion
@@ -1019,11 +1061,34 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
#endif
/*
+ If the binlog output should be filtered using GTIDs, test the new event
+ group to see if its events should be written or ignored.
+ */
+ if (ev_type == GTID_EVENT && is_gtid_filtering_enabled())
+ {
+ Gtid_log_event *gle= (Gtid_log_event*) ev;
+ rpl_gtid gtid;
+ gtid.domain_id= gle->domain_id;
+ gtid.server_id= gle->server_id;
+ gtid.seq_no= gle->seq_no;
+ if (!gtid_event_filter->exclude(&gtid))
+ {
+ activate_current_event_group();
+ }
+ else
+ {
+ deactivate_current_event_group();
+ }
+
+ }
+
+ /*
Format events are not concerned by --offset and such, we always need to
read them to be able to process the wanted events.
*/
if (((rec_count >= offset) &&
- (ev->when >= start_datetime)) ||
+ (ev->when >= start_datetime) &&
+ (!is_gtid_filtering_enabled() || is_event_group_active())) ||
(ev_type == FORMAT_DESCRIPTION_EVENT))
{
if (ev_type != FORMAT_DESCRIPTION_EVENT)
@@ -1500,6 +1565,13 @@ end:
}
}
+ /* Xid_log_events or Query_log_events mark the end of a GTID event group. */
+ if ((ev_type == XID_EVENT || ev_type == QUERY_EVENT) &&
+ is_event_group_active())
+ {
+ deactivate_current_event_group();
+ }
+
if (destroy_evt) /* destroy it later if not set (ignored table map) */
delete ev;
}
@@ -1658,15 +1730,13 @@ static struct my_option my_options[] =
&start_datetime_str, &start_datetime_str,
0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"start-position", 'j',
- "Start reading the binlog at position N. Applies to the first binlog "
- "passed on the command line.",
- &start_position, &start_position, 0, GET_ULL,
- REQUIRED_ARG, BIN_LOG_HEADER_SIZE, BIN_LOG_HEADER_SIZE,
- /*
- COM_BINLOG_DUMP accepts only 4 bytes for the position
- so remote log reading has lower limit.
- */
- (ulonglong)(0xffffffffffffffffULL), 0, 0, 0},
+ "Start reading the binlog at this position. Type can either be a positive "
+ "integer or a GTID. When using a positive integer, the value only "
+ "applies to the first binlog passed on the command line. In GTID mode, "
+ "multiple GTIDs can be passed as a comma separated list, where each must "
+ "have a unique domain id.",
+ &start_pos_str, &start_pos_str, 0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0,
+ 0, 0, 0},
{"stop-datetime", OPT_STOP_DATETIME,
"Stop reading the binlog at first event having a datetime equal or "
"posterior to the argument; the argument must be a date and time "
@@ -1684,11 +1754,13 @@ static struct my_option my_options[] =
&opt_stop_never_slave_server_id, &opt_stop_never_slave_server_id, 0,
GET_ULONG, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"stop-position", OPT_STOP_POSITION,
- "Stop reading the binlog at position N. Applies to the last binlog "
- "passed on the command line.",
- &stop_position, &stop_position, 0, GET_ULL,
- REQUIRED_ARG, (longlong)(~(my_off_t)0), BIN_LOG_HEADER_SIZE,
- (ulonglong)(~(my_off_t)0), 0, 0, 0},
+ "Stop reading the binlog at this position. Type can either be a positive "
+ "integer or a GTID. When using a positive integer, the value only "
+ "applies to the last binlog passed on the command line. In GTID mode, "
+ "multiple GTIDs can be passed as a comma separated list, where each must "
+ "have a unique domain id.",
+ &stop_pos_str, &stop_pos_str, 0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0,
+ 0, 0},
{"table", 'T', "List entries for just this table (local log only).",
&table, &table, 0, GET_STR_ALLOC, REQUIRED_ARG,
0, 0, 0, 0, 0, 0},
@@ -1824,8 +1896,14 @@ static void cleanup()
my_free(const_cast<char*>(dirname_for_local_load));
my_free(start_datetime_str);
my_free(stop_datetime_str);
+ my_free(start_pos_str);
+ my_free(stop_pos_str);
+ my_free(start_gtids);
+ my_free(stop_gtids);
free_root(&glob_root, MYF(0));
+ delete gtid_event_filter;
+
delete binlog_filter;
delete glob_description_event;
if (mysql)
@@ -2075,6 +2153,123 @@ get_one_option(const struct my_option *opt, const char *argument, const char *fi
print_version();
opt_version= 1;
break;
+ case OPT_STOP_POSITION:
+ {
+ stop_gtids= gtid_parse_string_to_list(stop_pos_str, strlen(stop_pos_str),
+ &n_stop_gtid_ranges);
+ if (stop_gtids == NULL)
+ {
+ int err= 0;
+ char *end_ptr= NULL;
+ /*
+ No GTIDs specified in OPT_STOP_POSITION specification. Treat the value
+ as a singular index.
+ */
+ stop_position= my_strtoll10(stop_pos_str, &end_ptr, &err);
+
+ if (err || *end_ptr)
+ {
+ // Can't parse the position from the user
+ sql_print_error("Stop position argument value is invalid. Should be "
+ "either a position or GTID.");
+ return 1;
+ }
+ }
+ else if (n_stop_gtid_ranges > 0)
+ {
+ uint32 gtid_idx;
+ Domain_gtid_event_filter *domain_filter;
+
+ if (gtid_event_filter == NULL)
+ {
+ domain_filter= new Domain_gtid_event_filter();
+ gtid_event_filter= domain_filter;
+ }
+ else
+ {
+ domain_filter= (Domain_gtid_event_filter *) gtid_event_filter;
+ }
+
+ for (gtid_idx = 0; gtid_idx < n_stop_gtid_ranges; gtid_idx++)
+ {
+ rpl_gtid *stop_gtid= &stop_gtids[gtid_idx];
+ if (!domain_filter->add_stop_gtid(stop_gtid))
+ {
+ sql_print_error("Cannot add stop position; domain id %u "
+ "already has a stop position",
+ stop_gtid->domain_id);
+ return 1;
+ }
+ }
+ }
+ else
+ {
+ // Can't parse the position from the user
+ sql_print_error("Stop position argument value is invalid. Should be "
+ "either a position or GTID.");
+ return 1;
+ }
+ break;
+ }
+ case 'j':
+ {
+ start_gtids= gtid_parse_string_to_list(
+ start_pos_str, strlen(start_pos_str), &n_start_gtid_ranges);
+
+ if (start_gtids == NULL)
+ {
+ int err= 0;
+ char *end_ptr= NULL;
+ /*
+ No GTIDs specified in OPT_START_POSITION specification. Treat the value
+ as a singular index.
+ */
+ start_position= my_strtoll10(start_pos_str, &end_ptr, &err);
+
+ if (err || *end_ptr)
+ {
+ // Can't parse the position from the user
+ sql_print_error("Start position argument value is invalid. Should be "
+ "either a position or GTID.");
+ return 1;
+ }
+ }
+ else if (n_start_gtid_ranges > 0)
+ {
+ uint32 gtid_idx;
+ Domain_gtid_event_filter *domain_filter;
+
+ if (gtid_event_filter == NULL)
+ {
+ domain_filter= new Domain_gtid_event_filter();
+ gtid_event_filter= domain_filter;
+ }
+ else
+ {
+ domain_filter= (Domain_gtid_event_filter *) gtid_event_filter;
+ }
+
+ for (gtid_idx = 0; gtid_idx < n_start_gtid_ranges; gtid_idx++)
+ {
+ rpl_gtid *start_gtid= &start_gtids[gtid_idx];
+ if (!domain_filter->add_start_gtid(start_gtid))
+ {
+ sql_print_error("Cannot add start position; domain id %u "
+ "already has a start position",
+ start_gtid->domain_id);
+ return 1;
+ }
+ }
+ }
+ else
+ {
+ // Can't parse the position from the user
+ sql_print_error("Start position argument value is invalid. Should be "
+ "either a position or GTID.");
+ return 1;
+ }
+ break;
+ }
case '?':
usage();
opt_version= 1;
@@ -2237,11 +2432,12 @@ static Exit_status dump_log_entries(const char* logname)
@retval ERROR_STOP An error occurred - the program should terminate.
@retval OK_CONTINUE No error, the program should continue.
*/
-static Exit_status check_master_version()
+static Exit_status check_master_version(uint *major, uint *minor, uint *patch)
{
MYSQL_RES* res = 0;
MYSQL_ROW row;
uint version;
+ size_t version_iter;
if (mysql_query(mysql, "SELECT VERSION()") ||
!(res = mysql_store_result(mysql)))
@@ -2257,12 +2453,52 @@ static Exit_status check_master_version()
goto err;
}
- if (!(version = atoi(row[0])))
+ if (!(*major = atoi(row[0])))
{
error("Could not find server version: "
"Master reported NULL for the version.");
goto err;
}
+
+ /* Try to save the minor version, if supplied with a storage location */
+ if (minor != NULL)
+ {
+ for (version_iter= 0; version_iter < strlen((const char *) row);
+ version_iter++)
+ {
+ if (row[0][version_iter] == '.')
+ {
+ version_iter= version_iter + 1;
+ break;
+ }
+ }
+ if (!(*minor= atoi(&row[0][version_iter])))
+ {
+ error("Could not find server minor version: "
+ "Master reported NULL for the minor version.");
+ }
+ }
+
+ /* Try to save the patch version, if supplied with a storage location */
+ if (patch != NULL)
+ {
+ for (; version_iter < strlen((const char *) row); version_iter++)
+ {
+ if (row[0][version_iter] == '.')
+ {
+ version_iter= version_iter + 1;
+ break;
+ }
+ }
+ if (!(*patch= atoi(&row[0][version_iter])))
+ {
+ error("Could not find server patch version: "
+ "Master reported NULL for the patch version.");
+ }
+ }
+
+ version= *major;
+
/*
Make a notice to the server that this client
is checksum-aware. It does not need the first fake Rotate
@@ -2606,21 +2842,56 @@ static Exit_status dump_remote_log_entries(PRINT_EVENT_INFO *print_event_info,
DBUG_RETURN(retval);
net= &mysql->net;
- if ((retval= check_master_version()) != OK_CONTINUE)
+ uint major_version, minor_version;
+ if ((retval= check_master_version(&major_version, &minor_version, NULL)) !=
+ OK_CONTINUE)
DBUG_RETURN(retval);
/*
COM_BINLOG_DUMP accepts only 4 bytes for the position, so we are forced to
cast to uint32.
*/
- DBUG_ASSERT(start_position <= UINT_MAX32);
- int4store(buf, (uint32)start_position);
+ size_t buf_idx= 0;
+ if (is_gtid_filtering_enabled())
+ {
+ if (major_version < 10 || (major_version == 10 && minor_version < 7))
+ {
+ error("Master does not support GTID filtering. This feature was added "
+ "in MariaDB version 10.7");
+ DBUG_RETURN(ERROR_STOP);
+ }
+
+ size_t i;
+ for (i = 0; i < n_start_gtid_ranges; i++)
+ {
+ if (i > 0)
+ {
+ buf[buf_idx]= ',';
+ buf_idx += 1;
+ }
+
+
+ int4store(buf + buf_idx, start_gtids[i].domain_id);
+ buf[buf_idx+4]= '-';
+ int4store(buf + buf_idx + 5, start_gtids[i].server_id);
+ buf[buf_idx+9]= '-';
+ int8store(buf + buf_idx + 10, start_gtids[i].seq_no);
+ buf_idx += 18;
+ }
+ }
+ else
+ {
+ DBUG_ASSERT(start_position <= UINT_MAX32);
+ int4store(buf, (uint32) start_position);
+ buf_idx= BIN_LOG_HEADER_SIZE;
+ }
if (!opt_skip_annotate_row_events)
binlog_flags|= BINLOG_SEND_ANNOTATE_ROWS_EVENT;
if (!opt_stop_never)
binlog_flags|= BINLOG_DUMP_NON_BLOCK;
- int2store(buf + BIN_LOG_HEADER_SIZE, binlog_flags);
+ int2store(buf + buf_idx, binlog_flags);
+ buf_idx += 2;
size_t tlen = strlen(logname);
if (tlen > sizeof(buf) - 10)
@@ -2637,9 +2908,10 @@ static Exit_status dump_remote_log_entries(PRINT_EVENT_INFO *print_event_info,
}
else
slave_id= 0;
- int4store(buf + 6, slave_id);
- memcpy(buf + 10, logname, logname_len);
- if (simple_command(mysql, COM_BINLOG_DUMP, buf, logname_len + 10, 1))
+ int4store(buf + buf_idx, slave_id);
+ buf_idx += 4;
+ memcpy(buf + buf_idx, logname, logname_len);
+ if (simple_command(mysql, COM_BINLOG_DUMP, buf, logname_len + buf_idx, 1))
{
error("Got fatal error sending the log dump command.");
DBUG_RETURN(ERROR_STOP);
@@ -3210,6 +3482,14 @@ int main(int argc, char** argv)
"/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;\n");
fprintf(result_file, "/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/;\n");
+
+ if (is_gtid_filtering_enabled())
+ {
+ fprintf(result_file,
+ "/*!100001 SET @@SESSION.SERVER_ID=@@GLOBAL.SERVER_ID */;\n"
+ "/*!100001 SET @@SESSION.GTID_DOMAIN_ID=@@GLOBAL.GTID_DOMAIN_ID "
+ "*/;\n");
+ }
}
if (tmpdir.list)
@@ -3271,3 +3551,4 @@ struct encryption_service_st encryption_handler=
#include "sql_list.cc"
#include "rpl_filter.cc"
#include "compat56.cc"
+#include "rpl_gtid.cc" \ No newline at end of file