summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2013-05-15 19:52:21 +0200
committerunknown <knielsen@knielsen-hq.org>2013-05-15 19:52:21 +0200
commit9fae9930244d505585d83590051a17df9bab7d8a (patch)
tree8d03f56b634a5f11d47c83200128dc81071833e5 /sql
parent7202c21b343c14d0f1fc868fc7789486338a656f (diff)
downloadmariadb-git-9fae9930244d505585d83590051a17df9bab7d8a.tar.gz
MDEV-26: Global transaction ID.
Implement START SLAVE UNTIL master_gtid_pos = "<GTID position>". Add test cases, including a test showing how to use this to promote a new master among a set of slaves.
Diffstat (limited to 'sql')
-rw-r--r--sql/lex.h3
-rw-r--r--sql/log.cc11
-rw-r--r--sql/log_event.cc88
-rw-r--r--sql/log_event.h19
-rw-r--r--sql/rpl_gtid.cc25
-rw-r--r--sql/rpl_gtid.h2
-rw-r--r--sql/rpl_rli.cc3
-rw-r--r--sql/rpl_rli.h8
-rw-r--r--sql/share/errmsg-utf8.txt2
-rw-r--r--sql/slave.cc82
-rw-r--r--sql/sql_lex.h4
-rw-r--r--sql/sql_repl.cc585
-rw-r--r--sql/sql_yacc.yy10
13 files changed, 695 insertions, 147 deletions
diff --git a/sql/lex.h b/sql/lex.h
index edf833021b0..756e7e80f7e 100644
--- a/sql/lex.h
+++ b/sql/lex.h
@@ -330,7 +330,7 @@ static SYMBOL symbols[] = {
{ "LOW_PRIORITY", SYM(LOW_PRIORITY)},
{ "MASTER", SYM(MASTER_SYM)},
{ "MASTER_CONNECT_RETRY", SYM(MASTER_CONNECT_RETRY_SYM)},
- { "MASTER_USE_GTID", SYM(MASTER_USE_GTID_SYM)},
+ { "MASTER_GTID_POS", SYM(MASTER_GTID_POS_SYM)},
{ "MASTER_HOST", SYM(MASTER_HOST_SYM)},
{ "MASTER_LOG_FILE", SYM(MASTER_LOG_FILE_SYM)},
{ "MASTER_LOG_POS", SYM(MASTER_LOG_POS_SYM)},
@@ -345,6 +345,7 @@ static SYMBOL symbols[] = {
{ "MASTER_SSL_KEY", SYM(MASTER_SSL_KEY_SYM)},
{ "MASTER_SSL_VERIFY_SERVER_CERT", SYM(MASTER_SSL_VERIFY_SERVER_CERT_SYM)},
{ "MASTER_USER", SYM(MASTER_USER_SYM)},
+ { "MASTER_USE_GTID", SYM(MASTER_USE_GTID_SYM)},
{ "MASTER_HEARTBEAT_PERIOD", SYM(MASTER_HEARTBEAT_PERIOD_SYM)},
{ "MATCH", SYM(MATCH)},
{ "MAX_CONNECTIONS_PER_HOUR", SYM(MAX_CONNECTIONS_PER_HOUR)},
diff --git a/sql/log.cc b/sql/log.cc
index c0cd4be8a45..3266934beda 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -3280,7 +3280,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
there had been an entry (domain_id, server_id, 0).
*/
- Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state);
+ Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state, 0);
if (gl_ev.write(&log_file))
goto err;
@@ -8718,16 +8718,11 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
case GTID_LIST_EVENT:
if (first_round)
{
- uint32 i;
Gtid_list_log_event *glev= (Gtid_list_log_event *)ev;
/* Initialise the binlog state from the Gtid_list event. */
- rpl_global_gtid_binlog_state.reset();
- for (i= 0; i < glev->count; ++i)
- {
- if (rpl_global_gtid_binlog_state.update(&(glev->list[i])))
- goto err2;
- }
+ if (rpl_global_gtid_binlog_state.load(glev->list, glev->count))
+ goto err2;
}
break;
diff --git a/sql/log_event.cc b/sql/log_event.cc
index aa86fa6ff62..9fbcbb68145 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -6320,6 +6320,7 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len,
: Log_event(buf, description_event), count(0), list(0)
{
uint32 i;
+ uint32 val;
uint8 header_size= description_event->common_header_len;
uint8 post_header_len= description_event->post_header_len[GTID_LIST_EVENT-1];
if (event_len < header_size + post_header_len ||
@@ -6327,7 +6328,9 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len,
return;
buf+= header_size;
- count= uint4korr(buf) & ((1<<28)-1);
+ val= uint4korr(buf);
+ count= val & ((1<<28)-1);
+ gl_flags= val & ((uint32)0xf << 28);
buf+= 4;
if (event_len - (header_size + post_header_len) < count*element_size ||
(!(list= (rpl_gtid *)my_malloc(count*sizeof(*list) + (count == 0),
@@ -6348,8 +6351,9 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len,
#ifdef MYSQL_SERVER
-Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set)
- : count(gtid_set->count()), list(0)
+Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set,
+ uint32 gl_flags_)
+ : count(gtid_set->count()), gl_flags(gl_flags_), list(0)
{
cache_type= EVENT_NO_CACHE;
/* Failure to allocate memory will be caught by is_valid() returning false. */
@@ -6359,32 +6363,70 @@ Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set)
gtid_set->get_gtid_list(list, count);
}
+
+#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
bool
-Gtid_list_log_event::write(IO_CACHE *file)
+Gtid_list_log_event::to_packet(String *packet)
{
uint32 i;
- uchar buf[element_size];
+ uchar *p;
+ uint32 needed_length;
DBUG_ASSERT(count < 1<<28);
- if (write_header(file, get_data_size()))
- return 1;
- int4store(buf, count & ((1<<28)-1));
- if (wrapper_my_b_safe_write(file, buf, GTID_LIST_HEADER_LEN))
- return 1;
+ needed_length= packet->length() + 4 + count*element_size;
+ if (packet->reserve(needed_length))
+ return true;
+ p= (uchar *)packet->ptr() + packet->length();;
+ packet->length(needed_length);
+ int4store(p, (count & ((1<<28)-1)) | gl_flags);
+ p += 4;
for (i= 0; i < count; ++i)
{
- int4store(buf, list[i].domain_id);
- int4store(buf+4, list[i].server_id);
- int8store(buf+8, list[i].seq_no);
- if (wrapper_my_b_safe_write(file, buf, element_size))
- return 1;
+ int4store(p, list[i].domain_id);
+ int4store(p+4, list[i].server_id);
+ int8store(p+8, list[i].seq_no);
+ p += 16;
}
- return write_footer(file);
+
+ return false;
+}
+
+
+bool
+Gtid_list_log_event::write(IO_CACHE *file)
+{
+ char buf[128];
+ String packet(buf, sizeof(buf), system_charset_info);
+
+ packet.length(0);
+ if (to_packet(&packet))
+ return true;
+ return
+ write_header(file, get_data_size()) ||
+ wrapper_my_b_safe_write(file, (uchar *)packet.ptr(), packet.length()) ||
+ write_footer(file);
+}
+
+
+int
+Gtid_list_log_event::do_apply_event(Relay_log_info const *rli)
+{
+ int ret= Log_event::do_apply_event(rli);
+ if (rli->until_condition == Relay_log_info::UNTIL_GTID &&
+ (gl_flags & FLAG_UNTIL_REACHED))
+ {
+ char str_buf[128];
+ String str(str_buf, sizeof(str_buf), system_charset_info);
+ const_cast<Relay_log_info*>(rli)->until_gtid_pos.to_string(&str);
+ sql_print_information("Slave SQL thread stops because it reached its"
+ " UNTIL master_gtid_pos %s", str.c_ptr_safe());
+ const_cast<Relay_log_info*>(rli)->abort_slave= true;
+ }
+ return ret;
}
-#ifdef HAVE_REPLICATION
void
Gtid_list_log_event::pack_info(THD *thd, Protocol *protocol)
{
@@ -6439,12 +6481,24 @@ Gtid_list_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
*/
bool
Gtid_list_log_event::peek(const char *event_start, uint32 event_len,
+ uint8 checksum_alg,
rpl_gtid **out_gtid_list, uint32 *out_list_len)
{
const char *p;
uint32 count_field, count;
rpl_gtid *gtid_list;
+ if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
+ {
+ if (event_len > BINLOG_CHECKSUM_LEN)
+ event_len-= BINLOG_CHECKSUM_LEN;
+ else
+ event_len= 0;
+ }
+ else
+ DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
+ checksum_alg == BINLOG_CHECKSUM_ALG_OFF);
+
if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN)
return true;
p= event_start + LOG_EVENT_HEADER_LEN;
diff --git a/sql/log_event.h b/sql/log_event.h
index 5026b280b27..21d60cac928 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -3116,7 +3116,7 @@ public:
<td>count</td>
<td>4 byte unsigned integer</td>
<td>The lower 28 bits are the number of GTIDs. The upper 4 bits are
- reserved for flags bits for future expansion</td>
+ flags bits.</td>
</tr>
</table>
@@ -3149,18 +3149,28 @@ public:
</table>
The three elements in the body repeat COUNT times to form the GTID list.
+
+ At the time of writing, only one flag bit is in use.
+
+ Bit 28 of `count' is used for flag FLAG_UNTIL_REACHED, which is sent in a
+ Gtid_list event from the master to the slave to indicate that the START
+ SLAVE UNTIL master_gtid_pos=xxx condition has been reached. (This flag is
+ only sent in "fake" events generated on the fly, it is not written into
+ the binlog).
*/
class Gtid_list_log_event: public Log_event
{
public:
uint32 count;
+ uint32 gl_flags;
struct rpl_gtid *list;
static const uint element_size= 4+4+8;
+ static const uint32 FLAG_UNTIL_REACHED= (1<<28);
#ifdef MYSQL_SERVER
- Gtid_list_log_event(rpl_binlog_state *gtid_set);
+ Gtid_list_log_event(rpl_binlog_state *gtid_set, uint32 gl_flags);
#ifdef HAVE_REPLICATION
void pack_info(THD *thd, Protocol *protocol);
#endif
@@ -3173,10 +3183,13 @@ public:
Log_event_type get_type_code() { return GTID_LIST_EVENT; }
int get_data_size() { return GTID_LIST_HEADER_LEN + count*element_size; }
bool is_valid() const { return list != NULL; }
-#ifdef MYSQL_SERVER
+#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
+ bool to_packet(String *packet);
bool write(IO_CACHE *file);
+ virtual int do_apply_event(Relay_log_info const *rli);
#endif
static bool peek(const char *event_start, uint32 event_len,
+ uint8 checksum_alg,
rpl_gtid **out_gtid_list, uint32 *out_list_len);
};
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index bda060115ed..a4bdeb9932b 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -711,6 +711,22 @@ void rpl_binlog_state::free()
}
}
+
+bool
+rpl_binlog_state::load(struct rpl_gtid *list, uint32 count)
+{
+ uint32 i;
+
+ reset();
+ for (i= 0; i < count; ++i)
+ {
+ if (update(&(list[i])))
+ return true;
+ }
+ return false;
+}
+
+
rpl_binlog_state::~rpl_binlog_state()
{
free();
@@ -1117,10 +1133,17 @@ slave_connection_state::remove(const rpl_gtid *in_gtid)
int
slave_connection_state::to_string(String *out_str)
{
+ out_str->length(0);
+ return append_to_string(out_str);
+}
+
+
+int
+slave_connection_state::append_to_string(String *out_str)
+{
uint32 i;
bool first;
- out_str->length(0);
first= true;
for (i= 0; i < hash.records; ++i)
{
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
index 046533fd760..bd6663c0659 100644
--- a/sql/rpl_gtid.h
+++ b/sql/rpl_gtid.h
@@ -142,6 +142,7 @@ struct rpl_binlog_state
void reset();
void free();
+ bool load(struct rpl_gtid *list, uint32 count);
int update(const struct rpl_gtid *gtid);
uint64 seq_no_from_state();
int write_to_iocache(IO_CACHE *dest);
@@ -172,6 +173,7 @@ struct slave_connection_state
void remove(const rpl_gtid *gtid);
ulong count() const { return hash.records; }
int to_string(String *out_str);
+ int append_to_string(String *out_str);
};
extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid,
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index ec2ca048976..d27a80313ac 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -1096,7 +1096,8 @@ bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev)
ulonglong log_pos;
DBUG_ENTER("Relay_log_info::is_until_satisfied");
- DBUG_ASSERT(until_condition != UNTIL_NONE);
+ DBUG_ASSERT(until_condition == UNTIL_MASTER_POS ||
+ until_condition == UNTIL_RELAY_POS);
if (until_condition == UNTIL_MASTER_POS)
{
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 7aff6720aac..6dd757343fd 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -263,7 +263,9 @@ public:
thread is running).
*/
- enum {UNTIL_NONE= 0, UNTIL_MASTER_POS, UNTIL_RELAY_POS} until_condition;
+ enum {
+ UNTIL_NONE= 0, UNTIL_MASTER_POS, UNTIL_RELAY_POS, UNTIL_GTID
+ } until_condition;
char until_log_name[FN_REFLEN];
ulonglong until_log_pos;
/* extension extracted from log_name and converted to int */
@@ -277,6 +279,8 @@ public:
UNTIL_LOG_NAMES_CMP_UNKNOWN= -2, UNTIL_LOG_NAMES_CMP_LESS= -1,
UNTIL_LOG_NAMES_CMP_EQUAL= 0, UNTIL_LOG_NAMES_CMP_GREATER= 1
} until_log_names_cmp_result;
+ /* Condition for UNTIL master_gtid_pos. */
+ slave_connection_state until_gtid_pos;
char cached_charset[6];
/*
@@ -354,6 +358,8 @@ public:
bool is_until_satisfied(THD *thd, Log_event *ev);
inline ulonglong until_pos()
{
+ DBUG_ASSERT(until_condition == UNTIL_MASTER_POS ||
+ until_condition == UNTIL_RELAY_POS);
return ((until_condition == UNTIL_MASTER_POS) ? group_master_log_pos :
group_relay_log_pos);
}
diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt
index 5dbc7637552..d7d5710b048 100644
--- a/sql/share/errmsg-utf8.txt
+++ b/sql/share/errmsg-utf8.txt
@@ -6543,3 +6543,5 @@ ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG
eng "Requested GTID_POS %u-%u-%llu conflicts with the binary log which contains a more recent GTID %u-%u-%llu. To use the requested GTID_POS, the old binlog must be removed with RESET MASTER to avoid out-of-order binlog"
ER_MASTER_GTID_POS_MISSING_DOMAIN
eng "Requested GTID_POS contains no value for replication domain %u. This conflicts with the binary log which contains GTID %u-%u-%llu. To use the requested GTID_POS, the old binlog must be removed with RESET MASTER to avoid out-of-order binlog"
+ER_UNTIL_REQUIRES_USING_GTID
+ eng "START SLAVE UNTIL master_gtid_pos requires that slave is using GTID"
diff --git a/sql/slave.cc b/sql/slave.cc
index f0a9b27707c..1b3154e20ab 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -1879,6 +1879,43 @@ after_set_capability:
goto err;
}
}
+
+ if (mi->rli.until_condition == Relay_log_info::UNTIL_GTID)
+ {
+ connect_state.length(0);
+ connect_state.append(STRING_WITH_LEN("SET @slave_until_gtid='"),
+ system_charset_info);
+ if (mi->rli.until_gtid_pos.append_to_string(&connect_state))
+ {
+ err_code= ER_OUTOFMEMORY;
+ errmsg= "The slave I/O thread stops because a fatal out-of-memory "
+ "error is encountered when it tries to compute @slave_until_gtid.";
+ sprintf(err_buff, "%s Error: Out of memory", errmsg);
+ goto err;
+ }
+ connect_state.append(STRING_WITH_LEN("'"), system_charset_info);
+
+ rc= mysql_real_query(mysql, connect_state.ptr(), connect_state.length());
+ if (rc)
+ {
+ err_code= mysql_errno(mysql);
+ if (is_network_error(err_code))
+ {
+ mi->report(ERROR_LEVEL, err_code,
+ "Setting @slave_until_gtid failed with error: %s",
+ mysql_error(mysql));
+ goto network_err;
+ }
+ else
+ {
+ /* Fatal error */
+ errmsg= "The slave I/O thread stops because a fatal error is "
+ "encountered when it tries to set @slave_until_gtid.";
+ sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
+ goto err;
+ }
+ }
+ }
}
if (!mi->using_gtid)
{
@@ -2363,7 +2400,8 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
protocol->store(
mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None":
( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master":
- "Relay"), &my_charset_bin);
+ ( mi->rli.until_condition==Relay_log_info::UNTIL_RELAY_POS? "Relay":
+ "Gtid")), &my_charset_bin);
protocol->store(mi->rli.until_log_name, &my_charset_bin);
protocol->store((ulonglong) mi->rli.until_log_pos);
@@ -3057,7 +3095,8 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
This tests if the position of the beginning of the current event
hits the UNTIL barrier.
*/
- if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
+ if ((rli->until_condition == Relay_log_info::UNTIL_MASTER_POS ||
+ rli->until_condition == Relay_log_info::UNTIL_RELAY_POS) &&
rli->is_until_satisfied(thd, ev))
{
char buf[22];
@@ -3954,7 +3993,8 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
saved_master_log_pos= rli->group_master_log_pos;
saved_skip= rli->slave_skip_counter;
}
- if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
+ if ((rli->until_condition == Relay_log_info::UNTIL_MASTER_POS ||
+ rli->until_condition == Relay_log_info::UNTIL_RELAY_POS) &&
rli->is_until_satisfied(thd, NULL))
{
char buf[22];
@@ -4793,7 +4833,43 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
}
break;
+ case GTID_LIST_EVENT:
+ {
+ const char *errmsg;
+ Gtid_list_log_event *glev;
+ Log_event *tmp;
+
+ if (mi->rli.until_condition != Relay_log_info::UNTIL_GTID)
+ goto default_action;
+ if (!(tmp= Log_event::read_log_event(buf, event_len, &errmsg,
+ mi->rli.relay_log.description_event_for_queue,
+ opt_slave_sql_verify_checksum)))
+ {
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ goto err;
+ }
+ glev= static_cast<Gtid_list_log_event *>(tmp);
+ if (glev->gl_flags & Gtid_list_log_event::FLAG_UNTIL_REACHED)
+ {
+ char str_buf[128];
+ String str(str_buf, sizeof(str_buf), system_charset_info);
+ mi->rli.until_gtid_pos.to_string(&str);
+ sql_print_information("Slave IO thread stops because it reached its"
+ " UNTIL master_gtid_pos %s", str.c_ptr_safe());
+ mi->abort_slave= true;
+ }
+ delete glev;
+
+ /*
+ Do not update position for fake Gtid_list event (which has a zero
+ end_log_pos).
+ */
+ inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
+ }
+ break;
+
default:
+ default_action:
inc_pos= event_len;
break;
}
diff --git a/sql/sql_lex.h b/sql/sql_lex.h
index 8bfcddefdfd..4c1d917aeae 100644
--- a/sql/sql_lex.h
+++ b/sql/sql_lex.h
@@ -289,6 +289,8 @@ struct LEX_MASTER_INFO
char *ssl_key, *ssl_cert, *ssl_ca, *ssl_capath, *ssl_cipher;
char *relay_log_name;
LEX_STRING connection_name;
+ /* Value in START SLAVE UNTIL master_gtid_pos=xxx */
+ LEX_STRING gtid_pos_str;
ulonglong pos;
ulong relay_log_pos;
ulong server_id;
@@ -317,6 +319,8 @@ struct LEX_MASTER_INFO
heartbeat_period= 0;
ssl= ssl_verify_server_cert= heartbeat_opt=
repl_ignore_server_ids_opt= use_gtid_opt= LEX_MI_UNCHANGED;
+ gtid_pos_str.length= 0;
+ gtid_pos_str.str= NULL;
}
};
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 32e653cebff..aa21d191166 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -30,6 +30,14 @@
#include "rpl_handler.h"
#include "debug_sync.h"
+
+enum enum_gtid_until_state {
+ GTID_UNTIL_NOT_DONE,
+ GTID_UNTIL_STOP_AFTER_STANDALONE,
+ GTID_UNTIL_STOP_AFTER_TRANSACTION
+};
+
+
int max_binlog_dump_events = 0; // unlimited
my_bool opt_sporadic_binlog_dump_fail = 0;
#ifndef DBUG_OFF
@@ -38,6 +46,74 @@ static int binlog_dump_count = 0;
extern TYPELIB binlog_checksum_typelib;
+
+static int
+fake_event_header(String* packet, Log_event_type event_type, ulong extra_len,
+ my_bool *do_checksum, ha_checksum *crc, const char** errmsg,
+ uint8 checksum_alg_arg)
+{
+ char header[LOG_EVENT_HEADER_LEN];
+ ulong event_len;
+
+ *do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF &&
+ checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF;
+
+ /*
+ 'when' (the timestamp) is set to 0 so that slave could distinguish between
+ real and fake Rotate events (if necessary)
+ */
+ memset(header, 0, 4);
+ header[EVENT_TYPE_OFFSET] = (uchar)event_type;
+ event_len= LOG_EVENT_HEADER_LEN + extra_len +
+ (*do_checksum ? BINLOG_CHECKSUM_LEN : 0);
+ int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id);
+ int4store(header + EVENT_LEN_OFFSET, event_len);
+ int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F);
+ // TODO: check what problems this may cause and fix them
+ int4store(header + LOG_POS_OFFSET, 0);
+ if (packet->append(header, sizeof(header)))
+ {
+ *errmsg= "Failed due to out-of-memory writing event";
+ return -1;
+ }
+ if (*do_checksum)
+ {
+ *crc= my_checksum(0L, NULL, 0);
+ *crc= my_checksum(*crc, (uchar*)header, sizeof(header));
+ }
+ return 0;
+}
+
+
+static int
+fake_event_footer(String *packet, my_bool do_checksum, ha_checksum crc, const char **errmsg)
+{
+ if (do_checksum)
+ {
+ char b[BINLOG_CHECKSUM_LEN];
+ int4store(b, crc);
+ if (packet->append(b, sizeof(b)))
+ {
+ *errmsg= "Failed due to out-of-memory writing event checksum";
+ return -1;
+ }
+ }
+ return 0;
+}
+
+
+static int
+fake_event_write(NET *net, String *packet, const char **errmsg)
+{
+ if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
+ {
+ *errmsg = "failed on my_net_write()";
+ return -1;
+ }
+ return 0;
+}
+
+
/*
fake_rotate_event() builds a fake (=which does not exist physically in any
binlog) Rotate event, which contains the name of the binlog we are going to
@@ -61,59 +137,71 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
uint8 checksum_alg_arg)
{
DBUG_ENTER("fake_rotate_event");
- char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN+100];
-
- /*
- this Rotate is to be sent with checksum if and only if
- slave's get_master_version_and_clock time handshake value
- of master's @@global.binlog_checksum was TRUE
- */
-
- my_bool do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF &&
- checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF;
-
- /*
- 'when' (the timestamp) is set to 0 so that slave could distinguish between
- real and fake Rotate events (if necessary)
- */
- memset(header, 0, 4);
- header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
-
+ char buf[ROTATE_HEADER_LEN+100];
+ my_bool do_checksum;
+ int err;
char* p = log_file_name+dirname_length(log_file_name);
uint ident_len = (uint) strlen(p);
- ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN +
- (do_checksum ? BINLOG_CHECKSUM_LEN : 0);
- int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id);
- int4store(header + EVENT_LEN_OFFSET, event_len);
- int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F);
+ ha_checksum crc;
- // TODO: check what problems this may cause and fix them
- int4store(header + LOG_POS_OFFSET, 0);
+ if ((err= fake_event_header(packet, ROTATE_EVENT,
+ ident_len + ROTATE_HEADER_LEN, &do_checksum, &crc,
+ errmsg, checksum_alg_arg)))
+ DBUG_RETURN(err);
- packet->append(header, sizeof(header));
int8store(buf+R_POS_OFFSET,position);
packet->append(buf, ROTATE_HEADER_LEN);
packet->append(p, ident_len);
if (do_checksum)
{
- char b[BINLOG_CHECKSUM_LEN];
- ha_checksum crc= my_checksum(0L, NULL, 0);
- crc= my_checksum(crc, (uchar*)header, sizeof(header));
crc= my_checksum(crc, (uchar*)buf, ROTATE_HEADER_LEN);
crc= my_checksum(crc, (uchar*)p, ident_len);
- int4store(b, crc);
- packet->append(b, sizeof(b));
}
- if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
+ if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) ||
+ (err= fake_event_write(net, packet, errmsg)))
+ DBUG_RETURN(err);
+
+ DBUG_RETURN(0);
+}
+
+
+static int fake_gtid_list_event(NET* net, String* packet,
+ Gtid_list_log_event *glev, const char** errmsg,
+ uint8 checksum_alg_arg)
+{
+ my_bool do_checksum;
+ int err;
+ ha_checksum crc;
+ char buf[128];
+ String str(buf, sizeof(buf), system_charset_info);
+
+ str.length(0);
+ if (glev->to_packet(&str))
{
- *errmsg = "failed on my_net_write()";
- DBUG_RETURN(-1);
+ *errmsg= "Failed due to out-of-memory writing Gtid_list event";
+ return -1;
}
- DBUG_RETURN(0);
+ if ((err= fake_event_header(packet, GTID_LIST_EVENT,
+ str.length(), &do_checksum, &crc,
+ errmsg, checksum_alg_arg)))
+ return err;
+
+ packet->append(str);
+ if (do_checksum)
+ {
+ crc= my_checksum(crc, (uchar*)str.ptr(), str.length());
+ }
+
+ if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) ||
+ (err= fake_event_write(net, packet, errmsg)))
+ return err;
+
+ return 0;
}
+
/*
Reset thread transmit packet buffer for event sending
@@ -527,6 +615,27 @@ get_slave_connect_state(THD *thd, String *out_str)
/*
+ Get the value of the @slave_until_gtid user variable into the supplied
+ String (this is the GTID position specified for START SLAVE UNTIL
+ master_gtid_pos='xxx').
+
+ Returns false if error (ie. slave did not set the variable and is not doing
+ START SLAVE UNTIL mater_gtid_pos='xxx'), true if success.
+*/
+static bool
+get_slave_until_gtid(THD *thd, String *out_str)
+{
+ bool null_value;
+
+ const LEX_STRING name= { C_STRING_WITH_LEN("slave_until_gtid") };
+ user_var_entry *entry=
+ (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
+ name.length);
+ return entry && entry->val_str(&null_value, out_str, 0) && !null_value;
+}
+
+
+/*
Function prepares and sends repliation heartbeat event.
@param net net object of THD
@@ -773,10 +882,10 @@ contains_all_slave_gtid(slave_connection_state *st, Gtid_list_log_event *glev)
static int
check_slave_start_position(THD *thd, slave_connection_state *st,
- const char **errormsg, rpl_gtid *error_gtid)
+ const char **errormsg, rpl_gtid *error_gtid,
+ slave_connection_state *until_gtid_state)
{
uint32 i;
- bool found;
int err;
rpl_gtid **delete_list= NULL;
uint32 delete_idx= 0;
@@ -791,9 +900,9 @@ check_slave_start_position(THD *thd, slave_connection_state *st,
rpl_gtid master_replication_gtid;
rpl_gtid start_gtid;
- if ((found= mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id,
- slave_gtid->server_id,
- &master_gtid)) &&
+ if (mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id,
+ slave_gtid->server_id,
+ &master_gtid) &&
master_gtid.seq_no >= slave_gtid->seq_no)
continue;
@@ -814,6 +923,7 @@ check_slave_start_position(THD *thd, slave_connection_state *st,
slave_gtid->seq_no != master_replication_gtid.seq_no)
{
rpl_gtid domain_gtid;
+ rpl_gtid *until_gtid;
if (!mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id,
&domain_gtid))
@@ -832,6 +942,27 @@ check_slave_start_position(THD *thd, slave_connection_state *st,
++missing_domains;
continue;
}
+
+ if (until_gtid_state &&
+ ( !(until_gtid= until_gtid_state->find(slave_gtid->domain_id)) ||
+ (mysql_bin_log.find_in_binlog_state(until_gtid->domain_id,
+ until_gtid->server_id,
+ &master_gtid) &&
+ master_gtid.seq_no >= until_gtid->seq_no)))
+ {
+ /*
+ The slave requested to start from a position that is not (yet) in
+ our binlog, but it also specified an UNTIL condition that _is_ in
+ our binlog (or a missing UNTIL, which means stop at the very
+ beginning). So the stop position is before the start position, and
+ we just delete the entry from the UNTIL hash to mark that this
+ domain has already reached the UNTIL condition.
+ */
+ if(until_gtid)
+ until_gtid_state->remove(until_gtid);
+ continue;
+ }
+
*errormsg= "Requested slave GTID state not found in binlog";
*error_gtid= *slave_gtid;
err= ER_GTID_POSITION_NOT_FOUND_IN_BINLOG;
@@ -951,7 +1082,8 @@ end:
the requested GTID that was already purged.
*/
static const char *
-gtid_find_binlog_file(slave_connection_state *state, char *out_name)
+gtid_find_binlog_file(slave_connection_state *state, char *out_name,
+ slave_connection_state *until_gtid_state)
{
MEM_ROOT memroot;
binlog_file_entry *list;
@@ -1003,42 +1135,60 @@ gtid_find_binlog_file(slave_connection_state *state, char *out_name)
if (!glev || contains_all_slave_gtid(state, glev))
{
- uint32 i;
-
strmake(out_name, buf, FN_REFLEN);
- /*
- As a special case, we allow to start from binlog file N if the
- requested GTID is the last event (in the corresponding domain) in
- binlog file (N-1), but then we need to remove that GTID from the slave
- state, rather than skipping events waiting for it to turn up.
- */
- for (i= 0; i < glev->count; ++i)
+ if (glev)
{
- const rpl_gtid *gtid= state->find(glev->list[i].domain_id);
- if (!gtid)
- {
- /*
- contains_all_slave_gtid() returns false if there is any domain in
- Gtid_list_event which is not in the requested slave position.
+ uint32 i;
- We may delete a domain from the slave state inside this loop, but
- we only do this when it is the very last GTID logged for that
- domain in earlier binlogs, and then we can not encounter it in any
- further GTIDs in the Gtid_list.
- */
- DBUG_ASSERT(0);
- continue;
- }
- if (gtid->server_id == glev->list[i].server_id &&
- gtid->seq_no == glev->list[i].seq_no)
+ /*
+ As a special case, we allow to start from binlog file N if the
+ requested GTID is the last event (in the corresponding domain) in
+ binlog file (N-1), but then we need to remove that GTID from the slave
+ state, rather than skipping events waiting for it to turn up.
+
+ If slave is doing START SLAVE UNTIL, check for any UNTIL conditions
+ that are already included in a previous binlog file. Delete any such
+ from the UNTIL hash, to mark that such domains have already reached
+ their UNTIL condition.
+ */
+ for (i= 0; i < glev->count; ++i)
{
- /*
- The slave requested to start from the very beginning of this
- domain in this binlog file. So delete the entry from the state,
- we do not need to skip anything.
- */
- state->remove(gtid);
+ const rpl_gtid *gtid= state->find(glev->list[i].domain_id);
+ if (!gtid)
+ {
+ /*
+ Contains_all_slave_gtid() returns false if there is any domain in
+ Gtid_list_event which is not in the requested slave position.
+
+ We may delete a domain from the slave state inside this loop, but
+ we only do this when it is the very last GTID logged for that
+ domain in earlier binlogs, and then we can not encounter it in any
+ further GTIDs in the Gtid_list.
+ */
+ DBUG_ASSERT(0);
+ } else if (gtid->server_id == glev->list[i].server_id &&
+ gtid->seq_no == glev->list[i].seq_no)
+ {
+ /*
+ The slave requested to start from the very beginning of this
+ domain in this binlog file. So delete the entry from the state,
+ we do not need to skip anything.
+ */
+ state->remove(gtid);
+ }
+
+ if (until_gtid_state &&
+ (gtid= until_gtid_state->find(glev->list[i].domain_id)) &&
+ gtid->server_id == glev->list[i].server_id &&
+ gtid->seq_no <= glev->list[i].seq_no)
+ {
+ /*
+ We've already reached the stop position in UNTIL for this domain,
+ since it is before the start position.
+ */
+ until_gtid_state->remove(gtid);
+ }
}
}
@@ -1163,6 +1313,7 @@ gtid_state_from_pos(const char *name, uint32 offset,
goto end;
}
status= Gtid_list_log_event::peek(packet.ptr(), packet.length(),
+ current_checksum_alg,
&gtid_list, &list_len);
if (status)
{
@@ -1256,6 +1407,49 @@ gtid_state_from_binlog_pos(const char *in_name, uint32 pos, String *out_str)
}
+static bool
+is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset,
+ enum_gtid_until_state gtid_until_group,
+ Log_event_type event_type, uint8 current_checksum_alg,
+ ushort flags, const char **errmsg,
+ rpl_binlog_state *until_binlog_state)
+{
+ switch (gtid_until_group)
+ {
+ case GTID_UNTIL_NOT_DONE:
+ return false;
+ case GTID_UNTIL_STOP_AFTER_STANDALONE:
+ if (Log_event::is_part_of_group(event_type))
+ return false;
+ break;
+ case GTID_UNTIL_STOP_AFTER_TRANSACTION:
+ if (event_type != XID_EVENT &&
+ (event_type != QUERY_EVENT ||
+ !Query_log_event::peek_is_commit_rollback(packet->ptr()+*ev_offset,
+ packet->length()-*ev_offset,
+ current_checksum_alg)))
+ return false;
+ break;
+ }
+
+ /*
+ The last event group has been sent, now the START SLAVE UNTIL condition
+ has been reached.
+
+ Send a last fake Gtid_list_log_event with a flag set to mark that we
+ stop due to UNTIL condition.
+ */
+ if (reset_transmit_packet(thd, flags, ev_offset, errmsg))
+ return true;
+ Gtid_list_log_event glev(until_binlog_state,
+ Gtid_list_log_event::FLAG_UNTIL_REACHED);
+ if (fake_gtid_list_event(net, packet, &glev, errmsg, current_checksum_alg))
+ return true;
+ *errmsg= NULL;
+ return true;
+}
+
+
/*
Helper function for mysql_binlog_send() to write an event down the slave
connection.
@@ -1268,37 +1462,113 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
IO_CACHE *log, int mariadb_slave_capability,
ulong ev_offset, uint8 current_checksum_alg,
bool using_gtid_state, slave_connection_state *gtid_state,
- enum_gtid_skip_type *gtid_skip_group)
+ enum_gtid_skip_type *gtid_skip_group,
+ slave_connection_state *until_gtid_state,
+ enum_gtid_until_state *gtid_until_group,
+ rpl_binlog_state *until_binlog_state)
{
my_off_t pos;
size_t len= packet->length();
+ if (event_type == GTID_LIST_EVENT && using_gtid_state &&
+ (gtid_state->count() > 0 || until_gtid_state))
+ {
+ rpl_gtid *gtid_list;
+ uint32 list_len;
+ bool err;
+
+ if (ev_offset > len ||
+ Gtid_list_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
+ current_checksum_alg,
+ &gtid_list, &list_len))
+ return "Failed to read Gtid_list_log_event: corrupt binlog";
+ err= until_binlog_state->load(gtid_list, list_len);
+ my_free(gtid_list);
+ if (err)
+ return "Failed in internal GTID book-keeping: Out of memory";
+ }
+
/* Skip GTID event groups until we reach slave position within a domain_id. */
- if (event_type == GTID_EVENT && using_gtid_state && gtid_state->count() > 0)
+ if (event_type == GTID_EVENT && using_gtid_state)
{
- uint32 server_id, domain_id;
- uint64 seq_no;
uchar flags2;
rpl_gtid *gtid;
- if (ev_offset > len ||
- Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
- current_checksum_alg,
- &domain_id, &server_id, &seq_no, &flags2))
- return "Failed to read Gtid_log_event: corrupt binlog";
- gtid= gtid_state->find(domain_id);
- if (gtid != NULL)
+ if (gtid_state->count() > 0 || until_gtid_state)
{
- /* Skip this event group if we have not yet reached slave start pos. */
- if (server_id != gtid->server_id || seq_no <= gtid->seq_no)
- *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
- GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
- /*
- Delete this entry if we have reached slave start position (so we will
- not skip subsequent events and won't have to look them up and check).
- */
- if (server_id == gtid->server_id && seq_no >= gtid->seq_no)
- gtid_state->remove(gtid);
+ rpl_gtid event_gtid;
+
+ if (ev_offset > len ||
+ Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
+ current_checksum_alg,
+ &event_gtid.domain_id, &event_gtid.server_id,
+ &event_gtid.seq_no, &flags2))
+ return "Failed to read Gtid_log_event: corrupt binlog";
+
+ if (until_binlog_state->update(&event_gtid))
+ return "Failed in internal GTID book-keeping: Out of memory";
+
+ if (gtid_state->count() > 0)
+ {
+ gtid= gtid_state->find(event_gtid.domain_id);
+ if (gtid != NULL)
+ {
+ /* Skip this event group if we have not yet reached slave start pos. */
+ if (event_gtid.server_id != gtid->server_id ||
+ event_gtid.seq_no <= gtid->seq_no)
+ *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
+ GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
+ /*
+ Delete this entry if we have reached slave start position (so we will
+ not skip subsequent events and won't have to look them up and check).
+ */
+ if (event_gtid.server_id == gtid->server_id &&
+ event_gtid.seq_no >= gtid->seq_no)
+ gtid_state->remove(gtid);
+ }
+ }
+
+ if (until_gtid_state)
+ {
+ gtid= until_gtid_state->find(event_gtid.domain_id);
+ if (gtid == NULL)
+ {
+ /*
+ This domain already reached the START SLAVE UNTIL stop condition,
+ so skip this event group.
+ */
+ *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
+ GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
+ }
+ else if (event_gtid.server_id == gtid->server_id &&
+ event_gtid.seq_no >= gtid->seq_no)
+ {
+ /*
+ We have reached the stop condition.
+ Delete this domain_id from the hash, so we will skip all further
+ events in this domain and eventually stop when all domains are
+ done.
+ */
+ uint64 until_seq_no= gtid->seq_no;
+ until_gtid_state->remove(gtid);
+ if (until_gtid_state->count() == 0)
+ *gtid_until_group= (flags2 & Gtid_log_event::FL_STANDALONE ?
+ GTID_UNTIL_STOP_AFTER_STANDALONE :
+ GTID_UNTIL_STOP_AFTER_TRANSACTION);
+ if (event_gtid.seq_no > until_seq_no)
+ {
+ /*
+ The GTID in START SLAVE UNTIL condition is missing in our binlog.
+ This should normally not happen (user error), but since we can be
+ sure that we are now beyond the position that the UNTIL condition
+ should be in, we can just stop now. And we also need to skip this
+ event group (as it is beyond the UNTIL condition).
+ */
+ *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
+ GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
+ }
+ }
+ }
}
}
@@ -1446,6 +1716,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
return NULL; /* Success */
}
+
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
ushort flags)
{
@@ -1465,12 +1736,17 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
mysql_mutex_t *log_lock;
mysql_cond_t *log_cond;
int mariadb_slave_capability;
- char str_buf[256];
+ char str_buf[128];
String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info);
bool using_gtid_state;
- slave_connection_state gtid_state, return_gtid_state;
+ char str_buf2[128];
+ String slave_until_gtid_str(str_buf2, sizeof(str_buf2), system_charset_info);
+ slave_connection_state gtid_state, until_gtid_state_obj;
+ slave_connection_state *until_gtid_state= NULL;
rpl_gtid error_gtid;
enum_gtid_skip_type gtid_skip_group= GTID_SKIP_NOT;
+ enum_gtid_until_state gtid_until_group= GTID_UNTIL_NOT_DONE;
+ rpl_binlog_state until_binlog_state;
uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
int old_max_allowed_packet= thd->variables.max_allowed_packet;
@@ -1502,6 +1778,10 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
connect_gtid_state.length(0);
using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", using_gtid_state= false;);
+ if (using_gtid_state &&
+ get_slave_until_gtid(thd, &slave_until_gtid_str))
+ until_gtid_state= &until_gtid_state_obj;
+
/*
We want to corrupt the first event, in Log_event::read_log_event().
But we do not want the corruption to happen early, eg. when client does
@@ -1557,13 +1837,23 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
+ if (until_gtid_state &&
+ until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(),
+ slave_until_gtid_str.length()))
+ {
+ errmsg= "Out of memory or malformed slave request when obtaining UNTIL "
+ "position sent from slave";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
if ((error= check_slave_start_position(thd, &gtid_state, &errmsg,
- &error_gtid)))
+ &error_gtid, until_gtid_state)))
{
my_errno= error;
goto err;
}
- if ((errmsg= gtid_find_binlog_file(&gtid_state, search_file_name)))
+ if ((errmsg= gtid_find_binlog_file(&gtid_state, search_file_name,
+ until_gtid_state)))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
@@ -1753,6 +2043,15 @@ impossible position";
/* The Format_description_log_event event will be found naturally. */
}
+ /*
+ Handle the case of START SLAVE UNTIL with an UNTIL condition already
+ fulfilled at the start position.
+
+ We will send one event, the format_description, and then stop.
+ */
+ if (until_gtid_state && until_gtid_state->count() == 0)
+ gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE;
+
/* seek to the requested position, to start the requested dump */
my_b_seek(&log, pos); // Seek will done on next read
@@ -1833,12 +2132,26 @@ impossible position";
log_file_name, &log,
mariadb_slave_capability, ev_offset,
current_checksum_alg, using_gtid_state,
- &gtid_state, &gtid_skip_group)))
+ &gtid_state, &gtid_skip_group,
+ until_gtid_state, &gtid_until_group,
+ &until_binlog_state)))
{
errmsg= tmp_msg;
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
+ if (until_gtid_state &&
+ is_until_reached(thd, net, packet, &ev_offset, gtid_until_group,
+ event_type, current_checksum_alg, flags, &errmsg,
+ &until_binlog_state))
+ {
+ if (errmsg)
+ {
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+ goto end;
+ }
DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
{
@@ -1992,18 +2305,34 @@ impossible position";
goto err;
}
- if (read_packet &&
- (tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
- log_file_name, &log,
- mariadb_slave_capability, ev_offset,
- current_checksum_alg,
- using_gtid_state, &gtid_state,
- &gtid_skip_group)))
+ if (read_packet)
{
- errmsg= tmp_msg;
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
- }
+ if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
+ log_file_name, &log,
+ mariadb_slave_capability, ev_offset,
+ current_checksum_alg,
+ using_gtid_state, &gtid_state,
+ &gtid_skip_group, until_gtid_state,
+ &gtid_until_group, &until_binlog_state)))
+ {
+ errmsg= tmp_msg;
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+ if (
+ until_gtid_state &&
+ is_until_reached(thd, net, packet, &ev_offset, gtid_until_group,
+ event_type, current_checksum_alg, flags, &errmsg,
+ &until_binlog_state))
+ {
+ if (errmsg)
+ {
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+ goto end;
+ }
+ }
log.error=0;
}
@@ -2167,6 +2496,26 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
lock_slave_threads(mi); // this allows us to cleanly read slave_running
// Get a mask of _stopped_ threads
init_thread_mask(&thread_mask,mi,1 /* inverse */);
+
+ if (thd->lex->mi.gtid_pos_str.str)
+ {
+ if (thread_mask != (SLAVE_IO|SLAVE_SQL))
+ {
+ slave_errno= ER_SLAVE_WAS_RUNNING;
+ goto err;
+ }
+ if (thd->lex->slave_thd_opt)
+ {
+ slave_errno= ER_BAD_SLAVE_UNTIL_COND;
+ goto err;
+ }
+ if (!mi->using_gtid)
+ {
+ slave_errno= ER_UNTIL_REQUIRES_USING_GTID;
+ goto err;
+ }
+ }
+
/*
Below we will start all stopped threads. But if the user wants to
start only one thread, do as if the other thread was running (as we
@@ -2213,10 +2562,22 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
strmake(mi->rli.until_log_name, thd->lex->mi.relay_log_name,
sizeof(mi->rli.until_log_name)-1);
}
+ else if (thd->lex->mi.gtid_pos_str.str)
+ {
+ if (mi->rli.until_gtid_pos.load(thd->lex->mi.gtid_pos_str.str,
+ thd->lex->mi.gtid_pos_str.length))
+ {
+ slave_errno= ER_INCORRECT_GTID_STATE;
+ mysql_mutex_unlock(&mi->rli.data_lock);
+ goto err;
+ }
+ mi->rli.until_condition= Relay_log_info::UNTIL_GTID;
+ }
else
mi->rli.clear_until_condition();
- if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE)
+ if (mi->rli.until_condition == Relay_log_info::UNTIL_MASTER_POS ||
+ mi->rli.until_condition == Relay_log_info::UNTIL_RELAY_POS)
{
/* Preparing members for effective until condition checking */
const char *p= fn_ext(mi->rli.until_log_name);
@@ -2239,7 +2600,10 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
/* mark the cached result of the UNTIL comparison as "undefined" */
mi->rli.until_log_names_cmp_result=
Relay_log_info::UNTIL_LOG_NAMES_CMP_UNKNOWN;
+ }
+ if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE)
+ {
/* Issuing warning then started without --skip-slave-start */
if (!opt_skip_slave_start)
push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
@@ -2271,6 +2635,7 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
ER(ER_SLAVE_WAS_RUNNING));
}
+err:
unlock_slave_threads(mi);
if (slave_errno)
diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy
index 94fdc46647f..e995b410a85 100644
--- a/sql/sql_yacc.yy
+++ b/sql/sql_yacc.yy
@@ -1095,7 +1095,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize);
%token LOW_PRIORITY
%token LT /* OPERATOR */
%token MASTER_CONNECT_RETRY_SYM
-%token MASTER_USE_GTID_SYM
+%token MASTER_GTID_POS_SYM
%token MASTER_HOST_SYM
%token MASTER_LOG_FILE_SYM
%token MASTER_LOG_POS_SYM
@@ -1111,6 +1111,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize);
%token MASTER_SSL_VERIFY_SERVER_CERT_SYM
%token MASTER_SYM
%token MASTER_USER_SYM
+%token MASTER_USE_GTID_SYM
%token MASTER_HEARTBEAT_PERIOD_SYM
%token MATCH /* SQL-2003-R */
%token MAX_CONNECTIONS_PER_HOUR
@@ -7215,6 +7216,10 @@ slave_until:
MYSQL_YYABORT;
}
}
+ | UNTIL_SYM MASTER_GTID_POS_SYM EQ TEXT_STRING_sys
+ {
+ Lex->mi.gtid_pos_str = $4;
+ }
;
slave_until_opts:
@@ -13326,12 +13331,13 @@ keyword_sp:
| MAX_ROWS {}
| MASTER_SYM {}
| MASTER_HEARTBEAT_PERIOD_SYM {}
- | MASTER_USE_GTID_SYM {}
+ | MASTER_GTID_POS_SYM {}
| MASTER_HOST_SYM {}
| MASTER_PORT_SYM {}
| MASTER_LOG_FILE_SYM {}
| MASTER_LOG_POS_SYM {}
| MASTER_USER_SYM {}
+ | MASTER_USE_GTID_SYM {}
| MASTER_PASSWORD_SYM {}
| MASTER_SERVER_ID_SYM {}
| MASTER_CONNECT_RETRY_SYM {}