summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2014-03-09 10:27:38 +0100
committerunknown <knielsen@knielsen-hq.org>2014-03-09 10:27:38 +0100
commit2c2478b82260f5110ea2c5bed3c6c7bcd3558453 (patch)
tree24a7a88645c37c46f734869cb8b593ea6ea4dfec
parent5c31e79f8bba85e555dac2e2f6e97cc1b0a2b51b (diff)
downloadmariadb-git-2c2478b82260f5110ea2c5bed3c6c7bcd3558453.tar.gz
MDEV-5804: If same GTID is received on multiple master connections in multi-source replication, the event is double-executed causing corruption or replication failure
Before, the arrival of same GTID twice in multi-source replication would cause double-apply or in gtid strict mode an error. Keep the behaviour, but add an option --gtid-ignore-duplicates which allows to correctly handle duplicates, ignoring all but the first. This relies on the user ensuring correct configuration so that sequence numbers are strictly increasing within each replication domain; then duplicates can be detected simply by comparing the sequence numbers against what is already applied. Only one master connection (but possibly multiple parallel worker threads within that connection) is allowed to apply events within one replication domain at a time; any other connection that receives a GTID in the same domain either discards it (if it is already applied) or waits for the other connection to not have any events to apply. Intermediate patch, as proof-of-concept for testing. The main limitation is that currently it is only implemented for parallel replication, @@slave_parallel_threads > 0.
-rw-r--r--mysql-test/suite/multi_source/gtid_ignore_duplicates.cnf24
-rw-r--r--mysql-test/suite/multi_source/gtid_ignore_duplicates.result188
-rw-r--r--mysql-test/suite/multi_source/gtid_ignore_duplicates.test208
-rw-r--r--sql/log_event.cc7
-rw-r--r--sql/mysqld.cc6
-rw-r--r--sql/mysqld.h3
-rw-r--r--sql/rpl_gtid.cc114
-rw-r--r--sql/rpl_gtid.h22
-rw-r--r--sql/rpl_parallel.cc24
-rw-r--r--sql/rpl_rli.cc3
-rw-r--r--sql/slave.cc33
-rw-r--r--sql/sql_repl.cc363
-rw-r--r--sql/sys_vars.cc48
13 files changed, 856 insertions, 187 deletions
diff --git a/mysql-test/suite/multi_source/gtid_ignore_duplicates.cnf b/mysql-test/suite/multi_source/gtid_ignore_duplicates.cnf
new file mode 100644
index 00000000000..b47ebb2cf30
--- /dev/null
+++ b/mysql-test/suite/multi_source/gtid_ignore_duplicates.cnf
@@ -0,0 +1,24 @@
+!include my.cnf
+
+[mysqld.1]
+log-slave-updates
+loose-innodb
+
+[mysqld.2]
+log-slave-updates
+loose-innodb
+
+[mysqld.3]
+log-bin=server3-bin
+log-slave-updates
+loose-innodb
+
+[mysqld.4]
+server-id=4
+log-bin=server4-bin
+log-slave-updates
+loose-innodb
+
+[ENV]
+SERVER_MYPORT_4= @mysqld.4.port
+SERVER_MYSOCK_4= @mysqld.4.socket
diff --git a/mysql-test/suite/multi_source/gtid_ignore_duplicates.result b/mysql-test/suite/multi_source/gtid_ignore_duplicates.result
new file mode 100644
index 00000000000..bac522af76b
--- /dev/null
+++ b/mysql-test/suite/multi_source/gtid_ignore_duplicates.result
@@ -0,0 +1,188 @@
+*** Test all-to-all replication with --gtid-ignore-duplicates ***
+SET @old_parallel= @@GLOBAL.slave_parallel_threads;
+SET GLOBAL slave_parallel_threads=5;
+SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
+SET GLOBAL gtid_ignore_duplicates=1;
+SET GLOBAL gtid_domain_id= 1;
+SET SESSION gtid_domain_id= 1;
+CHANGE MASTER 'b2a' TO master_port=MYPORT_2, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+CHANGE MASTER 'c2a' TO master_port=MYPORT_3, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+set default_master_connection = 'b2a';
+START SLAVE;
+include/wait_for_slave_to_start.inc
+set default_master_connection = 'c2a';
+START SLAVE;
+include/wait_for_slave_to_start.inc
+set default_master_connection = '';
+SET @old_parallel= @@GLOBAL.slave_parallel_threads;
+SET GLOBAL slave_parallel_threads=5;
+SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
+SET GLOBAL gtid_ignore_duplicates=1;
+SET GLOBAL gtid_domain_id= 2;
+SET SESSION gtid_domain_id= 2;
+CHANGE MASTER 'a2b' TO master_port=MYPORT_1, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+CHANGE MASTER 'c2b' TO master_port=MYPORT_3, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+set default_master_connection = 'a2b';
+START SLAVE;
+include/wait_for_slave_to_start.inc
+set default_master_connection = 'c2b';
+START SLAVE;
+include/wait_for_slave_to_start.inc
+set default_master_connection = '';
+SET @old_parallel= @@GLOBAL.slave_parallel_threads;
+SET GLOBAL slave_parallel_threads=5;
+SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
+SET GLOBAL gtid_ignore_duplicates=1;
+SET GLOBAL gtid_domain_id= 3;
+SET SESSION gtid_domain_id= 3;
+CHANGE MASTER 'a2c' TO master_port=MYPORT_1, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+CHANGE MASTER 'b2c' TO master_port=MYPORT_2, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+set default_master_connection = 'a2c';
+START SLAVE;
+include/wait_for_slave_to_start.inc
+set default_master_connection = 'b2c';
+START SLAVE;
+include/wait_for_slave_to_start.inc
+set default_master_connection = '';
+SET @old_parallel= @@GLOBAL.slave_parallel_threads;
+SET GLOBAL slave_parallel_threads=5;
+SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
+SET GLOBAL gtid_ignore_duplicates=1;
+SET GLOBAL gtid_domain_id= 1;
+SET SESSION gtid_domain_id= 1;
+CHANGE MASTER 'a2d' TO master_port=MYPORT_1, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+set default_master_connection = 'a2d';
+START SLAVE;
+include/wait_for_slave_to_start.inc
+set default_master_connection = '';
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
+CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (1);
+BEGIN;
+INSERT INTO t1 VALUES (2);
+INSERT INTO t1 VALUES (3);
+COMMIT;
+INSERT INTO t1 VALUES (4), (5);
+INSERT INTO t1 VALUES (6);
+include/save_master_gtid.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a
+1
+2
+3
+4
+5
+6
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a
+1
+2
+3
+4
+5
+6
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a
+1
+2
+3
+4
+5
+6
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a
+1
+2
+3
+4
+5
+6
+INSERT INTO t1 VALUES (10);
+include/save_master_gtid.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
+a
+10
+STOP SLAVE "c2b";
+SET default_master_connection = "c2b";
+include/wait_for_slave_to_stop.inc
+STOP SLAVE "a2b";
+SET default_master_connection = "a2b";
+include/wait_for_slave_to_stop.inc
+INSERT INTO t1 VALUES (11);
+include/save_master_gtid.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
+a
+10
+11
+SET default_master_connection = "b2a";
+STOP SLAVE;
+include/wait_for_slave_to_stop.inc
+INSERT INTO t1 VALUES (12);
+SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
+a
+10
+12
+include/save_master_gtid.inc
+START SLAVE "b2a";
+SET default_master_connection = "b2a";
+include/wait_for_slave_to_start.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
+a
+10
+11
+12
+START SLAVE "c2b";
+SET default_master_connection = "c2b";
+include/wait_for_slave_to_start.inc
+START SLAVE "a2b";
+SET default_master_connection = "a2b";
+include/wait_for_slave_to_start.inc
+include/save_master_gtid.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
+a
+10
+11
+12
+SET GLOBAL gtid_domain_id=0;
+STOP ALL SLAVES;
+Warnings:
+Note 1938 SLAVE 'c2a' stopped
+Note 1938 SLAVE 'b2a' stopped
+include/reset_master_slave.inc
+SET GLOBAL slave_parallel_threads= @old_parallel;
+SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
+DROP TABLE t1;
+SET GLOBAL gtid_domain_id=0;
+STOP ALL SLAVES;
+Warnings:
+Note 1938 SLAVE 'a2b' stopped
+Note 1938 SLAVE 'c2b' stopped
+include/reset_master_slave.inc
+SET GLOBAL slave_parallel_threads= @old_parallel;
+SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
+DROP TABLE t1;
+SET GLOBAL gtid_domain_id=0;
+STOP ALL SLAVES;
+Warnings:
+Note 1938 SLAVE 'a2c' stopped
+Note 1938 SLAVE 'b2c' stopped
+include/reset_master_slave.inc
+SET GLOBAL slave_parallel_threads= @old_parallel;
+SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
+DROP TABLE t1;
+SET GLOBAL gtid_domain_id=0;
+STOP ALL SLAVES;
+Warnings:
+Note 1938 SLAVE 'a2d' stopped
+include/reset_master_slave.inc
+SET GLOBAL slave_parallel_threads= @old_parallel;
+SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
+DROP TABLE t1;
diff --git a/mysql-test/suite/multi_source/gtid_ignore_duplicates.test b/mysql-test/suite/multi_source/gtid_ignore_duplicates.test
new file mode 100644
index 00000000000..16db4d82ddd
--- /dev/null
+++ b/mysql-test/suite/multi_source/gtid_ignore_duplicates.test
@@ -0,0 +1,208 @@
+--source include/not_embedded.inc
+--source include/have_innodb.inc
+
+--echo *** Test all-to-all replication with --gtid-ignore-duplicates ***
+
+--connect (server_1,127.0.0.1,root,,,$SERVER_MYPORT_1)
+--connect (server_2,127.0.0.1,root,,,$SERVER_MYPORT_2)
+--connect (server_3,127.0.0.1,root,,,$SERVER_MYPORT_3)
+--connect (server_4,127.0.0.1,root,,,$SERVER_MYPORT_4)
+
+# Setup A <-> B, B <-> C, C <-> A, and A -> D.
+
+--connection server_1
+SET @old_parallel= @@GLOBAL.slave_parallel_threads;
+SET GLOBAL slave_parallel_threads=5;
+SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
+SET GLOBAL gtid_ignore_duplicates=1;
+SET GLOBAL gtid_domain_id= 1;
+SET SESSION gtid_domain_id= 1;
+--replace_result $SERVER_MYPORT_2 MYPORT_2
+eval CHANGE MASTER 'b2a' TO master_port=$SERVER_MYPORT_2, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+--replace_result $SERVER_MYPORT_3 MYPORT_3
+eval CHANGE MASTER 'c2a' TO master_port=$SERVER_MYPORT_3, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+set default_master_connection = 'b2a';
+START SLAVE;
+--source include/wait_for_slave_to_start.inc
+set default_master_connection = 'c2a';
+START SLAVE;
+--source include/wait_for_slave_to_start.inc
+set default_master_connection = '';
+
+--connection server_2
+SET @old_parallel= @@GLOBAL.slave_parallel_threads;
+SET GLOBAL slave_parallel_threads=5;
+SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
+SET GLOBAL gtid_ignore_duplicates=1;
+SET GLOBAL gtid_domain_id= 2;
+SET SESSION gtid_domain_id= 2;
+--replace_result $SERVER_MYPORT_1 MYPORT_1
+eval CHANGE MASTER 'a2b' TO master_port=$SERVER_MYPORT_1, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+--replace_result $SERVER_MYPORT_3 MYPORT_3
+eval CHANGE MASTER 'c2b' TO master_port=$SERVER_MYPORT_3, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+set default_master_connection = 'a2b';
+START SLAVE;
+--source include/wait_for_slave_to_start.inc
+set default_master_connection = 'c2b';
+START SLAVE;
+--source include/wait_for_slave_to_start.inc
+set default_master_connection = '';
+
+--connection server_3
+SET @old_parallel= @@GLOBAL.slave_parallel_threads;
+SET GLOBAL slave_parallel_threads=5;
+SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
+SET GLOBAL gtid_ignore_duplicates=1;
+SET GLOBAL gtid_domain_id= 3;
+SET SESSION gtid_domain_id= 3;
+--replace_result $SERVER_MYPORT_1 MYPORT_1
+eval CHANGE MASTER 'a2c' TO master_port=$SERVER_MYPORT_1, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+--replace_result $SERVER_MYPORT_2 MYPORT_2
+eval CHANGE MASTER 'b2c' TO master_port=$SERVER_MYPORT_2, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+set default_master_connection = 'a2c';
+START SLAVE;
+--source include/wait_for_slave_to_start.inc
+set default_master_connection = 'b2c';
+START SLAVE;
+--source include/wait_for_slave_to_start.inc
+set default_master_connection = '';
+
+--connection server_4
+SET @old_parallel= @@GLOBAL.slave_parallel_threads;
+SET GLOBAL slave_parallel_threads=5;
+SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates;
+SET GLOBAL gtid_ignore_duplicates=1;
+SET GLOBAL gtid_domain_id= 1;
+SET SESSION gtid_domain_id= 1;
+--replace_result $SERVER_MYPORT_1 MYPORT_1
+eval CHANGE MASTER 'a2d' TO master_port=$SERVER_MYPORT_1, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos;
+set default_master_connection = 'a2d';
+START SLAVE;
+--source include/wait_for_slave_to_start.inc
+set default_master_connection = '';
+
+
+--connection server_1
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
+CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (1);
+BEGIN;
+INSERT INTO t1 VALUES (2);
+INSERT INTO t1 VALUES (3);
+COMMIT;
+INSERT INTO t1 VALUES (4), (5);
+INSERT INTO t1 VALUES (6);
+
+--source include/save_master_gtid.inc
+
+--connection server_2
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+
+--connection server_3
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+
+--connection server_4
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+
+--connection server_1
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+
+# Test that we can connect at a GTID position that has not yet reached
+# that master server.
+# We stop the connections C->B and A->B, create an event on C, Check that
+# the event has reached A (but not B). Then let A stop and re-connect to
+# B, which will connect at the new event, which is in the future for B.
+
+--connection server_3
+INSERT INTO t1 VALUES (10);
+--source include/save_master_gtid.inc
+
+--connection server_2
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
+STOP SLAVE "c2b";
+SET default_master_connection = "c2b";
+--source include/wait_for_slave_to_stop.inc
+STOP SLAVE "a2b";
+SET default_master_connection = "a2b";
+--source include/wait_for_slave_to_stop.inc
+
+--connection server_3
+INSERT INTO t1 VALUES (11);
+--source include/save_master_gtid.inc
+
+--connection server_1
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
+SET default_master_connection = "b2a";
+STOP SLAVE;
+--source include/wait_for_slave_to_stop.inc
+
+--connection server_2
+INSERT INTO t1 VALUES (12);
+SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
+--source include/save_master_gtid.inc
+
+--connection server_1
+START SLAVE "b2a";
+SET default_master_connection = "b2a";
+--source include/wait_for_slave_to_start.inc
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
+
+--connection server_2
+START SLAVE "c2b";
+SET default_master_connection = "c2b";
+--source include/wait_for_slave_to_start.inc
+START SLAVE "a2b";
+SET default_master_connection = "a2b";
+--source include/wait_for_slave_to_start.inc
+
+--connection server_1
+--source include/save_master_gtid.inc
+
+--connection server_2
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 10 ORDER BY a;
+
+
+# Clean up.
+--connection server_1
+SET GLOBAL gtid_domain_id=0;
+STOP ALL SLAVES;
+--source reset_master_slave.inc
+SET GLOBAL slave_parallel_threads= @old_parallel;
+SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
+DROP TABLE t1;
+--disconnect server_1
+
+--connection server_2
+SET GLOBAL gtid_domain_id=0;
+STOP ALL SLAVES;
+--source reset_master_slave.inc
+SET GLOBAL slave_parallel_threads= @old_parallel;
+SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
+DROP TABLE t1;
+--disconnect server_2
+
+--connection server_3
+SET GLOBAL gtid_domain_id=0;
+STOP ALL SLAVES;
+--source reset_master_slave.inc
+SET GLOBAL slave_parallel_threads= @old_parallel;
+SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
+DROP TABLE t1;
+--disconnect server_3
+
+--connection server_4
+SET GLOBAL gtid_domain_id=0;
+STOP ALL SLAVES;
+--source reset_master_slave.inc
+SET GLOBAL slave_parallel_threads= @old_parallel;
+SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates;
+DROP TABLE t1;
+--disconnect server_4
diff --git a/sql/log_event.cc b/sql/log_event.cc
index f3f6d7a5d38..98524d73433 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -4440,7 +4440,7 @@ Default database: '%s'. Query: '%s'",
end:
if (sub_id && !thd->is_slave_error)
- rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid);
+ rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid, rli);
/*
Probably we have set thd->query, thd->db, thd->catalog to point to places
@@ -6806,7 +6806,8 @@ Gtid_list_log_event::do_apply_event(rpl_group_info *rgi)
sub_id_list[i],
false, false)))
return ret;
- rpl_global_gtid_slave_state.update_state_hash(sub_id_list[i], &list[i]);
+ rpl_global_gtid_slave_state.update_state_hash(sub_id_list[i], &list[i],
+ NULL);
}
}
ret= Log_event::do_apply_event(rgi);
@@ -7326,7 +7327,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
thd->mdl_context.release_transactional_locks();
if (!res && sub_id)
- rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid);
+ rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid, rli);
/*
Increment the global status commit count variable
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 36d0edee660..d292cc86cfb 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -553,6 +553,7 @@ ulong opt_slave_domain_parallel_threads= 0;
ulong opt_binlog_commit_wait_count= 0;
ulong opt_binlog_commit_wait_usec= 0;
ulong opt_slave_parallel_max_queued= 131072;
+my_bool opt_gtid_ignore_duplicates= FALSE;
const double log_10[] = {
1e000, 1e001, 1e002, 1e003, 1e004, 1e005, 1e006, 1e007, 1e008, 1e009,
@@ -987,7 +988,7 @@ PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread,
key_COND_rpl_thread_pool,
key_COND_parallel_entry, key_COND_group_commit_orderer,
key_COND_prepare_ordered;
-PSI_cond_key key_COND_wait_gtid;
+PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;
static PSI_cond_info all_server_conds[]=
{
@@ -1035,7 +1036,8 @@ static PSI_cond_info all_server_conds[]=
{ &key_COND_parallel_entry, "COND_parallel_entry", 0},
{ &key_COND_group_commit_orderer, "COND_group_commit_orderer", 0},
{ &key_COND_prepare_ordered, "COND_prepare_ordered", 0},
- { &key_COND_wait_gtid, "COND_wait_gtid", 0}
+ { &key_COND_wait_gtid, "COND_wait_gtid", 0},
+ { &key_COND_gtid_ignore_duplicates, "COND_gtid_ignore_duplicates", 0}
};
PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 0d4b23b12e7..28b9c061945 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -184,6 +184,7 @@ extern ulong opt_slave_domain_parallel_threads;
extern ulong opt_slave_parallel_max_queued;
extern ulong opt_binlog_commit_wait_count;
extern ulong opt_binlog_commit_wait_usec;
+extern my_bool opt_gtid_ignore_duplicates;
extern ulong back_log;
extern ulong executed_events;
extern char language[FN_REFLEN];
@@ -299,7 +300,7 @@ extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_queue,
key_COND_rpl_thread_pool,
key_COND_parallel_entry, key_COND_group_commit_orderer;
-extern PSI_cond_key key_COND_wait_gtid;
+extern PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;
extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_kill_server, key_thread_main,
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index e51dee20c19..b66651ae5fd 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -33,7 +33,8 @@ const LEX_STRING rpl_gtid_slave_state_table_name=
void
-rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid)
+rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid,
+ const Relay_log_info *rli)
{
int err;
/*
@@ -44,7 +45,7 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid)
it is even committed.
*/
mysql_mutex_lock(&LOCK_slave_state);
- err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no);
+ err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, rli);
mysql_mutex_unlock(&LOCK_slave_state);
if (err)
{
@@ -76,17 +77,102 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
rgi->gtid_sub_id= 0;
if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false))
DBUG_RETURN(1);
- update_state_hash(sub_id, &rgi->current_gtid);
+ update_state_hash(sub_id, &rgi->current_gtid, rgi->rli);
}
DBUG_RETURN(0);
}
+/*
+ Check GTID event execution when --gtid-ignore-duplicates.
+
+ The idea with --gtid-ignore-duplicates is that we allow multiple master
+ connections (in multi-source replication) to all receive the same GTIDs and
+ event groups. Only one instance of each is applied; we use the sequence
+ number in the GTID to decide whether a GTID has already been applied.
+
+ So if the seq_no of a GTID (or a higher sequence number) has already been
+ applied, then the event should be skipped. If not then the event should be
+ applied.
+
+ To avoid two master connections tring to apply the same event
+ simultaneously, only one is allowed to work in any given domain at any point
+ in time. The associated Relay_log_info object is called the owner of the
+ domain (and there can be multiple parallel worker threads working in that
+ domain for that Relay_log_info). Any other Relay_log_info/master connection
+ must wait for the domain to become free, or for their GTID to have been
+ applied, before being allowed to proceed.
+
+ Returns:
+ 0 This GTID is already applied, it should be skipped.
+ 1 The GTID is not yet applied; this rli is now the owner, and must apply
+ the event and release the domain afterwards.
+ -1 Error (out of memory to allocate a new element for the domain).
+*/
+int
+rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli)
+{
+ uint32 domain_id= gtid->domain_id;
+ uint32 seq_no= gtid->seq_no;
+ rpl_slave_state::element *elem;
+ int res;
+
+ mysql_mutex_lock(&LOCK_slave_state);
+ if (!(elem= get_element(domain_id)))
+ {
+ res= -1;
+ goto err;
+ }
+ /*
+ Note that the elem pointer does not change once inserted in the hash. So
+ we can re-use the pointer without looking it up again in the hash after
+ each lock release and re-take.
+ */
+
+ /* ToDo: Make this wait killable. */
+ for (;;)
+ {
+ if (elem->highest_seq_no >= seq_no)
+ {
+ /* This sequence number is already applied, ignore it. */
+ res= 0;
+ break;
+ }
+ if (!elem->owner_rli)
+ {
+ /* The domain became free, grab it and apply the event. */
+ elem->owner_rli= rli;
+ elem->owner_count= 1;
+ res= 1;
+ break;
+ }
+ if (elem->owner_rli == rli)
+ {
+ /* Already own this domain, increment reference count and apply event. */
+ ++elem->owner_count;
+ res= 1;
+ break;
+ }
+ /*
+ Someone else is currently processing this GTID (or an earlier one).
+ Wait for them to complete (or fail), and then check again.
+ */
+ mysql_cond_wait(&elem->COND_gtid_ignore_duplicates,
+ &LOCK_slave_state);
+ }
+
+err:
+ mysql_mutex_unlock(&LOCK_slave_state);
+ return res;
+}
+
+
static void
rpl_slave_state_free_element(void *arg)
{
struct rpl_slave_state::element *elem= (struct rpl_slave_state::element *)arg;
mysql_cond_destroy(&elem->COND_wait_gtid);
+ mysql_cond_destroy(&elem->COND_gtid_ignore_duplicates);
my_free(elem);
}
@@ -147,7 +233,7 @@ rpl_slave_state::deinit()
int
rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
- uint64 seq_no)
+ uint64 seq_no, const Relay_log_info *rli)
{
element *elem= NULL;
list_element *list_elem= NULL;
@@ -170,6 +256,20 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
mysql_cond_broadcast(&elem->COND_wait_gtid);
}
+ if (opt_gtid_ignore_duplicates && rli)
+ {
+ uint32 count= elem->owner_count;
+ DBUG_ASSERT(count > 0);
+ DBUG_ASSERT(elem->owner_rli == rli);
+ --count;
+ elem->owner_count= count;
+ if (count == 0)
+ {
+ elem->owner_rli= NULL;
+ mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates);
+ }
+ }
+
if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))
return 1;
list_elem->server_id= server_id;
@@ -199,7 +299,11 @@ rpl_slave_state::get_element(uint32 domain_id)
elem->domain_id= domain_id;
elem->highest_seq_no= 0;
elem->gtid_waiter= NULL;
+ elem->owner_rli= NULL;
+ elem->owner_count= 0;
mysql_cond_init(key_COND_wait_gtid, &elem->COND_wait_gtid, 0);
+ mysql_cond_init(key_COND_gtid_ignore_duplicates,
+ &elem->COND_gtid_ignore_duplicates, 0);
if (my_hash_insert(&hash, (uchar *)elem))
{
my_free(elem);
@@ -821,7 +925,7 @@ rpl_slave_state::load(THD *thd, char *state_from_master, size_t len,
if (gtid_parser_helper(&state_from_master, end, &gtid) ||
!(sub_id= next_sub_id(gtid.domain_id)) ||
record_gtid(thd, &gtid, sub_id, false, in_statement) ||
- update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no))
+ update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, NULL))
return 1;
if (state_from_master == end)
break;
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
index 54f352661a7..aef1ca9e403 100644
--- a/sql/rpl_gtid.h
+++ b/sql/rpl_gtid.h
@@ -91,6 +91,8 @@ struct gtid_waiting {
};
+class Relay_log_info;
+
/*
Replication slave state.
@@ -131,6 +133,19 @@ struct rpl_slave_state
uint64 min_wait_seq_no;
mysql_cond_t COND_wait_gtid;
+ /*
+ For --gtid-ignore-duplicates. The Relay_log_info that currently owns
+ this domain, and the number of worker threads that are active in it.
+
+ The idea is that only one of multiple master connections is allowed to
+ actively apply events for a given domain. Other connections must either
+ discard the events (if the seq_no in GTID shows they have already been
+ applied), or wait to see if the current owner will apply it.
+ */
+ const Relay_log_info *owner_rli;
+ uint32 owner_count;
+ mysql_cond_t COND_gtid_ignore_duplicates;
+
list_element *grab_list() { list_element *l= list; list= NULL; return l; }
void add(list_element *l)
{
@@ -155,7 +170,8 @@ struct rpl_slave_state
void deinit();
void truncate_hash();
ulong count() const { return hash.records; }
- int update(uint32 domain_id, uint32 server_id, uint64 sub_id, uint64 seq_no);
+ int update(uint32 domain_id, uint32 server_id, uint64 sub_id,
+ uint64 seq_no, const Relay_log_info *rli);
int truncate_state_table(THD *thd);
int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
bool in_transaction, bool in_statement);
@@ -171,8 +187,10 @@ struct rpl_slave_state
element *get_element(uint32 domain_id);
int put_back_list(uint32 domain_id, list_element *list);
- void update_state_hash(uint64 sub_id, rpl_gtid *gtid);
+ void update_state_hash(uint64 sub_id, rpl_gtid *gtid,
+ const Relay_log_info *rli);
int record_and_update_gtid(THD *thd, struct rpl_group_info *rgi);
+ int check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli);
};
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 27f17668849..d1e0ca518f1 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -202,7 +202,7 @@ handle_rpl_parallel_thread(void *arg)
struct rpl_parallel_thread::queued_event *events;
bool group_standalone= true;
bool in_event_group= false;
- bool group_skip_for_stop= false;
+ bool skip_event_group= false;
rpl_group_info *group_rgi= NULL;
group_commit_orderer *gco, *tmp_gco;
uint64 event_gtid_sub_id= 0;
@@ -385,13 +385,13 @@ handle_rpl_parallel_thread(void *arg)
point where we can safely stop. So set a flag that will cause us
to skip, rather than execute, the following events.
*/
- group_skip_for_stop= true;
+ skip_event_group= true;
}
else
- group_skip_for_stop= false;
+ skip_event_group= false;
if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
- group_skip_for_stop= true;
+ skip_event_group= true;
else if (rgi->wait_commit_sub_id > entry->last_committed_sub_id)
{
/*
@@ -420,6 +420,16 @@ handle_rpl_parallel_thread(void *arg)
thd->wait_for_commit_ptr->wakeup_subsequent_commits(err);
}
thd->wait_for_commit_ptr= &rgi->commit_orderer;
+
+ if (opt_gtid_ignore_duplicates)
+ {
+ int res=
+ rpl_global_gtid_slave_state.check_duplicate_gtid(&rgi->current_gtid,
+ rgi->rli);
+ /* ToDo: Handle res==-1 error. */
+ if (!res)
+ skip_event_group= true;
+ }
}
group_ending= event_type == XID_EVENT ||
@@ -438,7 +448,7 @@ handle_rpl_parallel_thread(void *arg)
processing between the event groups as a simple way to ensure that
everything is stopped and cleaned up correctly.
*/
- if (!rgi->is_error && !group_skip_for_stop)
+ if (!rgi->is_error && !skip_event_group)
err= rpt_handle_event(events, rpt);
else
err= thd->wait_for_prior_commit();
@@ -464,7 +474,7 @@ handle_rpl_parallel_thread(void *arg)
rgi->next= rgis_to_free;
rgis_to_free= rgi;
group_rgi= rgi= NULL;
- group_skip_for_stop= false;
+ skip_event_group= false;
DEBUG_SYNC(thd, "rpl_parallel_end_of_group");
}
@@ -526,7 +536,7 @@ handle_rpl_parallel_thread(void *arg)
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->free_rgi(group_rgi);
group_rgi= NULL;
- group_skip_for_stop= false;
+ skip_event_group= false;
}
if (!in_event_group)
{
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 0fae3a3bb89..020f984ad50 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -1435,7 +1435,8 @@ rpl_load_gtid_slave_state(THD *thd)
if ((err= rpl_global_gtid_slave_state.update(tmp_entry.gtid.domain_id,
tmp_entry.gtid.server_id,
tmp_entry.sub_id,
- tmp_entry.gtid.seq_no)))
+ tmp_entry.gtid.seq_no,
+ NULL)))
{
mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
diff --git a/sql/slave.cc b/sql/slave.cc
index 74955c09ced..cf741ccedc0 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -2047,6 +2047,39 @@ after_set_capability:
}
}
+ query_str.length(0);
+ if (query_str.append(STRING_WITH_LEN("SET @slave_gtid_ignore_duplicates="),
+ system_charset_info) ||
+ query_str.append_ulonglong(opt_gtid_ignore_duplicates != false))
+ {
+ err_code= ER_OUTOFMEMORY;
+ errmsg= "The slave I/O thread stops because a fatal out-of-memory error "
+ "is encountered when it tries to set @slave_gtid_ignore_duplicates.";
+ sprintf(err_buff, "%s Error: Out of memory", errmsg);
+ goto err;
+ }
+
+ rc= mysql_real_query(mysql, query_str.ptr(), query_str.length());
+ if (rc)
+ {
+ err_code= mysql_errno(mysql);
+ if (is_network_error(err_code))
+ {
+ mi->report(ERROR_LEVEL, err_code,
+ "Setting @slave_gtid_ignore_duplicates failed with "
+ "error: %s", mysql_error(mysql));
+ goto network_err;
+ }
+ else
+ {
+ /* Fatal error */
+ errmsg= "The slave I/O thread stops because a fatal error is "
+ "encountered when it tries to set @slave_gtid_ignore_duplicates.";
+ sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
+ goto err;
+ }
+ }
+
if (mi->rli.until_condition == Relay_log_info::UNTIL_GTID)
{
query_str.length(0);
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 516e5d7567a..c7bd28259ae 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -115,6 +115,39 @@ fake_event_write(NET *net, String *packet, const char **errmsg)
/*
+ Helper structure, used to pass miscellaneous info from mysql_binlog_send()
+ into the helper functions that it calls.
+*/
+struct binlog_send_info {
+ rpl_binlog_state until_binlog_state;
+ slave_connection_state gtid_state;
+ THD *thd;
+ NET *net;
+ String *packet;
+ char *log_file_name;
+ slave_connection_state *until_gtid_state;
+ Format_description_log_event *fdev;
+ int mariadb_slave_capability;
+ enum_gtid_skip_type gtid_skip_group;
+ enum_gtid_until_state gtid_until_group;
+ ushort flags;
+ uint8 current_checksum_alg;
+ bool slave_gtid_strict_mode;
+ bool send_fake_gtid_list;
+ bool slave_gtid_ignore_duplicates;
+ bool using_gtid_state;
+
+ binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg, char *lfn)
+ : thd(thd_arg), net(&thd_arg->net), packet(packet_arg),
+ log_file_name(lfn), until_gtid_state(NULL), fdev(NULL),
+ gtid_skip_group(GTID_SKIP_NOT), gtid_until_group(GTID_UNTIL_NOT_DONE),
+ flags(flags_arg), current_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF),
+ slave_gtid_strict_mode(false), send_fake_gtid_list(false),
+ slave_gtid_ignore_duplicates(false)
+ { }
+};
+
+/*
fake_rotate_event() builds a fake (=which does not exist physically in any
binlog) Rotate event, which contains the name of the binlog we are going to
send to the slave (because the slave may not know it if it just asked for
@@ -132,16 +165,16 @@ fake_event_write(NET *net, String *packet, const char **errmsg)
part.
*/
-static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
- ulonglong position, const char** errmsg,
- uint8 checksum_alg_arg)
+static int fake_rotate_event(binlog_send_info *info, ulonglong position,
+ const char** errmsg, uint8 checksum_alg_arg)
{
DBUG_ENTER("fake_rotate_event");
char buf[ROTATE_HEADER_LEN+100];
my_bool do_checksum;
int err;
- char* p = log_file_name+dirname_length(log_file_name);
+ char* p = info->log_file_name+dirname_length(info->log_file_name);
uint ident_len = (uint) strlen(p);
+ String *packet= info->packet;
ha_checksum crc;
if ((err= fake_event_header(packet, ROTATE_EVENT,
@@ -160,22 +193,23 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
}
if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) ||
- (err= fake_event_write(net, packet, errmsg)))
+ (err= fake_event_write(info->net, packet, errmsg)))
DBUG_RETURN(err);
DBUG_RETURN(0);
}
-static int fake_gtid_list_event(NET* net, String* packet,
+static int fake_gtid_list_event(binlog_send_info *info,
Gtid_list_log_event *glev, const char** errmsg,
- uint8 checksum_alg_arg, uint32 current_pos)
+ uint32 current_pos)
{
my_bool do_checksum;
int err;
ha_checksum crc;
char buf[128];
String str(buf, sizeof(buf), system_charset_info);
+ String* packet= info->packet;
str.length(0);
if (glev->to_packet(&str))
@@ -185,7 +219,7 @@ static int fake_gtid_list_event(NET* net, String* packet,
}
if ((err= fake_event_header(packet, GTID_LIST_EVENT,
str.length(), &do_checksum, &crc,
- errmsg, checksum_alg_arg, current_pos)))
+ errmsg, info->current_checksum_alg, current_pos)))
return err;
packet->append(str);
@@ -195,7 +229,7 @@ static int fake_gtid_list_event(NET* net, String* packet,
}
if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) ||
- (err= fake_event_write(net, packet, errmsg)))
+ (err= fake_event_write(info->net, packet, errmsg)))
return err;
return 0;
@@ -627,6 +661,19 @@ get_slave_gtid_strict_mode(THD *thd)
}
+static bool
+get_slave_gtid_ignore_duplicates(THD *thd)
+{
+ bool null_value;
+
+ const LEX_STRING name= { C_STRING_WITH_LEN("slave_gtid_ignore_duplicates") };
+ user_var_entry *entry=
+ (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
+ name.length);
+ return entry && entry->val_int(&null_value) && !null_value;
+}
+
+
/*
Get the value of the @slave_until_gtid user variable into the supplied
String (this is the GTID position specified for START SLAVE UNTIL
@@ -914,16 +961,16 @@ give_error_start_pos_missing_in_binlog(int *err, const char **errormsg,
*/
static int
-check_slave_start_position(THD *thd, slave_connection_state *st,
- const char **errormsg, rpl_gtid *error_gtid,
- slave_connection_state *until_gtid_state)
+check_slave_start_position(binlog_send_info *info, const char **errormsg,
+ rpl_gtid *error_gtid)
{
uint32 i;
int err;
slave_connection_state::entry **delete_list= NULL;
uint32 delete_idx= 0;
+ slave_connection_state *st= &info->gtid_state;
- if (rpl_load_gtid_slave_state(thd))
+ if (rpl_load_gtid_slave_state(info->thd))
{
*errormsg= "Failed to load replication slave GTID state";
err= ER_CANNOT_LOAD_SLAVE_GTID_STATE;
@@ -963,6 +1010,7 @@ check_slave_start_position(THD *thd, slave_connection_state *st,
if (!start_at_own_slave_pos)
{
rpl_gtid domain_gtid;
+ slave_connection_state *until_gtid_state= info->until_gtid_state;
rpl_gtid *until_gtid;
if (!mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id,
@@ -981,6 +1029,17 @@ check_slave_start_position(THD *thd, slave_connection_state *st,
continue;
}
+ if (info->slave_gtid_ignore_duplicates &&
+ domain_gtid.seq_no < slave_gtid->seq_no)
+ {
+ /*
+ When --gtid-ignore-duplicates, it is ok for the slave to request
+ something that we do not have (yet) - they might already have gotten
+ it through another path in a multi-path replication hierarchy.
+ */
+ continue;
+ }
+
if (until_gtid_state &&
( !(until_gtid= until_gtid_state->find(slave_gtid->domain_id)) ||
(mysql_bin_log.find_in_binlog_state(until_gtid->domain_id,
@@ -1462,13 +1521,11 @@ gtid_state_from_binlog_pos(const char *in_name, uint32 pos, String *out_str)
static bool
-is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset,
- enum_gtid_until_state gtid_until_group,
- Log_event_type event_type, uint8 current_checksum_alg,
- ushort flags, const char **errmsg,
- rpl_binlog_state *until_binlog_state, uint32 current_pos)
+is_until_reached(binlog_send_info *info, ulong *ev_offset,
+ Log_event_type event_type, const char **errmsg,
+ uint32 current_pos)
{
- switch (gtid_until_group)
+ switch (info->gtid_until_group)
{
case GTID_UNTIL_NOT_DONE:
return false;
@@ -1479,9 +1536,10 @@ is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset,
case GTID_UNTIL_STOP_AFTER_TRANSACTION:
if (event_type != XID_EVENT &&
(event_type != QUERY_EVENT ||
- !Query_log_event::peek_is_commit_rollback(packet->ptr()+*ev_offset,
- packet->length()-*ev_offset,
- current_checksum_alg)))
+ !Query_log_event::peek_is_commit_rollback
+ (info->packet->ptr()+*ev_offset,
+ info->packet->length()-*ev_offset,
+ info->current_checksum_alg)))
return false;
break;
}
@@ -1493,12 +1551,11 @@ is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset,
Send a last fake Gtid_list_log_event with a flag set to mark that we
stop due to UNTIL condition.
*/
- if (reset_transmit_packet(thd, flags, ev_offset, errmsg))
+ if (reset_transmit_packet(info->thd, info->flags, ev_offset, errmsg))
return true;
- Gtid_list_log_event glev(until_binlog_state,
+ Gtid_list_log_event glev(&info->until_binlog_state,
Gtid_list_log_event::FLAG_UNTIL_REACHED);
- if (fake_gtid_list_event(net, packet, &glev, errmsg, current_checksum_alg,
- current_pos))
+ if (fake_gtid_list_event(info, &glev, errmsg, current_pos))
return true;
*errmsg= NULL;
return true;
@@ -1512,23 +1569,19 @@ is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset,
Returns NULL on success, error message string on error.
*/
static const char *
-send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
- Log_event_type event_type, char *log_file_name,
- IO_CACHE *log, int mariadb_slave_capability,
- ulong ev_offset, uint8 current_checksum_alg,
- bool using_gtid_state, slave_connection_state *gtid_state,
- enum_gtid_skip_type *gtid_skip_group,
- slave_connection_state *until_gtid_state,
- enum_gtid_until_state *gtid_until_group,
- rpl_binlog_state *until_binlog_state,
- bool slave_gtid_strict_mode, rpl_gtid *error_gtid,
- bool *send_fake_gtid_list,
- Format_description_log_event *fdev)
+send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
+ IO_CACHE *log, ulong ev_offset, rpl_gtid *error_gtid)
{
my_off_t pos;
+ String* const packet= info->packet;
size_t len= packet->length();
+ int mariadb_slave_capability= info->mariadb_slave_capability;
+ uint8 current_checksum_alg= info->current_checksum_alg;
+ slave_connection_state *gtid_state= &info->gtid_state;
+ slave_connection_state *until_gtid_state= info->until_gtid_state;
- if (event_type == GTID_LIST_EVENT && using_gtid_state && until_gtid_state)
+ if (event_type == GTID_LIST_EVENT &&
+ info->using_gtid_state && until_gtid_state)
{
rpl_gtid *gtid_list;
uint32 list_len;
@@ -1537,12 +1590,12 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
if (ev_offset > len ||
Gtid_list_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
current_checksum_alg,
- &gtid_list, &list_len, fdev))
+ &gtid_list, &list_len, info->fdev))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed to read Gtid_list_log_event: corrupt binlog";
}
- err= until_binlog_state->load(gtid_list, list_len);
+ err= info->until_binlog_state.load(gtid_list, list_len);
my_free(gtid_list);
if (err)
{
@@ -1552,7 +1605,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
}
/* Skip GTID event groups until we reach slave position within a domain_id. */
- if (event_type == GTID_EVENT && using_gtid_state)
+ if (event_type == GTID_EVENT && info->using_gtid_state)
{
uchar flags2;
slave_connection_state::entry *gtid_entry;
@@ -1566,7 +1619,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
current_checksum_alg,
&event_gtid.domain_id, &event_gtid.server_id,
- &event_gtid.seq_no, &flags2, fdev))
+ &event_gtid.seq_no, &flags2, info->fdev))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed to read Gtid_log_event: corrupt binlog";
@@ -1575,7 +1628,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
DBUG_EXECUTE_IF("gtid_force_reconnect_at_10_1_100",
{
rpl_gtid *dbug_gtid;
- if ((dbug_gtid= until_binlog_state->find_nolock(10,1)) &&
+ if ((dbug_gtid= info->until_binlog_state.find_nolock(10,1)) &&
dbug_gtid->seq_no == 100)
{
DBUG_SET("-d,gtid_force_reconnect_at_10_1_100");
@@ -1585,7 +1638,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
}
});
- if (until_binlog_state->update_nolock(&event_gtid, false))
+ if (info->until_binlog_state.update_nolock(&event_gtid, false))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed in internal GTID book-keeping: Out of memory";
@@ -1618,12 +1671,13 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
/* Skip this event group if we have not yet reached slave start pos. */
if (event_gtid.server_id != gtid->server_id ||
event_gtid.seq_no <= gtid->seq_no)
- *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
+ info->gtid_skip_group= (flags2 & Gtid_log_event::FL_STANDALONE ?
GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
if (event_gtid.server_id == gtid->server_id &&
event_gtid.seq_no >= gtid->seq_no)
{
- if (slave_gtid_strict_mode && event_gtid.seq_no > gtid->seq_no &&
+ if (info->slave_gtid_strict_mode &&
+ event_gtid.seq_no > gtid->seq_no &&
!(gtid_entry->flags & slave_connection_state::START_OWN_SLAVE_POS))
{
/*
@@ -1645,7 +1699,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
so MASTER_POS_WAIT() and MASTER_GTID_WAIT() can work.
The fake event will be sent at the end of this event group.
*/
- *send_fake_gtid_list= true;
+ info->send_fake_gtid_list= true;
/*
Delete this entry if we have reached slave start position (so we
@@ -1666,7 +1720,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
This domain already reached the START SLAVE UNTIL stop condition,
so skip this event group.
*/
- *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
+ info->gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
}
else if (event_gtid.server_id == gtid->server_id &&
@@ -1681,9 +1735,9 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
uint64 until_seq_no= gtid->seq_no;
until_gtid_state->remove(gtid);
if (until_gtid_state->count() == 0)
- *gtid_until_group= (flags2 & Gtid_log_event::FL_STANDALONE ?
- GTID_UNTIL_STOP_AFTER_STANDALONE :
- GTID_UNTIL_STOP_AFTER_TRANSACTION);
+ info->gtid_until_group= (flags2 & Gtid_log_event::FL_STANDALONE ?
+ GTID_UNTIL_STOP_AFTER_STANDALONE :
+ GTID_UNTIL_STOP_AFTER_TRANSACTION);
if (event_gtid.seq_no > until_seq_no)
{
/*
@@ -1693,7 +1747,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
should be in, we can just stop now. And we also need to skip this
event group (as it is beyond the UNTIL condition).
*/
- *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
+ info->gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
}
}
@@ -1707,11 +1761,11 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
Note that slave that understands GTID can also tolerate holes, so there is
no need to supply dummy event.
*/
- switch (*gtid_skip_group)
+ switch (info->gtid_skip_group)
{
case GTID_SKIP_STANDALONE:
if (!Log_event::is_part_of_group(event_type))
- *gtid_skip_group= GTID_SKIP_NOT;
+ info->gtid_skip_group= GTID_SKIP_NOT;
return NULL;
case GTID_SKIP_TRANSACTION:
if (event_type == XID_EVENT ||
@@ -1719,14 +1773,15 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
Query_log_event::peek_is_commit_rollback(packet->ptr() + ev_offset,
len - ev_offset,
current_checksum_alg)))
- *gtid_skip_group= GTID_SKIP_NOT;
+ info->gtid_skip_group= GTID_SKIP_NOT;
return NULL;
case GTID_SKIP_NOT:
break;
}
/* Do not send annotate_rows events unless slave requested it. */
- if (event_type == ANNOTATE_ROWS_EVENT && !(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
+ if (event_type == ANNOTATE_ROWS_EVENT &&
+ !(info->flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
{
if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES)
{
@@ -1820,7 +1875,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
Skip events with the @@skip_replication flag set, if slave requested
skipping of such events.
*/
- if (thd->variables.option_bits & OPTION_SKIP_REPLICATION)
+ if (info->thd->variables.option_bits & OPTION_SKIP_REPLICATION)
{
/*
The first byte of the packet is a '\0' to distinguish it from an error
@@ -1831,17 +1886,17 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
return NULL;
}
- THD_STAGE_INFO(thd, stage_sending_binlog_event_to_slave);
+ THD_STAGE_INFO(info->thd, stage_sending_binlog_event_to_slave);
pos= my_b_tell(log);
if (RUN_HOOK(binlog_transmit, before_send_event,
- (thd, flags, packet, log_file_name, pos)))
+ (info->thd, info->flags, packet, info->log_file_name, pos)))
{
my_errno= ER_UNKNOWN_ERROR;
return "run 'before_send_event' hook failed";
}
- if (my_net_write(net, (uchar*) packet->ptr(), len))
+ if (my_net_write(info->net, (uchar*) packet->ptr(), len))
{
my_errno= ER_UNKNOWN_ERROR;
return "Failed on my_net_write()";
@@ -1850,14 +1905,15 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
DBUG_PRINT("info", ("log event code %d", (*packet)[LOG_EVENT_OFFSET+1] ));
if (event_type == LOAD_EVENT)
{
- if (send_file(thd))
+ if (send_file(info->thd))
{
my_errno= ER_UNKNOWN_ERROR;
return "failed in send_file()";
}
}
- if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
+ if (RUN_HOOK(binlog_transmit, after_send_event,
+ (info->thd, info->flags, packet)))
{
my_errno= ER_UNKNOWN_ERROR;
return "Failed to run hook 'after_send_event'";
@@ -1878,31 +1934,21 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
IO_CACHE log;
File file = -1;
- String* const packet = &thd->packet;
+ String* const packet= &thd->packet;
int error;
const char *errmsg = "Unknown error", *tmp_msg;
char error_text[MAX_SLAVE_ERRMSG]; // to be send to slave via my_message()
- NET* net = &thd->net;
mysql_mutex_t *log_lock;
mysql_cond_t *log_cond;
- int mariadb_slave_capability;
char str_buf[128];
String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info);
- bool using_gtid_state;
char str_buf2[128];
String slave_until_gtid_str(str_buf2, sizeof(str_buf2), system_charset_info);
- slave_connection_state gtid_state, until_gtid_state_obj;
- slave_connection_state *until_gtid_state= NULL;
+ slave_connection_state until_gtid_state_obj;
rpl_gtid error_gtid;
- enum_gtid_skip_type gtid_skip_group= GTID_SKIP_NOT;
- enum_gtid_until_state gtid_until_group= GTID_UNTIL_NOT_DONE;
- rpl_binlog_state until_binlog_state;
- bool slave_gtid_strict_mode= false;
- bool send_fake_gtid_list= false;
+ binlog_send_info info(thd, packet, flags, log_file_name);
- uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
int old_max_allowed_packet= thd->variables.max_allowed_packet;
- Format_description_log_event *fdev= NULL;
#ifndef DBUG_OFF
int left_events = max_binlog_dump_events;
@@ -1928,16 +1974,17 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
heartbeat_ts= &heartbeat_buf;
set_timespec_nsec(*heartbeat_ts, 0);
}
- mariadb_slave_capability= get_mariadb_slave_capability(thd);
+ info.mariadb_slave_capability= get_mariadb_slave_capability(thd);
connect_gtid_state.length(0);
- using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
- DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", using_gtid_state= false;);
- if (using_gtid_state)
+ info.using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
+ DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", info.using_gtid_state= false;);
+ if (info.using_gtid_state)
{
- slave_gtid_strict_mode= get_slave_gtid_strict_mode(thd);
+ info.slave_gtid_strict_mode= get_slave_gtid_strict_mode(thd);
+ info.slave_gtid_ignore_duplicates= get_slave_gtid_ignore_duplicates(thd);
if(get_slave_until_gtid(thd, &slave_until_gtid_str))
- until_gtid_state= &until_gtid_state_obj;
+ info.until_gtid_state= &until_gtid_state_obj;
}
DBUG_EXECUTE_IF("binlog_force_reconnect_after_22_events",
@@ -1978,7 +2025,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
}
#endif
- if (!(fdev= new Format_description_log_event(3)))
+ if (!(info.fdev= new Format_description_log_event(3)))
{
errmsg= "Out of memory initializing format_description event";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
@@ -1999,33 +2046,32 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
}
name=search_file_name;
- if (using_gtid_state)
+ if (info.using_gtid_state)
{
- if (gtid_state.load(connect_gtid_state.c_ptr_quick(),
- connect_gtid_state.length()))
+ if (info.gtid_state.load(connect_gtid_state.c_ptr_quick(),
+ connect_gtid_state.length()))
{
errmsg= "Out of memory or malformed slave request when obtaining start "
"position from GTID state";
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
- if (until_gtid_state &&
- until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(),
- slave_until_gtid_str.length()))
+ if (info.until_gtid_state &&
+ info.until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(),
+ slave_until_gtid_str.length()))
{
errmsg= "Out of memory or malformed slave request when obtaining UNTIL "
"position sent from slave";
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
- if ((error= check_slave_start_position(thd, &gtid_state, &errmsg,
- &error_gtid, until_gtid_state)))
+ if ((error= check_slave_start_position(&info, &errmsg, &error_gtid)))
{
my_errno= error;
goto err;
}
- if ((errmsg= gtid_find_binlog_file(&gtid_state, search_file_name,
- until_gtid_state)))
+ if ((errmsg= gtid_find_binlog_file(&info.gtid_state, search_file_name,
+ info.until_gtid_state)))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
@@ -2098,7 +2144,7 @@ impossible position";
given that we want minimum modification of 4.0, we send the normal
and fake Rotates.
*/
- if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg,
+ if (fake_rotate_event(&info, pos, &errmsg,
get_binlog_checksum_value_at_connect(thd)))
{
/*
@@ -2150,14 +2196,14 @@ impossible position";
{
Format_description_log_event *tmp;
- current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
- packet->length() - ev_offset);
- DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
- current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
- current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
+ info.current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
+ packet->length() - ev_offset);
+ DBUG_ASSERT(info.current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
+ info.current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
+ info.current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
if (!is_slave_checksum_aware(thd) &&
- current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
- current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+ info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+ info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
errmsg= "Slave can not handle replication events with the checksum "
@@ -2170,14 +2216,14 @@ impossible position";
if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset,
packet->length()-ev_offset,
- fdev)))
+ info.fdev)))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
errmsg= "Corrupt Format_description event found or out-of-memory";
goto err;
}
- delete fdev;
- fdev= tmp;
+ delete info.fdev;
+ info.fdev= tmp;
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
/*
@@ -2194,12 +2240,12 @@ impossible position";
ST_CREATED_OFFSET+ev_offset, (ulong) 0);
/* fix the checksum due to latest changes in header */
- if (current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
- current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+ if (info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+ info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
fix_checksum(packet, ev_offset);
/* send it */
- if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
+ if (my_net_write(info.net, (uchar*) packet->ptr(), packet->length()))
{
errmsg = "Failed on my_net_write()";
my_errno= ER_UNKNOWN_ERROR;
@@ -2235,13 +2281,13 @@ impossible position";
We will send one event, the format_description, and then stop.
*/
- if (until_gtid_state && until_gtid_state->count() == 0)
- gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE;
+ if (info.until_gtid_state && info.until_gtid_state->count() == 0)
+ info.gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE;
/* seek to the requested position, to start the requested dump */
my_b_seek(&log, pos); // Seek will done on next read
- while (!net->error && net->vio != 0 && !thd->killed)
+ while (!info.net->error && info.net->vio != 0 && !thd->killed)
{
Log_event_type event_type= UNKNOWN_EVENT;
killed_state killed;
@@ -2254,14 +2300,14 @@ impossible position";
bool is_active_binlog= false;
while (!(killed= thd->killed) &&
!(error = Log_event::read_log_event(&log, packet, log_lock,
- current_checksum_alg,
+ info.current_checksum_alg,
log_file_name,
&is_active_binlog)))
{
#ifndef DBUG_OFF
if (max_binlog_dump_events && !left_events--)
{
- net_flush(net);
+ net_flush(info.net);
errmsg = "Debugging binlog dump abort";
my_errno= ER_UNKNOWN_ERROR;
goto err;
@@ -2279,7 +2325,7 @@ impossible position";
{
if (event_type == XID_EVENT)
{
- net_flush(net);
+ net_flush(info.net);
const char act[]=
"now "
"wait_for signal.continue";
@@ -2298,14 +2344,14 @@ impossible position";
{
Format_description_log_event *tmp;
- current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
+ info.current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
packet->length() - ev_offset);
- DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
- current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
- current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
+ DBUG_ASSERT(info.current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
+ info.current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
+ info.current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
if (!is_slave_checksum_aware(thd) &&
- current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
- current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+ info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+ info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
errmsg= "Slave can not handle replication events with the checksum "
@@ -2318,14 +2364,14 @@ impossible position";
if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset,
packet->length()-ev_offset,
- fdev)))
+ info.fdev)))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
errmsg= "Corrupt Format_description event found or out-of-memory";
goto err;
}
- delete fdev;
- fdev= tmp;
+ delete info.fdev;
+ info.fdev= tmp;
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
}
@@ -2343,36 +2389,28 @@ impossible position";
}
#endif
- if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
- log_file_name, &log,
- mariadb_slave_capability, ev_offset,
- current_checksum_alg, using_gtid_state,
- &gtid_state, &gtid_skip_group,
- until_gtid_state, &gtid_until_group,
- &until_binlog_state,
- slave_gtid_strict_mode, &error_gtid,
- &send_fake_gtid_list, fdev)))
+ if ((tmp_msg= send_event_to_slave(&info, event_type, &log,
+ ev_offset, &error_gtid)))
{
errmsg= tmp_msg;
goto err;
}
- if (unlikely(send_fake_gtid_list) && gtid_skip_group == GTID_SKIP_NOT)
+ if (unlikely(info.send_fake_gtid_list) &&
+ info.gtid_skip_group == GTID_SKIP_NOT)
{
- Gtid_list_log_event glev(&until_binlog_state, 0);
+ Gtid_list_log_event glev(&info.until_binlog_state, 0);
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg) ||
- fake_gtid_list_event(net, packet, &glev, &errmsg,
- current_checksum_alg, my_b_tell(&log)))
+ fake_gtid_list_event(&info, &glev, &errmsg, my_b_tell(&log)))
{
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
- send_fake_gtid_list= false;
+ info.send_fake_gtid_list= false;
}
- if (until_gtid_state &&
- is_until_reached(thd, net, packet, &ev_offset, gtid_until_group,
- event_type, current_checksum_alg, flags, &errmsg,
- &until_binlog_state, my_b_tell(&log)))
+ if (info.until_gtid_state &&
+ is_until_reached(&info, &ev_offset, event_type, &errmsg,
+ my_b_tell(&log)))
{
if (errmsg)
{
@@ -2386,7 +2424,7 @@ impossible position";
{
if (event_type == XID_EVENT)
{
- net_flush(net);
+ net_flush(info.net);
}
});
@@ -2423,7 +2461,7 @@ impossible position";
/*
Block until there is more data in the log
*/
- if (net_flush(net))
+ if (net_flush(info.net))
{
errmsg = "failed on net_flush()";
my_errno= ER_UNKNOWN_ERROR;
@@ -2466,7 +2504,7 @@ impossible position";
mysql_mutex_lock(log_lock);
switch (error= Log_event::read_log_event(&log, packet, (mysql_mutex_t*) 0,
- current_checksum_alg)) {
+ info.current_checksum_alg)) {
case 0:
/* we read successfully, so we'll need to send it to the slave */
mysql_mutex_unlock(log_lock);
@@ -2524,7 +2562,8 @@ impossible position";
thd->EXIT_COND(&old_stage);
goto err;
}
- if (send_heartbeat_event(net, packet, p_coord, current_checksum_alg))
+ if (send_heartbeat_event(info.net, packet, p_coord,
+ info.current_checksum_alg))
{
errmsg = "Failed on my_net_write()";
my_errno= ER_UNKNOWN_ERROR;
@@ -2549,36 +2588,28 @@ impossible position";
if (read_packet)
{
- if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
- log_file_name, &log,
- mariadb_slave_capability, ev_offset,
- current_checksum_alg,
- using_gtid_state, &gtid_state,
- &gtid_skip_group, until_gtid_state,
- &gtid_until_group, &until_binlog_state,
- slave_gtid_strict_mode, &error_gtid,
- &send_fake_gtid_list, fdev)))
+ if ((tmp_msg= send_event_to_slave(&info, event_type, &log,
+ ev_offset, &error_gtid)))
{
errmsg= tmp_msg;
goto err;
}
- if (unlikely(send_fake_gtid_list) && gtid_skip_group == GTID_SKIP_NOT)
+ if (unlikely(info.send_fake_gtid_list)
+ && info.gtid_skip_group == GTID_SKIP_NOT)
{
- Gtid_list_log_event glev(&until_binlog_state, 0);
+ Gtid_list_log_event glev(&info.until_binlog_state, 0);
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg) ||
- fake_gtid_list_event(net, packet, &glev, &errmsg,
- current_checksum_alg, my_b_tell(&log)))
+ fake_gtid_list_event(&info, &glev, &errmsg, my_b_tell(&log)))
{
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
- send_fake_gtid_list= false;
+ info.send_fake_gtid_list= false;
}
- if (until_gtid_state &&
- is_until_reached(thd, net, packet, &ev_offset, gtid_until_group,
- event_type, current_checksum_alg, flags, &errmsg,
- &until_binlog_state, my_b_tell(&log)))
+ if (info.until_gtid_state &&
+ is_until_reached(&info, &ev_offset, event_type, &errmsg,
+ my_b_tell(&log)))
{
if (errmsg)
{
@@ -2633,8 +2664,8 @@ impossible position";
read and send is Format_description_log_event.
*/
if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 ||
- fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE,
- &errmsg, current_checksum_alg))
+ fake_rotate_event(&info, BIN_LOG_HEADER_SIZE, &errmsg,
+ info.current_checksum_alg))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
@@ -2655,7 +2686,7 @@ end:
thd->current_linfo = 0;
mysql_mutex_unlock(&LOCK_thread_count);
thd->variables.max_allowed_packet= old_max_allowed_packet;
- delete fdev;
+ delete info.fdev;
DBUG_VOID_RETURN;
err:
@@ -2731,7 +2762,7 @@ err:
if (file >= 0)
mysql_file_close(file, MYF(MY_WME));
thd->variables.max_allowed_packet= old_max_allowed_packet;
- delete fdev;
+ delete info.fdev;
my_message(my_errno, error_text, MYF(0));
DBUG_VOID_RETURN;
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index fbf0c624a88..8aa202b381b 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -1819,6 +1819,54 @@ static Sys_var_ulong Sys_slave_parallel_max_queued(
"--slave-parallel-threads > 0.",
GLOBAL_VAR(opt_slave_parallel_max_queued), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(0,2147483647), DEFAULT(131072), BLOCK_SIZE(1));
+
+
+static bool
+check_gtid_ignore_duplicates(sys_var *self, THD *thd, set_var *var)
+{
+ bool running;
+
+ mysql_mutex_lock(&LOCK_active_mi);
+ running= master_info_index->give_error_if_slave_running();
+ mysql_mutex_unlock(&LOCK_active_mi);
+ if (running)
+ return true;
+
+ return false;
+}
+
+static bool
+fix_gtid_ignore_duplicates(sys_var *self, THD *thd, enum_var_type type)
+{
+ bool running;
+ bool err= false;
+
+ mysql_mutex_unlock(&LOCK_global_system_variables);
+ mysql_mutex_lock(&LOCK_active_mi);
+ running= master_info_index->give_error_if_slave_running();
+ mysql_mutex_unlock(&LOCK_active_mi);
+ if (running)
+ err= true;
+ mysql_mutex_lock(&LOCK_global_system_variables);
+
+ /* ToDo: Isn't there a race here? I need to change the variable only under the LOCK_active_mi, and only if running is false. */
+ return err;
+}
+
+
+static Sys_var_mybool Sys_gtid_ignore_duplicates(
+ "gtid_ignore_duplicates",
+ "When set, different master connections in multi-source replication are "
+ "allowed to receive and process event groups with the same GTID (when "
+ "using GTID mode). Only one will be applied, any others will be "
+ "ignored. Within a given replication domain, just the sequence number "
+ "will be used to decide whether a given GTID has been already applied; "
+ "this means it is the responsibility of the user to ensure that GTID "
+ "sequence numbers are strictly increasing.",
+ GLOBAL_VAR(opt_gtid_ignore_duplicates), CMD_LINE(OPT_ARG),
+ DEFAULT(FALSE), NO_MUTEX_GUARD,
+ NOT_IN_BINLOG, ON_CHECK(check_gtid_ignore_duplicates),
+ ON_UPDATE(fix_gtid_ignore_duplicates));
#endif