summaryrefslogtreecommitdiff
path: root/client/mysqlbinlog.cc
diff options
context:
space:
mode:
Diffstat (limited to 'client/mysqlbinlog.cc')
-rw-r--r--client/mysqlbinlog.cc331
1 files changed, 306 insertions, 25 deletions
diff --git a/client/mysqlbinlog.cc b/client/mysqlbinlog.cc
index d65828ea71c..dcb362e6b3a 100644
--- a/client/mysqlbinlog.cc
+++ b/client/mysqlbinlog.cc
@@ -143,10 +143,18 @@ static char *charset= 0;
static uint verbose= 0;
-static ulonglong start_position, stop_position;
+static char *start_pos_str, *stop_pos_str;
+static ulonglong start_position= BIN_LOG_HEADER_SIZE,
+ stop_position= (longlong)(~(my_off_t)0) ;
#define start_position_mot ((my_off_t)start_position)
#define stop_position_mot ((my_off_t)stop_position)
+static Delegating_gtid_event_filter *gtid_event_filter= NULL;
+static rpl_gtid *start_gtids, *stop_gtids;
+static my_bool is_event_gtid_active= FALSE;
+static uint32 n_start_gtid_ranges= 0;
+static uint32 n_stop_gtid_ranges= 0;
+
static char *start_datetime_str, *stop_datetime_str;
static my_time_t start_datetime= 0, stop_datetime= MY_TIME_T_MAX;
static ulonglong rec_count= 0;
@@ -981,6 +989,40 @@ static bool print_row_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
return result;
}
+static inline my_bool is_gtid_filtering_enabled()
+{
+ return gtid_event_filter != NULL;
+}
+
+/*
+ Where the binlog is processed sequentially, the variable is_event_gtid_active
+ keeps track of the state of active event groups. When a new Gtid_log_event is
+ read, if it should be output, this function will return true. Otherwise, this
+ function will return false.
+*/
+static inline my_bool is_event_group_active()
+{
+ return is_event_gtid_active;
+}
+
+/*
+ When a Gtid_log_event marks a GTID that should be output, this function is
+ invoked to change the program state to start writing binlog events until
+ the event group has ended.
+*/
+static inline void activate_current_event_group()
+{
+ is_event_gtid_active= TRUE;
+}
+
+/*
+ When an active event group has written its last event, this function is
+ invoked to change the program state to stop writing events.
+*/
+static inline void deactivate_current_event_group()
+{
+ is_event_gtid_active= FALSE;
+}
/**
Print the given event, and either delete it or delegate the deletion
@@ -1019,11 +1061,34 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
#endif
/*
+ If the binlog output should be filtered using GTIDs, test the new event
+ group to see if its events should be written or ignored.
+ */
+ if (ev_type == GTID_EVENT && is_gtid_filtering_enabled())
+ {
+ Gtid_log_event *gle= (Gtid_log_event*) ev;
+ rpl_gtid gtid;
+ gtid.domain_id= gle->domain_id;
+ gtid.server_id= gle->server_id;
+ gtid.seq_no= gle->seq_no;
+ if (!gtid_event_filter->exclude(&gtid))
+ {
+ activate_current_event_group();
+ }
+ else
+ {
+ deactivate_current_event_group();
+ }
+
+ }
+
+ /*
Format events are not concerned by --offset and such, we always need to
read them to be able to process the wanted events.
*/
if (((rec_count >= offset) &&
- (ev->when >= start_datetime)) ||
+ (ev->when >= start_datetime) &&
+ (!is_gtid_filtering_enabled() || is_event_group_active())) ||
(ev_type == FORMAT_DESCRIPTION_EVENT))
{
if (ev_type != FORMAT_DESCRIPTION_EVENT)
@@ -1500,6 +1565,13 @@ end:
}
}
+ /* Xid_log_events or Query_log_events mark the end of a GTID event group. */
+ if ((ev_type == XID_EVENT || ev_type == QUERY_EVENT) &&
+ is_event_group_active())
+ {
+ deactivate_current_event_group();
+ }
+
if (destroy_evt) /* destroy it later if not set (ignored table map) */
delete ev;
}
@@ -1658,15 +1730,13 @@ static struct my_option my_options[] =
&start_datetime_str, &start_datetime_str,
0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"start-position", 'j',
- "Start reading the binlog at position N. Applies to the first binlog "
- "passed on the command line.",
- &start_position, &start_position, 0, GET_ULL,
- REQUIRED_ARG, BIN_LOG_HEADER_SIZE, BIN_LOG_HEADER_SIZE,
- /*
- COM_BINLOG_DUMP accepts only 4 bytes for the position
- so remote log reading has lower limit.
- */
- (ulonglong)(0xffffffffffffffffULL), 0, 0, 0},
+ "Start reading the binlog at this position. Type can either be a positive "
+ "integer or a GTID. When using a positive integer, the value only "
+ "applies to the first binlog passed on the command line. In GTID mode, "
+ "multiple GTIDs can be passed as a comma separated list, where each must "
+ "have a unique domain id.",
+ &start_pos_str, &start_pos_str, 0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0,
+ 0, 0, 0},
{"stop-datetime", OPT_STOP_DATETIME,
"Stop reading the binlog at first event having a datetime equal or "
"posterior to the argument; the argument must be a date and time "
@@ -1684,11 +1754,13 @@ static struct my_option my_options[] =
&opt_stop_never_slave_server_id, &opt_stop_never_slave_server_id, 0,
GET_ULONG, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"stop-position", OPT_STOP_POSITION,
- "Stop reading the binlog at position N. Applies to the last binlog "
- "passed on the command line.",
- &stop_position, &stop_position, 0, GET_ULL,
- REQUIRED_ARG, (longlong)(~(my_off_t)0), BIN_LOG_HEADER_SIZE,
- (ulonglong)(~(my_off_t)0), 0, 0, 0},
+ "Stop reading the binlog at this position. Type can either be a positive "
+ "integer or a GTID. When using a positive integer, the value only "
+ "applies to the last binlog passed on the command line. In GTID mode, "
+ "multiple GTIDs can be passed as a comma separated list, where each must "
+ "have a unique domain id.",
+ &stop_pos_str, &stop_pos_str, 0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0,
+ 0, 0},
{"table", 'T', "List entries for just this table (local log only).",
&table, &table, 0, GET_STR_ALLOC, REQUIRED_ARG,
0, 0, 0, 0, 0, 0},
@@ -1824,8 +1896,14 @@ static void cleanup()
my_free(const_cast<char*>(dirname_for_local_load));
my_free(start_datetime_str);
my_free(stop_datetime_str);
+ my_free(start_pos_str);
+ my_free(stop_pos_str);
+ my_free(start_gtids);
+ my_free(stop_gtids);
free_root(&glob_root, MYF(0));
+ delete gtid_event_filter;
+
delete binlog_filter;
delete glob_description_event;
if (mysql)
@@ -2075,6 +2153,123 @@ get_one_option(const struct my_option *opt, const char *argument, const char *fi
print_version();
opt_version= 1;
break;
+ case OPT_STOP_POSITION:
+ {
+ stop_gtids= gtid_parse_string_to_list(stop_pos_str, strlen(stop_pos_str),
+ &n_stop_gtid_ranges);
+ if (stop_gtids == NULL)
+ {
+ int err= 0;
+ char *end_ptr= NULL;
+ /*
+ No GTIDs specified in OPT_STOP_POSITION specification. Treat the value
+ as a singular index.
+ */
+ stop_position= my_strtoll10(stop_pos_str, &end_ptr, &err);
+
+ if (err || *end_ptr)
+ {
+ // Can't parse the position from the user
+ sql_print_error("Stop position argument value is invalid. Should be "
+ "either a position or GTID.");
+ return 1;
+ }
+ }
+ else if (n_stop_gtid_ranges > 0)
+ {
+ uint32 gtid_idx;
+ Domain_gtid_event_filter *domain_filter;
+
+ if (gtid_event_filter == NULL)
+ {
+ domain_filter= new Domain_gtid_event_filter();
+ gtid_event_filter= domain_filter;
+ }
+ else
+ {
+ domain_filter= (Domain_gtid_event_filter *) gtid_event_filter;
+ }
+
+ for (gtid_idx = 0; gtid_idx < n_stop_gtid_ranges; gtid_idx++)
+ {
+ rpl_gtid *stop_gtid= &stop_gtids[gtid_idx];
+ if (!domain_filter->add_stop_gtid(stop_gtid))
+ {
+ sql_print_error("Cannot add stop position; domain id %u "
+ "already has a stop position",
+ stop_gtid->domain_id);
+ return 1;
+ }
+ }
+ }
+ else
+ {
+ // Can't parse the position from the user
+ sql_print_error("Stop position argument value is invalid. Should be "
+ "either a position or GTID.");
+ return 1;
+ }
+ break;
+ }
+ case 'j':
+ {
+ start_gtids= gtid_parse_string_to_list(
+ start_pos_str, strlen(start_pos_str), &n_start_gtid_ranges);
+
+ if (start_gtids == NULL)
+ {
+ int err= 0;
+ char *end_ptr= NULL;
+ /*
+ No GTIDs specified in OPT_START_POSITION specification. Treat the value
+ as a singular index.
+ */
+ start_position= my_strtoll10(start_pos_str, &end_ptr, &err);
+
+ if (err || *end_ptr)
+ {
+ // Can't parse the position from the user
+ sql_print_error("Start position argument value is invalid. Should be "
+ "either a position or GTID.");
+ return 1;
+ }
+ }
+ else if (n_start_gtid_ranges > 0)
+ {
+ uint32 gtid_idx;
+ Domain_gtid_event_filter *domain_filter;
+
+ if (gtid_event_filter == NULL)
+ {
+ domain_filter= new Domain_gtid_event_filter();
+ gtid_event_filter= domain_filter;
+ }
+ else
+ {
+ domain_filter= (Domain_gtid_event_filter *) gtid_event_filter;
+ }
+
+ for (gtid_idx = 0; gtid_idx < n_start_gtid_ranges; gtid_idx++)
+ {
+ rpl_gtid *start_gtid= &start_gtids[gtid_idx];
+ if (!domain_filter->add_start_gtid(start_gtid))
+ {
+ sql_print_error("Cannot add start position; domain id %u "
+ "already has a start position",
+ start_gtid->domain_id);
+ return 1;
+ }
+ }
+ }
+ else
+ {
+ // Can't parse the position from the user
+ sql_print_error("Start position argument value is invalid. Should be "
+ "either a position or GTID.");
+ return 1;
+ }
+ break;
+ }
case '?':
usage();
opt_version= 1;
@@ -2237,11 +2432,12 @@ static Exit_status dump_log_entries(const char* logname)
@retval ERROR_STOP An error occurred - the program should terminate.
@retval OK_CONTINUE No error, the program should continue.
*/
-static Exit_status check_master_version()
+static Exit_status check_master_version(uint *major, uint *minor, uint *patch)
{
MYSQL_RES* res = 0;
MYSQL_ROW row;
uint version;
+ size_t version_iter;
if (mysql_query(mysql, "SELECT VERSION()") ||
!(res = mysql_store_result(mysql)))
@@ -2257,12 +2453,52 @@ static Exit_status check_master_version()
goto err;
}
- if (!(version = atoi(row[0])))
+ if (!(*major = atoi(row[0])))
{
error("Could not find server version: "
"Master reported NULL for the version.");
goto err;
}
+
+ /* Try to save the minor version, if supplied with a storage location */
+ if (minor != NULL)
+ {
+ for (version_iter= 0; version_iter < strlen((const char *) row);
+ version_iter++)
+ {
+ if (row[0][version_iter] == '.')
+ {
+ version_iter= version_iter + 1;
+ break;
+ }
+ }
+ if (!(*minor= atoi(&row[0][version_iter])))
+ {
+ error("Could not find server minor version: "
+ "Master reported NULL for the minor version.");
+ }
+ }
+
+ /* Try to save the patch version, if supplied with a storage location */
+ if (patch != NULL)
+ {
+ for (; version_iter < strlen((const char *) row); version_iter++)
+ {
+ if (row[0][version_iter] == '.')
+ {
+ version_iter= version_iter + 1;
+ break;
+ }
+ }
+ if (!(*patch= atoi(&row[0][version_iter])))
+ {
+ error("Could not find server patch version: "
+ "Master reported NULL for the patch version.");
+ }
+ }
+
+ version= *major;
+
/*
Make a notice to the server that this client
is checksum-aware. It does not need the first fake Rotate
@@ -2606,21 +2842,56 @@ static Exit_status dump_remote_log_entries(PRINT_EVENT_INFO *print_event_info,
DBUG_RETURN(retval);
net= &mysql->net;
- if ((retval= check_master_version()) != OK_CONTINUE)
+ uint major_version, minor_version;
+ if ((retval= check_master_version(&major_version, &minor_version, NULL)) !=
+ OK_CONTINUE)
DBUG_RETURN(retval);
/*
COM_BINLOG_DUMP accepts only 4 bytes for the position, so we are forced to
cast to uint32.
*/
- DBUG_ASSERT(start_position <= UINT_MAX32);
- int4store(buf, (uint32)start_position);
+ size_t buf_idx= 0;
+ if (is_gtid_filtering_enabled())
+ {
+ if (major_version < 10 || (major_version == 10 && minor_version < 7))
+ {
+ error("Master does not support GTID filtering. This feature was added "
+ "in MariaDB version 10.7");
+ DBUG_RETURN(ERROR_STOP);
+ }
+
+ size_t i;
+ for (i = 0; i < n_start_gtid_ranges; i++)
+ {
+ if (i > 0)
+ {
+ buf[buf_idx]= ',';
+ buf_idx += 1;
+ }
+
+
+ int4store(buf + buf_idx, start_gtids[i].domain_id);
+ buf[buf_idx+4]= '-';
+ int4store(buf + buf_idx + 5, start_gtids[i].server_id);
+ buf[buf_idx+9]= '-';
+ int8store(buf + buf_idx + 10, start_gtids[i].seq_no);
+ buf_idx += 18;
+ }
+ }
+ else
+ {
+ DBUG_ASSERT(start_position <= UINT_MAX32);
+ int4store(buf, (uint32) start_position);
+ buf_idx= BIN_LOG_HEADER_SIZE;
+ }
if (!opt_skip_annotate_row_events)
binlog_flags|= BINLOG_SEND_ANNOTATE_ROWS_EVENT;
if (!opt_stop_never)
binlog_flags|= BINLOG_DUMP_NON_BLOCK;
- int2store(buf + BIN_LOG_HEADER_SIZE, binlog_flags);
+ int2store(buf + buf_idx, binlog_flags);
+ buf_idx += 2;
size_t tlen = strlen(logname);
if (tlen > sizeof(buf) - 10)
@@ -2637,9 +2908,10 @@ static Exit_status dump_remote_log_entries(PRINT_EVENT_INFO *print_event_info,
}
else
slave_id= 0;
- int4store(buf + 6, slave_id);
- memcpy(buf + 10, logname, logname_len);
- if (simple_command(mysql, COM_BINLOG_DUMP, buf, logname_len + 10, 1))
+ int4store(buf + buf_idx, slave_id);
+ buf_idx += 4;
+ memcpy(buf + buf_idx, logname, logname_len);
+ if (simple_command(mysql, COM_BINLOG_DUMP, buf, logname_len + buf_idx, 1))
{
error("Got fatal error sending the log dump command.");
DBUG_RETURN(ERROR_STOP);
@@ -3210,6 +3482,14 @@ int main(int argc, char** argv)
"/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;\n");
fprintf(result_file, "/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/;\n");
+
+ if (is_gtid_filtering_enabled())
+ {
+ fprintf(result_file,
+ "/*!100001 SET @@SESSION.SERVER_ID=@@GLOBAL.SERVER_ID */;\n"
+ "/*!100001 SET @@SESSION.GTID_DOMAIN_ID=@@GLOBAL.GTID_DOMAIN_ID "
+ "*/;\n");
+ }
}
if (tmpdir.list)
@@ -3271,3 +3551,4 @@ struct encryption_service_st encryption_handler=
#include "sql_list.cc"
#include "rpl_filter.cc"
#include "compat56.cc"
+#include "rpl_gtid.cc" \ No newline at end of file