summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc797
1 files changed, 467 insertions, 330 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index 5af48b6a793..fed171bcc4c 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -52,6 +52,7 @@
#include "log_event.h" // Rotate_log_event,
// Create_file_log_event,
// Format_description_log_event
+#include "wsrep_mysqld.h"
#ifdef HAVE_REPLICATION
@@ -76,6 +77,10 @@ Master_info *active_mi= 0;
Master_info_index *master_info_index;
my_bool replicate_same_server_id;
ulonglong relay_log_space_limit = 0;
+
+const char *relay_log_index= 0;
+const char *relay_log_basename= 0;
+
LEX_STRING default_master_connection_name= { (char*) "", 0 };
/*
@@ -116,7 +121,7 @@ static const char *reconnect_messages[SLAVE_RECON_ACT_MAX][SLAVE_RECON_MSG_MAX]=
registration on master",
"Reconnecting after a failed registration on master",
"failed registering on master, reconnecting to try again, \
-log '%s' at position %s%s",
+log '%s' at position %llu%s",
"COM_REGISTER_SLAVE",
"Slave I/O thread killed during or after reconnect"
},
@@ -124,7 +129,7 @@ log '%s' at position %s%s",
"Waiting to reconnect after a failed binlog dump request",
"Slave I/O thread killed while retrying master dump",
"Reconnecting after a failed binlog dump request",
- "failed dump request, reconnecting to try again, log '%s' at position %s%s",
+ "failed dump request, reconnecting to try again, log '%s' at position %llu%s",
"COM_BINLOG_DUMP",
"Slave I/O thread killed during or after reconnect"
},
@@ -133,7 +138,7 @@ log '%s' at position %s%s",
"Slave I/O thread killed while waiting to reconnect after a failed read",
"Reconnecting after a failed master event read",
"Slave I/O thread: Failed reading log event, reconnecting to retry, \
-log '%s' at position %s%s",
+log '%s' at position %llu%s",
"",
"Slave I/O thread killed during or after a reconnect done to recover from \
failed read"
@@ -294,6 +299,7 @@ handle_slave_init(void *arg __attribute__((unused)))
thd->thread_id= thread_id++;
mysql_mutex_unlock(&LOCK_thread_count);
thd->system_thread = SYSTEM_THREAD_SLAVE_INIT;
+ thread_safe_increment32(&service_thread_count);
thd->store_globals();
thd->security_ctx->skip_grants();
thd->set_command(COM_DAEMON);
@@ -309,6 +315,8 @@ handle_slave_init(void *arg __attribute__((unused)))
mysql_mutex_lock(&LOCK_thread_count);
delete thd;
mysql_mutex_unlock(&LOCK_thread_count);
+ thread_safe_decrement32(&service_thread_count);
+ signal_thd_deleted();
my_thread_end();
mysql_mutex_lock(&LOCK_slave_init);
@@ -425,7 +433,8 @@ int init_slave()
if (active_mi->host[0] && !opt_skip_slave_start)
{
- if (start_slave_threads(1 /* need mutex */,
+ if (start_slave_threads(0, /* No active thd */
+ 1 /* need mutex */,
0 /* no wait for start*/,
active_mi,
master_info_file,
@@ -617,8 +626,7 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
if (thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL))
{
DBUG_PRINT("info",("Terminating SQL thread"));
- if (opt_slave_parallel_threads > 0 &&
- mi->rli.abort_slave && mi->rli.stop_for_until)
+ if (mi->using_parallel() && mi->rli.abort_slave && mi->rli.stop_for_until)
{
mi->rli.stop_for_until= false;
mi->rli.parallel.stop_during_until();
@@ -883,7 +891,8 @@ int start_slave_thread(
started the threads that were not previously running
*/
-int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
+int start_slave_threads(THD *thd,
+ bool need_slave_mutex, bool wait_for_start,
Master_info* mi, const char* master_info_fname,
const char* slave_info_fname, int thread_mask)
{
@@ -929,7 +938,7 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
mi->rli.group_master_log_pos);
strmake(mi->master_log_name, mi->rli.group_master_log_name,
sizeof(mi->master_log_name)-1);
- purge_relay_logs(&mi->rli, NULL, 0, &errmsg);
+ purge_relay_logs(&mi->rli, thd, 0, &errmsg);
mi->rli.group_master_log_pos= mi->master_log_pos;
strmake(mi->rli.group_master_log_name, mi->master_log_name,
sizeof(mi->rli.group_master_log_name)-1);
@@ -1107,14 +1116,14 @@ static bool sql_slave_killed(rpl_group_info *rgi)
else
{
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, rgi->gtid_info(),
- ER(ER_SLAVE_FATAL_ERROR), msg_stopped);
+ ER_THD(thd, ER_SLAVE_FATAL_ERROR), msg_stopped);
}
}
else
{
ret= TRUE;
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, rgi->gtid_info(),
- ER(ER_SLAVE_FATAL_ERROR),
+ ER_THD(thd, ER_SLAVE_FATAL_ERROR),
msg_stopped);
}
}
@@ -1201,7 +1210,6 @@ int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
DBUG_RETURN(1);
}
-
/*
when moving these functions to mysys, don't forget to
remove slave.cc from libmysqld/CMakeLists.txt
@@ -1257,6 +1265,7 @@ int init_floatvar_from_file(float* var, IO_CACHE* f, float default_val)
going to ignore (to not log them in the relay log).
Items being read are supposed to be decimal output of values of a
type shorter or equal of @c long and separated by the single space.
+ It also used to restore DO_DOMAIN_IDS & IGNORE_DOMAIN_IDS lists.
@param arr @c DYNAMIC_ARRAY pointer to storage for servers id
@param f @c IO_CACHE pointer to the source file
@@ -1277,7 +1286,7 @@ int init_dynarray_intvar_from_file(DYNAMIC_ARRAY* arr, IO_CACHE* f)
if ((read_size= my_b_gets(f, buf_act, sizeof(buf))) == 0)
{
- return 0; // no line in master.info
+ DBUG_RETURN(0); // no line in master.info
}
if (read_size + 1 == sizeof(buf) && buf[sizeof(buf) - 2] != '\n')
{
@@ -1400,7 +1409,7 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
"Master reported unrecognized MySQL version: %s",
mysql->server_version);
err_code= ER_SLAVE_FATAL_ERROR;
- sprintf(err_buff, ER(err_code), err_buff2);
+ sprintf(err_buff, ER_DEFAULT(err_code), err_buff2);
}
else
{
@@ -1416,7 +1425,7 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
"Master reported unrecognized MySQL version: %s",
mysql->server_version);
err_code= ER_SLAVE_FATAL_ERROR;
- sprintf(err_buff, ER(err_code), err_buff2);
+ sprintf(err_buff, ER_DEFAULT(err_code), err_buff2);
break;
case 3:
mi->rli.relay_log.description_event_for_queue= new
@@ -1456,7 +1465,7 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
{
errmsg= "default Format_description_log_event";
err_code= ER_SLAVE_CREATE_EVENT_FAILURE;
- sprintf(err_buff, ER(err_code), errmsg);
+ sprintf(err_buff, ER_DEFAULT(err_code), errmsg);
goto err;
}
@@ -1586,7 +1595,7 @@ MySQL server ids; these ids must be different for replication to work (or \
the --replicate-same-server-id option must be used on slave but this does \
not always make sense; please check the manual before using it).";
err_code= ER_SLAVE_FATAL_ERROR;
- sprintf(err_buff, ER(err_code), errmsg);
+ sprintf(err_buff, ER_DEFAULT(err_code), errmsg);
goto err;
}
}
@@ -1622,7 +1631,7 @@ maybe it is a *VERY OLD MASTER*.");
{
errmsg= "Slave configured with server id filtering could not detect the master server id.";
err_code= ER_SLAVE_FATAL_ERROR;
- sprintf(err_buff, ER(err_code), errmsg);
+ sprintf(err_buff, ER_DEFAULT(err_code), errmsg);
goto err;
}
@@ -1661,7 +1670,7 @@ maybe it is a *VERY OLD MASTER*.");
different values for the COLLATION_SERVER global variable. The values must \
be equal for the Statement-format replication to work";
err_code= ER_SLAVE_FATAL_ERROR;
- sprintf(err_buff, ER(err_code), errmsg);
+ sprintf(err_buff, ER_DEFAULT(err_code), errmsg);
goto err;
}
}
@@ -1724,7 +1733,7 @@ inconsistency if replicated data deals with collation.");
different values for the TIME_ZONE global variable. The values must \
be equal for the Statement-format replication to work";
err_code= ER_SLAVE_FATAL_ERROR;
- sprintf(err_buff, ER(err_code), errmsg);
+ sprintf(err_buff, ER_DEFAULT(err_code), errmsg);
goto err;
}
}
@@ -1763,14 +1772,13 @@ when it try to get the value of TIME_ZONE global variable from master.";
if (mi->heartbeat_period != 0.0)
{
- char llbuf[22];
- const char query_format[]= "SET @master_heartbeat_period= %s";
- char query[sizeof(query_format) - 2 + sizeof(llbuf)];
+ const char query_format[]= "SET @master_heartbeat_period= %llu";
+ char query[sizeof(query_format) + 32];
/*
the period is an ulonglong of nano-secs.
*/
- llstr((ulonglong) (mi->heartbeat_period*1000000000UL), llbuf);
- sprintf(query, query_format, llbuf);
+ my_snprintf(query, sizeof(query), query_format,
+ (ulonglong) (mi->heartbeat_period*1000000000UL));
DBUG_EXECUTE_IF("simulate_slave_heartbeat_network_error",
{ static ulong dbug_count= 0;
@@ -1872,8 +1880,8 @@ when it try to get the value of TIME_ZONE global variable from master.";
(master_row= mysql_fetch_row(master_res)) &&
(master_row[0] != NULL))
{
- mi->checksum_alg_before_fd= (uint8)
- find_type(master_row[0], &binlog_checksum_typelib, 1) - 1;
+ mi->checksum_alg_before_fd= (enum_binlog_checksum_alg)
+ (find_type(master_row[0], &binlog_checksum_typelib, 1) - 1);
// valid outcome is either of
DBUG_ASSERT(mi->checksum_alg_before_fd == BINLOG_CHECKSUM_ALG_OFF ||
mi->checksum_alg_before_fd == BINLOG_CHECKSUM_ALG_CRC32);
@@ -2289,13 +2297,10 @@ static bool wait_for_relay_log_space(Relay_log_info* rli)
{
#ifndef DBUG_OFF
{
- char llbuf1[22], llbuf2[22];
- DBUG_PRINT("info", ("log_space_limit=%s "
- "log_space_total=%s "
+ DBUG_PRINT("info", ("log_space_limit=%llu log_space_total=%llu "
"ignore_log_space_limit=%d "
"sql_force_rotate_relay=%d",
- llstr(rli->log_space_limit,llbuf1),
- llstr(rli->log_space_total,llbuf2),
+ rli->log_space_limit, rli->log_space_total,
(int) rli->ignore_log_space_limit,
(int) rli->sql_force_rotate_relay));
}
@@ -2346,7 +2351,7 @@ static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi)
rli->ign_master_log_name_end[0]= 0;
if (unlikely(!(bool)rev))
mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE, NULL,
- ER(ER_SLAVE_CREATE_EVENT_FAILURE),
+ ER_THD(thd, ER_SLAVE_CREATE_EVENT_FAILURE),
"Rotate_event (out of memory?),"
" SHOW SLAVE STATUS may be inaccurate");
}
@@ -2357,7 +2362,7 @@ static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi)
rli->ign_gtids.reset();
if (unlikely(!(bool)glev))
mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE, NULL,
- ER(ER_SLAVE_CREATE_EVENT_FAILURE),
+ ER_THD(thd, ER_SLAVE_CREATE_EVENT_FAILURE),
"Gtid_list_event (out of memory?),"
" gtid_slave_pos may be inaccurate");
}
@@ -2370,7 +2375,7 @@ static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi)
rev->server_id= 0; // don't be ignored by slave SQL thread
if (unlikely(rli->relay_log.append(rev)))
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL,
- ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
+ ER_THD(thd, ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"failed to write a Rotate event"
" to the relay log, SHOW SLAVE STATUS may be"
" inaccurate");
@@ -2383,7 +2388,7 @@ static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi)
glev->set_artificial_event(); // Don't mess up Exec_Master_Log_Pos
if (unlikely(rli->relay_log.append(glev)))
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL,
- ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
+ ER_THD(thd, ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"failed to write a Gtid_list event to the relay log, "
"gtid_slave_pos may be inaccurate");
delete glev;
@@ -2509,101 +2514,212 @@ static bool send_show_master_info_header(THD *thd, bool full,
List<Item> field_list;
Protocol *protocol= thd->protocol;
Master_info *mi;
+ MEM_ROOT *mem_root= thd->mem_root;
DBUG_ENTER("show_master_info_header");
if (full)
{
- field_list.push_back(new Item_empty_string("Connection_name",
- MAX_CONNECTION_NAME));
- field_list.push_back(new Item_empty_string("Slave_SQL_State",
- 30));
- }
-
- field_list.push_back(new Item_empty_string("Slave_IO_State",
- 30));
- field_list.push_back(new Item_empty_string("Master_Host",
- sizeof(mi->host)));
- field_list.push_back(new Item_empty_string("Master_User",
- sizeof(mi->user)));
- field_list.push_back(new Item_return_int("Master_Port", 7,
- MYSQL_TYPE_LONG));
- field_list.push_back(new Item_return_int("Connect_Retry", 10,
- MYSQL_TYPE_LONG));
- field_list.push_back(new Item_empty_string("Master_Log_File",
- FN_REFLEN));
- field_list.push_back(new Item_return_int("Read_Master_Log_Pos", 10,
- MYSQL_TYPE_LONGLONG));
- field_list.push_back(new Item_empty_string("Relay_Log_File",
- FN_REFLEN));
- field_list.push_back(new Item_return_int("Relay_Log_Pos", 10,
- MYSQL_TYPE_LONGLONG));
- field_list.push_back(new Item_empty_string("Relay_Master_Log_File",
- FN_REFLEN));
- field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
- field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));
- field_list.push_back(new Item_empty_string("Replicate_Do_DB", 20));
- field_list.push_back(new Item_empty_string("Replicate_Ignore_DB", 20));
- field_list.push_back(new Item_empty_string("Replicate_Do_Table", 20));
- field_list.push_back(new Item_empty_string("Replicate_Ignore_Table", 23));
- field_list.push_back(new Item_empty_string("Replicate_Wild_Do_Table", 24));
- field_list.push_back(new Item_empty_string("Replicate_Wild_Ignore_Table",
- 28));
- field_list.push_back(new Item_return_int("Last_Errno", 4, MYSQL_TYPE_LONG));
- field_list.push_back(new Item_empty_string("Last_Error", 20));
- field_list.push_back(new Item_return_int("Skip_Counter", 10,
- MYSQL_TYPE_LONG));
- field_list.push_back(new Item_return_int("Exec_Master_Log_Pos", 10,
- MYSQL_TYPE_LONGLONG));
- field_list.push_back(new Item_return_int("Relay_Log_Space", 10,
- MYSQL_TYPE_LONGLONG));
- field_list.push_back(new Item_empty_string("Until_Condition", 6));
- field_list.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
- field_list.push_back(new Item_return_int("Until_Log_Pos", 10,
- MYSQL_TYPE_LONGLONG));
- field_list.push_back(new Item_empty_string("Master_SSL_Allowed", 7));
- field_list.push_back(new Item_empty_string("Master_SSL_CA_File",
- sizeof(mi->ssl_ca)));
- field_list.push_back(new Item_empty_string("Master_SSL_CA_Path",
- sizeof(mi->ssl_capath)));
- field_list.push_back(new Item_empty_string("Master_SSL_Cert",
- sizeof(mi->ssl_cert)));
- field_list.push_back(new Item_empty_string("Master_SSL_Cipher",
- sizeof(mi->ssl_cipher)));
- field_list.push_back(new Item_empty_string("Master_SSL_Key",
- sizeof(mi->ssl_key)));
- field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10,
- MYSQL_TYPE_LONGLONG));
- field_list.push_back(new Item_empty_string("Master_SSL_Verify_Server_Cert",
- 3));
- field_list.push_back(new Item_return_int("Last_IO_Errno", 4, MYSQL_TYPE_LONG));
- field_list.push_back(new Item_empty_string("Last_IO_Error", 20));
- field_list.push_back(new Item_return_int("Last_SQL_Errno", 4, MYSQL_TYPE_LONG));
- field_list.push_back(new Item_empty_string("Last_SQL_Error", 20));
- field_list.push_back(new Item_empty_string("Replicate_Ignore_Server_Ids",
- FN_REFLEN));
- field_list.push_back(new Item_return_int("Master_Server_Id", sizeof(ulong),
- MYSQL_TYPE_LONG));
- field_list.push_back(new Item_empty_string("Master_SSL_Crl",
- sizeof(mi->ssl_crl)));
- field_list.push_back(new Item_empty_string("Master_SSL_Crlpath",
- sizeof(mi->ssl_crlpath)));
- field_list.push_back(new Item_empty_string("Using_Gtid",
- sizeof("Current_Pos")-1));
- field_list.push_back(new Item_empty_string("Gtid_IO_Pos", 30));
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Connection_name",
+ MAX_CONNECTION_NAME),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Slave_SQL_State", 30),
+ thd->mem_root);
+ }
+
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Slave_IO_State", 30),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Master_Host", sizeof(mi->host)),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Master_User", sizeof(mi->user)),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_return_int(thd, "Master_Port", 7, MYSQL_TYPE_LONG),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_return_int(thd, "Connect_Retry", 10,
+ MYSQL_TYPE_LONG),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Master_Log_File", FN_REFLEN),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_return_int(thd, "Read_Master_Log_Pos", 10,
+ MYSQL_TYPE_LONGLONG),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Relay_Log_File", FN_REFLEN),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_return_int(thd, "Relay_Log_Pos", 10,
+ MYSQL_TYPE_LONGLONG),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Relay_Master_Log_File",
+ FN_REFLEN),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Slave_IO_Running", 3),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Slave_SQL_Running", 3),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Replicate_Do_DB", 20),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Replicate_Ignore_DB", 20),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Replicate_Do_Table", 20),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Replicate_Ignore_Table", 23),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Replicate_Wild_Do_Table", 24),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Replicate_Wild_Ignore_Table",
+ 28),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_return_int(thd, "Last_Errno", 4, MYSQL_TYPE_LONG),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Last_Error", 20),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_return_int(thd, "Skip_Counter", 10,
+ MYSQL_TYPE_LONG),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_return_int(thd, "Exec_Master_Log_Pos", 10,
+ MYSQL_TYPE_LONGLONG),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_return_int(thd, "Relay_Log_Space", 10,
+ MYSQL_TYPE_LONGLONG),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Until_Condition", 6),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Until_Log_File", FN_REFLEN),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_return_int(thd, "Until_Log_Pos", 10,
+ MYSQL_TYPE_LONGLONG),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Master_SSL_Allowed", 7),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Master_SSL_CA_File",
+ sizeof(mi->ssl_ca)),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Master_SSL_CA_Path",
+ sizeof(mi->ssl_capath)),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Master_SSL_Cert",
+ sizeof(mi->ssl_cert)),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Master_SSL_Cipher",
+ sizeof(mi->ssl_cipher)),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Master_SSL_Key",
+ sizeof(mi->ssl_key)),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_return_int(thd, "Seconds_Behind_Master", 10,
+ MYSQL_TYPE_LONGLONG),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Master_SSL_Verify_Server_Cert",
+ 3),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_return_int(thd, "Last_IO_Errno", 4,
+ MYSQL_TYPE_LONG),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Last_IO_Error", 20),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_return_int(thd, "Last_SQL_Errno", 4,
+ MYSQL_TYPE_LONG),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Last_SQL_Error", 20),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Replicate_Ignore_Server_Ids",
+ FN_REFLEN),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_return_int(thd, "Master_Server_Id", sizeof(ulong),
+ MYSQL_TYPE_LONG),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Master_SSL_Crl",
+ sizeof(mi->ssl_crl)),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Master_SSL_Crlpath",
+ sizeof(mi->ssl_crlpath)),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Using_Gtid",
+ sizeof("Current_Pos")-1),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Gtid_IO_Pos", 30),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Replicate_Do_Domain_Ids",
+ FN_REFLEN),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Replicate_Ignore_Domain_Ids",
+ FN_REFLEN),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Parallel_Mode",
+ sizeof("conservative")-1),
+ thd->mem_root);
if (full)
{
- field_list.push_back(new Item_return_int("Retried_transactions",
- 10, MYSQL_TYPE_LONG));
- field_list.push_back(new Item_return_int("Max_relay_log_size",
- 10, MYSQL_TYPE_LONGLONG));
- field_list.push_back(new Item_return_int("Executed_log_entries",
- 10, MYSQL_TYPE_LONG));
- field_list.push_back(new Item_return_int("Slave_received_heartbeats",
- 10, MYSQL_TYPE_LONG));
- field_list.push_back(new Item_float("Slave_heartbeat_period",
- 0.0, 3, 10));
- field_list.push_back(new Item_empty_string("Gtid_Slave_Pos",
- gtid_pos_length));
+ field_list.push_back(new (mem_root)
+ Item_return_int(thd, "Retried_transactions", 10,
+ MYSQL_TYPE_LONG),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_return_int(thd, "Max_relay_log_size", 10,
+ MYSQL_TYPE_LONGLONG),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_return_int(thd, "Executed_log_entries", 10,
+ MYSQL_TYPE_LONG),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_return_int(thd, "Slave_received_heartbeats", 10,
+ MYSQL_TYPE_LONG),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_float(thd, "Slave_heartbeat_period", 0.0, 3, 10),
+ thd->mem_root);
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "Gtid_Slave_Pos",
+ gtid_pos_length),
+ thd->mem_root);
}
if (protocol->send_result_set_metadata(&field_list,
@@ -2624,8 +2740,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
String *packet= &thd->packet;
Protocol *protocol= thd->protocol;
Rpl_filter *rpl_filter= mi->rpl_filter;
- char buf[256];
- String tmp(buf, sizeof(buf), &my_charset_bin);
+ StringBuffer<256> tmp;
protocol->prepare_for_resend();
@@ -2722,8 +2837,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
else
{
idle= mi->rli.sql_thread_caught_up;
- if (opt_slave_parallel_threads > 0 && idle &&
- !mi->rli.parallel.workers_idle())
+ if (mi->using_parallel() && idle && !mi->rli.parallel.workers_idle())
idle= false;
}
if (idle)
@@ -2771,42 +2885,31 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
// Last_SQL_Error
protocol->store(mi->rli.last_error().message, &my_charset_bin);
// Replicate_Ignore_Server_Ids
- {
- char buff[FN_REFLEN];
- ulong i, cur_len;
- for (i= 0, buff[0]= 0, cur_len= 0;
- i < mi->ignore_server_ids.elements; i++)
- {
- ulong s_id, slen;
- char sbuff[FN_REFLEN];
- get_dynamic(&mi->ignore_server_ids, (uchar*) &s_id, i);
- slen= sprintf(sbuff, (i==0? "%lu" : ", %lu"), s_id);
- if (cur_len + slen + 4 > FN_REFLEN)
- {
- /*
- break the loop whenever remained space could not fit
- ellipses on the next cycle
- */
- sprintf(buff + cur_len, "...");
- break;
- }
- cur_len += sprintf(buff + cur_len, "%s", sbuff);
- }
- protocol->store(buff, &my_charset_bin);
- }
+ prot_store_ids(thd, &mi->ignore_server_ids);
// Master_Server_id
protocol->store((uint32) mi->master_id);
// Master_Ssl_Crl
protocol->store(mi->ssl_ca, &my_charset_bin);
// Master_Ssl_Crlpath
protocol->store(mi->ssl_capath, &my_charset_bin);
+ // Using_Gtid
protocol->store(mi->using_gtid_astext(mi->using_gtid), &my_charset_bin);
+ // Gtid_IO_Pos
{
- char buff[30];
- String tmp(buff, sizeof(buff), system_charset_info);
mi->gtid_current_pos.to_string(&tmp);
protocol->store(tmp.ptr(), tmp.length(), &my_charset_bin);
}
+
+ // Replicate_Do_Domain_Ids & Replicate_Ignore_Domain_Ids
+ mi->domain_id_filter.store_ids(thd);
+
+ // Parallel_Mode
+ {
+ const char *mode_name= get_type(&slave_parallel_mode_typelib,
+ mi->parallel_mode);
+ protocol->store(mode_name, strlen(mode_name), &my_charset_bin);
+ }
+
if (full)
{
protocol->store((uint32) mi->rli.retried_trans);
@@ -2953,21 +3056,24 @@ static int init_slave_thread(THD* thd, Master_info *mi,
simulate_error|= (1 << SLAVE_THD_IO););
DBUG_EXECUTE_IF("simulate_sql_slave_error_on_init",
simulate_error|= (1 << SLAVE_THD_SQL););
+
+ thd->system_thread = (thd_type == SLAVE_THD_SQL) ?
+ SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
+ thread_safe_increment32(&service_thread_count);
+
/* We must call store_globals() before doing my_net_init() */
if (init_thr_lock() || thd->store_globals() ||
- my_net_init(&thd->net, 0, MYF(MY_THREAD_SPECIFIC)) ||
+ my_net_init(&thd->net, 0, thd, MYF(MY_THREAD_SPECIFIC)) ||
IF_DBUG(simulate_error & (1<< thd_type), 0))
{
thd->cleanup();
DBUG_RETURN(-1);
}
- thd->system_thread = (thd_type == SLAVE_THD_SQL) ?
- SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
thd->security_ctx->skip_grants();
thd->slave_thread= 1;
thd->connection_name= mi->connection_name;
- thd->enable_slow_log= opt_log_slow_slave_statements;
+ thd->variables.sql_log_slow= opt_log_slow_slave_statements;
thd->variables.log_slow_filter= global_system_variables.log_slow_filter;
set_slave_thread_options(thd);
thd->client_capabilities = CLIENT_LOCAL_FILES;
@@ -3135,9 +3241,7 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings)
/*
Check if the current error is of temporary nature of not.
Some errors are temporary in nature, such as
- ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT. Ndb also signals
- that the error is temporary by pushing a warning with the error code
- ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
+ ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT.
*/
int
has_temporary_error(THD *thd)
@@ -3168,25 +3272,6 @@ has_temporary_error(THD *thd)
thd->get_stmt_da()->sql_errno() == ER_LOCK_WAIT_TIMEOUT)
DBUG_RETURN(1);
-#ifdef HAVE_NDB_BINLOG
- /*
- currently temporary error set in ndbcluster
- */
- List_iterator_fast<Sql_condition> it(thd->warning_info->warn_list());
- Sql_condition *err;
- while ((err= it++))
- {
- DBUG_PRINT("info", ("has condition %d %s", err->get_sql_errno(),
- err->get_message_text()));
- switch (err->get_sql_errno())
- {
- case ER_GET_TEMPORARY_ERRMSG:
- DBUG_RETURN(1);
- default:
- break;
- }
- }
-#endif
DBUG_RETURN(0);
}
@@ -3319,15 +3404,10 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
if (!rli->is_fake)
#endif
{
-#ifndef DBUG_OFF
- char buf[22];
-#endif
DBUG_PRINT("info", ("update_pos error = %d", error));
- DBUG_PRINT("info", ("group %s %s",
- llstr(rli->group_relay_log_pos, buf),
+ DBUG_PRINT("info", ("group %llu %s", rli->group_relay_log_pos,
rli->group_relay_log_name));
- DBUG_PRINT("info", ("event %s %s",
- llstr(rli->event_relay_log_pos, buf),
+ DBUG_PRINT("info", ("event %llu %s", rli->event_relay_log_pos,
rli->event_relay_log_name));
}
/*
@@ -3339,14 +3419,12 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
*/
if (error)
{
- char buf[22];
rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR, rgi->gtid_info(),
"It was not possible to update the positions"
" of the relay log information: the slave may"
" be in an inconsistent state."
- " Stopped in %s position %s",
- rli->group_relay_log_name,
- llstr(rli->group_relay_log_pos, buf));
+ " Stopped in %s position %llu",
+ rli->group_relay_log_name, rli->group_relay_log_pos);
DBUG_RETURN(2);
}
}
@@ -3513,11 +3591,14 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
*/
if ((rli->until_condition == Relay_log_info::UNTIL_MASTER_POS ||
rli->until_condition == Relay_log_info::UNTIL_RELAY_POS) &&
- rli->is_until_satisfied(thd, ev))
+ (ev->server_id != global_system_variables.server_id ||
+ rli->replicate_same_server_id) &&
+ rli->is_until_satisfied((rli->get_flag(Relay_log_info::IN_TRANSACTION) || !ev->log_pos)
+ ? rli->group_master_log_pos
+ : ev->log_pos - ev->data_written))
{
- char buf[22];
sql_print_information("Slave SQL thread stopped because it reached its"
- " UNTIL position %s", llstr(rli->until_pos(), buf));
+ " UNTIL position %llu", rli->until_pos());
/*
Setting abort_slave flag because we do not want additional
message about error in query execution to be printed.
@@ -3551,7 +3632,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
update_state_of_relay_log(rli, ev);
- if (opt_slave_parallel_threads > 0)
+ if (rli->mi->using_parallel())
{
int res= rli->parallel.do_event(serial_rgi, ev, event_size);
if (res >= 0)
@@ -3618,8 +3699,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
if (slave_trans_retries)
{
- int temp_err;
- LINT_INIT(temp_err);
+ int UNINIT_VAR(temp_err);
if (exec_res && (temp_err= has_temporary_error(thd)))
{
const char *errmsg;
@@ -3689,13 +3769,12 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
serial_rgi->trans_retries));
}
}
- thread_safe_increment64(&rli->executed_entries,
- &slave_executed_entries_lock);
+ thread_safe_increment64(&rli->executed_entries);
DBUG_RETURN(exec_res);
}
mysql_mutex_unlock(&rli->data_lock);
rli->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_READ_FAILURE, NULL,
- ER(ER_SLAVE_RELAY_LOG_READ_FAILURE), "\
+ ER_THD(thd, ER_SLAVE_RELAY_LOG_READ_FAILURE), "\
Could not parse relay log event entry. The possible reasons are: the master's \
binary log is corrupted (you can check this by running 'mysqlbinlog' on the \
binary log), the slave's relay log is corrupted (you can check this by running \
@@ -3766,8 +3845,8 @@ static int try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi,
thd->proc_info = messages[SLAVE_RECON_MSG_AFTER];
if (!suppress_warnings)
{
- char buf[256], llbuff[22];
- String tmp;
+ char buf[256];
+ StringBuffer<100> tmp;
if (mi->using_gtid != Master_info::USE_GTID_NO)
{
tmp.append(STRING_WITH_LEN("; GTID position '"));
@@ -3781,7 +3860,7 @@ static int try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi,
}
}
my_snprintf(buf, sizeof(buf), messages[SLAVE_RECON_MSG_FAILED],
- IO_RPL_LOG_NAME, llstr(mi->master_log_pos, llbuff),
+ IO_RPL_LOG_NAME, mi->master_log_pos,
tmp.c_ptr_safe());
/*
Raise a warining during registering on master/requesting dump.
@@ -3790,7 +3869,7 @@ static int try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi,
if (messages[SLAVE_RECON_MSG_COMMAND][0])
{
mi->report(WARNING_LEVEL, ER_SLAVE_MASTER_COM_FAILURE, NULL,
- ER(ER_SLAVE_MASTER_COM_FAILURE),
+ ER_THD(thd, ER_SLAVE_MASTER_COM_FAILURE),
messages[SLAVE_RECON_MSG_COMMAND], buf);
}
else
@@ -3822,13 +3901,12 @@ pthread_handler_t handle_slave_io(void *arg)
MYSQL *mysql;
Master_info *mi = (Master_info*)arg;
Relay_log_info *rli= &mi->rli;
- char llbuff[22];
uint retry_count;
bool suppress_warnings;
int ret;
rpl_io_thread_info io_info;
#ifndef DBUG_OFF
- uint retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
+ mi->dbug_do_disconnect= false;
#endif
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
my_thread_init();
@@ -3869,9 +3947,8 @@ pthread_handler_t handle_slave_io(void *arg)
mysql_mutex_unlock(&mi->run_lock);
mysql_cond_broadcast(&mi->start_cond);
- DBUG_PRINT("master_info",("log_file_name: '%s' position: %s",
- mi->master_log_name,
- llstr(mi->master_log_pos,llbuff)));
+ DBUG_PRINT("master_info",("log_file_name: '%s' position: %llu",
+ mi->master_log_name, mi->master_log_pos));
/* This must be called before run any binlog_relay_io hooks */
my_pthread_setspecific_ptr(RPL_MASTER_INFO, mi);
@@ -3896,14 +3973,15 @@ pthread_handler_t handle_slave_io(void *arg)
if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
- ER(ER_SLAVE_FATAL_ERROR), "Failed to run 'thread_start' hook");
+ ER_THD(thd, ER_SLAVE_FATAL_ERROR),
+ "Failed to run 'thread_start' hook");
goto err;
}
if (!(mi->mysql = mysql = mysql_init(NULL)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
- ER(ER_SLAVE_FATAL_ERROR), "error in mysql_init()");
+ ER_THD(thd, ER_SLAVE_FATAL_ERROR), "error in mysql_init()");
goto err;
}
@@ -3913,13 +3991,12 @@ pthread_handler_t handle_slave_io(void *arg)
{
if (mi->using_gtid == Master_info::USE_GTID_NO)
sql_print_information("Slave I/O thread: connected to master '%s@%s:%d',"
- "replication started in log '%s' at position %s",
+ "replication started in log '%s' at position %llu",
mi->user, mi->host, mi->port,
- IO_RPL_LOG_NAME,
- llstr(mi->master_log_pos,llbuff));
+ IO_RPL_LOG_NAME, mi->master_log_pos);
else
{
- String tmp;
+ StringBuffer<100> tmp;
mi->gtid_current_pos.to_string(&tmp);
sql_print_information("Slave I/O thread: connected to master '%s@%s:%d',"
"replication starts at GTID position '%s'",
@@ -3976,7 +4053,7 @@ connected:
if (ret == 2)
{
- if (check_io_slave_killed(mi, "Slave I/O thread killed"
+ if (check_io_slave_killed(mi, "Slave I/O thread killed "
"while calling get_master_version_and_clock(...)"))
goto err;
suppress_warnings= FALSE;
@@ -4010,16 +4087,6 @@ connected:
goto err;
goto connected;
}
- DBUG_EXECUTE_IF("FORCE_SLAVE_TO_RECONNECT_REG",
- if (!retry_count_reg)
- {
- retry_count_reg++;
- sql_print_information("Forcing to reconnect slave I/O thread");
- if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
- reconnect_messages[SLAVE_RECON_ACT_REG]))
- goto err;
- goto connected;
- });
}
DBUG_PRINT("info",("Starting reading binary log from master"));
@@ -4036,16 +4103,7 @@ requesting master dump") ||
goto err;
goto connected;
}
- DBUG_EXECUTE_IF("FORCE_SLAVE_TO_RECONNECT_DUMP",
- if (!retry_count_dump)
- {
- retry_count_dump++;
- sql_print_information("Forcing to reconnect slave I/O thread");
- if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
- reconnect_messages[SLAVE_RECON_ACT_DUMP]))
- goto err;
- goto connected;
- });
+
const char *event_buf;
DBUG_ASSERT(mi->last_error().number == 0);
@@ -4063,16 +4121,6 @@ requesting master dump") ||
if (check_io_slave_killed(mi, "Slave I/O thread killed while \
reading event"))
goto err;
- DBUG_EXECUTE_IF("FORCE_SLAVE_TO_RECONNECT_EVENT",
- if (!retry_count_event)
- {
- retry_count_event++;
- sql_print_information("Forcing to reconnect slave I/O thread");
- if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
- reconnect_messages[SLAVE_RECON_ACT_EVENT]))
- goto err;
- goto connected;
- });
if (event_len == packet_error)
{
@@ -4089,14 +4137,14 @@ slave_max_allowed_packet",
goto err;
case ER_MASTER_FATAL_ERROR_READING_BINLOG:
mi->report(ERROR_LEVEL, ER_MASTER_FATAL_ERROR_READING_BINLOG, NULL,
- ER(ER_MASTER_FATAL_ERROR_READING_BINLOG),
+ ER_THD(thd, ER_MASTER_FATAL_ERROR_READING_BINLOG),
mysql_error_number, mysql_error(mysql));
goto err;
case ER_OUT_OF_RESOURCES:
sql_print_error("\
Stopping slave I/O thread due to out-of-memory error from master");
mi->report(ERROR_LEVEL, ER_OUT_OF_RESOURCES, NULL,
- "%s", ER(ER_OUT_OF_RESOURCES));
+ "%s", ER_THD(thd, ER_OUT_OF_RESOURCES));
goto err;
}
if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
@@ -4113,7 +4161,7 @@ Stopping slave I/O thread due to out-of-memory error from master");
event_len, &event_buf, &event_len)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
- ER(ER_SLAVE_FATAL_ERROR),
+ ER_THD(thd, ER_SLAVE_FATAL_ERROR),
"Failed to run 'after_read_event' hook");
goto err;
}
@@ -4124,7 +4172,7 @@ Stopping slave I/O thread due to out-of-memory error from master");
if (queue_event(mi, event_buf, event_len))
{
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL,
- ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
+ ER_THD(thd, ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"could not queue event from master");
goto err;
}
@@ -4133,7 +4181,7 @@ Stopping slave I/O thread due to out-of-memory error from master");
(thd, mi, event_buf, event_len, synced)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
- ER(ER_SLAVE_FATAL_ERROR),
+ ER_THD(thd, ER_SLAVE_FATAL_ERROR),
"Failed to run 'after_queue_event' hook");
goto err;
}
@@ -4159,11 +4207,9 @@ Stopping slave I/O thread due to out-of-memory error from master");
*/
#ifndef DBUG_OFF
{
- char llbuf1[22], llbuf2[22];
- DBUG_PRINT("info", ("log_space_limit=%s log_space_total=%s \
-ignore_log_space_limit=%d",
- llstr(rli->log_space_limit,llbuf1),
- llstr(rli->log_space_total,llbuf2),
+ DBUG_PRINT("info", ("log_space_limit=%llu log_space_total=%llu "
+ "ignore_log_space_limit=%d",
+ rli->log_space_limit, rli->log_space_total,
(int) rli->ignore_log_space_limit));
}
#endif
@@ -4185,15 +4231,14 @@ err:
// print the current replication position
if (mi->using_gtid == Master_info::USE_GTID_NO)
sql_print_information("Slave I/O thread exiting, read up to log '%s', "
- "position %s",
- IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
+ "position %llu", IO_RPL_LOG_NAME, mi->master_log_pos);
else
{
- String tmp;
+ StringBuffer<100> tmp;
mi->gtid_current_pos.to_string(&tmp);
sql_print_information("Slave I/O thread exiting, read up to log '%s', "
- "position %s; GTID position %s",
- IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff),
+ "position %llu; GTID position %s",
+ IO_RPL_LOG_NAME, mi->master_log_pos,
tmp.c_ptr_safe());
}
RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi));
@@ -4228,11 +4273,14 @@ err_during_init:
mi->rli.relay_log.description_event_for_queue= 0;
// TODO: make rpl_status part of Master_info
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
+
mysql_mutex_lock(&LOCK_thread_count);
thd->unlink();
mysql_mutex_unlock(&LOCK_thread_count);
- THD_CHECK_SENTRY(thd);
delete thd;
+ thread_safe_decrement32(&service_thread_count);
+ signal_thd_deleted();
+
mi->abort_slave= 0;
mi->slave_running= MYSQL_SLAVE_NOT_RUN;
mi->io_thd= 0;
@@ -4330,7 +4378,6 @@ slave_output_error_info(rpl_group_info *rgi, THD *thd)
*/
Relay_log_info *rli= rgi->rli;
uint32 const last_errno= rli->last_error().number;
- char llbuff[22];
if (thd->is_error())
{
@@ -4378,7 +4425,7 @@ slave_output_error_info(rpl_group_info *rgi, THD *thd)
}
if (udf_error)
{
- String tmp;
+ StringBuffer<100> tmp;
if (rli->mi->using_gtid != Master_info::USE_GTID_NO)
{
tmp.append(STRING_WITH_LEN("; GTID position '"));
@@ -4388,22 +4435,22 @@ slave_output_error_info(rpl_group_info *rgi, THD *thd)
sql_print_error("Error loading user-defined library, slave SQL "
"thread aborted. Install the missing library, and restart the "
"slave SQL thread with \"SLAVE START\". We stopped at log '%s' "
- "position %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos,
- llbuff), tmp.c_ptr_safe());
+ "position %llu%s", RPL_LOG_NAME, rli->group_master_log_pos,
+ tmp.c_ptr_safe());
}
else
{
- String tmp;
+ StringBuffer<100> tmp;
if (rli->mi->using_gtid != Master_info::USE_GTID_NO)
{
tmp.append(STRING_WITH_LEN("; GTID position '"));
rpl_append_gtid_state(&tmp, false);
tmp.append(STRING_WITH_LEN("'"));
}
- sql_print_error("\
-Error running query, slave SQL thread aborted. Fix the problem, and restart \
-the slave SQL thread with \"SLAVE START\". We stopped at log \
-'%s' position %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff),
+ sql_print_error("Error running query, slave SQL thread aborted. "
+ "Fix the problem, and restart the slave SQL thread "
+ "with \"SLAVE START\". We stopped at log '%s' position "
+ "%llu%s", RPL_LOG_NAME, rli->group_master_log_pos,
tmp.c_ptr_safe());
}
}
@@ -4420,7 +4467,6 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
pthread_handler_t handle_slave_sql(void *arg)
{
THD *thd; /* needs to be first for thread_stack */
- char llbuff[22],llbuff1[22];
char saved_log_name[FN_REFLEN];
char saved_master_log_name[FN_REFLEN];
my_off_t UNINIT_VAR(saved_log_pos);
@@ -4429,6 +4475,7 @@ pthread_handler_t handle_slave_sql(void *arg)
my_off_t saved_skip= 0;
Master_info *mi= ((Master_info*)arg);
Relay_log_info* rli = &mi->rli;
+ my_bool wsrep_node_dropped __attribute__((unused)) = FALSE;
const char *errmsg;
rpl_group_info *serial_rgi;
rpl_sql_thread_info sql_info(mi->rpl_filter);
@@ -4437,8 +4484,9 @@ pthread_handler_t handle_slave_sql(void *arg)
my_thread_init();
DBUG_ENTER("handle_slave_sql");
- LINT_INIT(saved_master_log_pos);
- LINT_INIT(saved_log_pos);
+#ifdef WITH_WSREP
+ wsrep_restart_point:
+#endif
serial_rgi= new rpl_group_info(rli);
thd = new THD; // note that contructor of THD uses DBUG_ !
@@ -4555,10 +4603,9 @@ pthread_handler_t handle_slave_sql(void *arg)
THD_CHECK_SENTRY(thd);
#ifndef DBUG_OFF
{
- char llbuf1[22], llbuf2[22];
- DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
- llstr(my_b_tell(rli->cur_log),llbuf1),
- llstr(rli->event_relay_log_pos,llbuf2)));
+ DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%llu "
+ "rli->event_relay_log_pos=%llu",
+ my_b_tell(rli->cur_log), rli->event_relay_log_pos));
DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
/*
Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
@@ -4578,12 +4625,18 @@ pthread_handler_t handle_slave_sql(void *arg)
}
#endif
- DBUG_PRINT("master_info",("log_file_name: %s position: %s",
+#ifdef WITH_WSREP
+ thd->wsrep_exec_mode= LOCAL_STATE;
+ /* synchronize with wsrep replication */
+ if (WSREP_ON)
+ wsrep_ready_wait();
+#endif
+ DBUG_PRINT("master_info",("log_file_name: %s position: %llu",
rli->group_master_log_name,
- llstr(rli->group_master_log_pos,llbuff)));
+ rli->group_master_log_pos));
if (global_system_variables.log_warnings)
{
- String tmp;
+ StringBuffer<100> tmp;
if (mi->using_gtid != Master_info::USE_GTID_NO)
{
tmp.append(STRING_WITH_LEN("; GTID position '"));
@@ -4591,10 +4644,11 @@ pthread_handler_t handle_slave_sql(void *arg)
mi->using_gtid==Master_info::USE_GTID_CURRENT_POS);
tmp.append(STRING_WITH_LEN("'"));
}
- sql_print_information("Slave SQL thread initialized, starting replication in \
-log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
- llstr(rli->group_master_log_pos,llbuff),rli->group_relay_log_name,
- llstr(rli->group_relay_log_pos,llbuff1), tmp.c_ptr_safe());
+ sql_print_information("Slave SQL thread initialized, starting replication "
+ "in log '%s' at position %llu, relay log '%s' "
+ "position: %llu%s", RPL_LOG_NAME,
+ rli->group_master_log_pos, rli->group_relay_log_name,
+ rli->group_relay_log_pos, tmp.c_ptr_safe());
}
if (check_temp_dir(rli->slave_patternload_file))
@@ -4654,11 +4708,10 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
}
if ((rli->until_condition == Relay_log_info::UNTIL_MASTER_POS ||
rli->until_condition == Relay_log_info::UNTIL_RELAY_POS) &&
- rli->is_until_satisfied(thd, NULL))
+ rli->is_until_satisfied(rli->group_master_log_pos))
{
- char buf[22];
sql_print_information("Slave SQL thread stopped because it reached its"
- " UNTIL position %s", llstr(rli->until_pos(), buf));
+ " UNTIL position %llu", rli->until_pos());
mysql_mutex_unlock(&rli->data_lock);
goto err;
}
@@ -4673,7 +4726,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
if (saved_skip && rli->slave_skip_counter == 0)
{
- String tmp;
+ StringBuffer<100> tmp;
if (mi->using_gtid != Master_info::USE_GTID_NO)
{
tmp.append(STRING_WITH_LEN(", GTID '"));
@@ -4695,23 +4748,29 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
saved_skip= 0;
saved_skip_gtid_pos.free();
}
-
+
if (exec_relay_log_event(thd, rli, serial_rgi))
{
DBUG_PRINT("info", ("exec_relay_log_event() failed"));
// do not scare the user if SQL thread was simply killed or stopped
if (!sql_slave_killed(serial_rgi))
+ {
slave_output_error_info(serial_rgi, thd);
+ if (WSREP_ON && rli->last_error().number == ER_UNKNOWN_COM_ERROR)
+ {
+ wsrep_node_dropped= TRUE;
+ }
+ }
goto err;
}
}
- if (opt_slave_parallel_threads > 0)
+ if (mi->using_parallel())
rli->parallel.wait_for_done(thd, rli);
/* Thread stopped. Print the current replication position to the log */
{
- String tmp;
+ StringBuffer<100> tmp;
if (mi->using_gtid != Master_info::USE_GTID_NO)
{
tmp.append(STRING_WITH_LEN("; GTID position '"));
@@ -4719,10 +4778,8 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
tmp.append(STRING_WITH_LEN("'"));
}
sql_print_information("Slave SQL thread exiting, replication stopped in "
- "log '%s' at position %s%s",
- RPL_LOG_NAME,
- llstr(rli->group_master_log_pos,llbuff),
- tmp.c_ptr_safe());
+ "log '%s' at position %llu%s", RPL_LOG_NAME,
+ rli->group_master_log_pos, tmp.c_ptr_safe());
}
err:
@@ -4732,7 +4789,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
(We want the first one to be before the printout of stop position to
get the correct position printed.)
*/
- if (opt_slave_parallel_threads > 0)
+ if (mi->using_parallel())
rli->parallel.wait_for_done(thd, rli);
/*
@@ -4756,7 +4813,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
ulong domain_count;
flush_relay_log_info(rli);
- if (opt_slave_parallel_threads > 0)
+ if (mi->using_parallel())
{
/*
In parallel replication GTID mode, we may stop with different domains
@@ -4827,6 +4884,27 @@ err_during_init:
thd->rgi_fake= thd->rgi_slave= NULL;
delete serial_rgi;
mysql_mutex_unlock(&LOCK_thread_count);
+#ifdef WITH_WSREP
+ /* if slave stopped due to node going non primary, we set global flag to
+ trigger automatic restart of slave when node joins back to cluster
+ */
+ if (WSREP_ON && wsrep_node_dropped && wsrep_restart_slave)
+ {
+ if (wsrep_ready)
+ {
+ WSREP_INFO("Slave error due to node temporarily non-primary"
+ "SQL slave will continue");
+ wsrep_node_dropped= FALSE;
+ mysql_mutex_unlock(&rli->run_lock);
+ goto wsrep_restart_point;
+ } else {
+ WSREP_INFO("Slave error due to node going non-primary");
+ WSREP_INFO("wsrep_restart_slave was set and therefore slave will be "
+ "automatically restarted when node joins back to cluster");
+ wsrep_restart_slave_activated= TRUE;
+ }
+ }
+#endif /* WITH_WSREP */
/*
Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
is important. Otherwise a killer_thread can execute between the calls and
@@ -4848,9 +4926,10 @@ err_during_init:
mysql_mutex_unlock(&LOCK_active_mi);
mysql_mutex_lock(&LOCK_thread_count);
- THD_CHECK_SENTRY(thd);
delete thd;
mysql_mutex_unlock(&LOCK_thread_count);
+ thread_safe_decrement32(&service_thread_count);
+ signal_thd_deleted();
DBUG_LEAVE; // Must match DBUG_ENTER()
my_thread_end();
@@ -4928,7 +5007,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev)
if (unlikely(mi->rli.relay_log.append(&xev)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL,
- ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
+ ER_THD(thd, ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"error writing Exec_load event to relay log");
goto err;
}
@@ -4942,7 +5021,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev)
if (unlikely(mi->rli.relay_log.append(cev)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL,
- ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
+ ER_THD(thd, ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"error writing Create_file event to relay log");
goto err;
}
@@ -4957,7 +5036,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev)
if (unlikely(mi->rli.relay_log.append(&aev)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL,
- ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
+ ER_THD(thd, ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"error writing Append_block event to relay log");
goto err;
}
@@ -5261,7 +5340,7 @@ static int queue_old_event(Master_info *mi, const char *buf,
static int queue_event(Master_info* mi,const char* buf, ulong event_len)
{
int error= 0;
- String error_msg;
+ StringBuffer<1024> error_msg;
ulonglong inc_pos;
ulonglong event_pos;
Relay_log_info *rli= &mi->rli;
@@ -5278,9 +5357,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
Show-up of FD:s affects checksum_alg at once because
that changes FD_queue.
*/
- uint8 checksum_alg= mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF ?
- mi->checksum_alg_before_fd :
- mi->rli.relay_log.relay_log_checksum_alg;
+ enum enum_binlog_checksum_alg checksum_alg=
+ mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF ?
+ mi->checksum_alg_before_fd : mi->rli.relay_log.relay_log_checksum_alg;
char *save_buf= NULL; // needed for checksumming the fake Rotate event
char rot_buf[LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN + FN_REFLEN];
@@ -5422,7 +5501,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
mi->master_log_name, rev.new_log_ident);
mysql_mutex_lock(log_lock);
- if (likely(!fdle.write(rli->relay_log.get_log_file()) &&
+ if (likely(!rli->relay_log.write_event(&fdle) &&
!rli->relay_log.flush_and_sync(NULL)))
{
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
@@ -5528,6 +5607,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
goto err;
}
+ tmp->copy_crypto_data(mi->rli.relay_log.description_event_for_queue);
delete mi->rli.relay_log.description_event_for_queue;
mi->rli.relay_log.description_event_for_queue= tmp;
if (tmp->checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF)
@@ -5569,7 +5649,6 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
/*
HB (heartbeat) cannot come before RL (Relay)
*/
- char llbuf[22];
Heartbeat_log_event hb(buf,
mi->rli.relay_log.relay_log_checksum_alg
!= BINLOG_CHECKSUM_ALG_OFF ?
@@ -5582,8 +5661,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
error_msg.append(hb.get_log_ident(), (uint) strlen(hb.get_log_ident()));
error_msg.append(STRING_WITH_LEN(" log_pos "));
- llstr(hb.log_pos, llbuf);
- error_msg.append(llbuf, strlen(llbuf));
+ error_msg.append_ulonglong(hb.log_pos);
goto err;
}
mi->received_heartbeats++;
@@ -5611,8 +5689,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
error_msg.append(STRING_WITH_LEN("the event's data: log_file_name "));
error_msg.append(hb.get_log_ident(), (uint) strlen(hb.get_log_ident()));
error_msg.append(STRING_WITH_LEN(" log_pos "));
- llstr(hb.log_pos, llbuf);
- error_msg.append(llbuf, strlen(llbuf));
+ error_msg.append_ulonglong(hb.log_pos);
goto err;
}
@@ -5670,6 +5747,12 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
case GTID_EVENT:
{
+ DBUG_EXECUTE_IF("kill_slave_io_after_2_events",
+ {
+ mi->dbug_do_disconnect= true;
+ mi->dbug_event_counter= 2;
+ };);
+
uchar gtid_flag;
if (Gtid_log_event::peek(buf, event_len, checksum_alg,
@@ -5739,6 +5822,10 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
mi->last_queued_gtid= event_gtid;
mi->last_queued_gtid_standalone=
(gtid_flag & Gtid_log_event::FL_STANDALONE) != 0;
+
+ /* Should filter all the subsequent events in the current GTID group? */
+ mi->domain_id_filter.do_filter(event_gtid.domain_id);
+
++mi->events_queued_since_last_gtid;
inc_pos= event_len;
}
@@ -5758,6 +5845,47 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
default:
default_action:
+ DBUG_EXECUTE_IF("kill_slave_io_after_2_events",
+ {
+ if (mi->dbug_do_disconnect &&
+ (((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT) ||
+ ((uchar)buf[EVENT_TYPE_OFFSET] == TABLE_MAP_EVENT))
+ && (--mi->dbug_event_counter == 0))
+ {
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ mi->dbug_do_disconnect= false; /* Safety */
+ goto err;
+ }
+ };);
+
+ DBUG_EXECUTE_IF("kill_slave_io_before_commit",
+ {
+ if ((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT ||
+ ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT &&
+ Query_log_event::peek_is_commit_rollback(buf, event_len,
+ checksum_alg)))
+ {
+ error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+ goto err;
+ }
+ };);
+
+ if (mi->using_gtid != Master_info::USE_GTID_NO &&
+ mi->domain_id_filter.is_group_filtered() &&
+ mi->events_queued_since_last_gtid > 0 &&
+ ((mi->last_queued_gtid_standalone &&
+ !Log_event::is_part_of_group((Log_event_type)(uchar)
+ buf[EVENT_TYPE_OFFSET])) ||
+ (!mi->last_queued_gtid_standalone &&
+ ((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT ||
+ ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT &&
+ Query_log_event::peek_is_commit_rollback(buf, event_len,
+ checksum_alg))))))
+ {
+ /* Reset the domain_id_filter flag. */
+ mi->domain_id_filter.reset_filter();
+ }
+
if (mi->using_gtid != Master_info::USE_GTID_NO && mi->gtid_event_seen)
{
if (unlikely(mi->gtid_reconnect_event_skip_count))
@@ -5861,7 +5989,15 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
(s_id != mi->master_id ||
/* for the master meta information is necessary */
(buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
- buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT))))
+ buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT))) ||
+
+ /*
+ Check whether it needs to be filtered based on domain_id
+ (DO_DOMAIN_IDS/IGNORE_DOMAIN_IDS).
+ */
+ (mi->domain_id_filter.is_group_filtered() &&
+ Log_event::is_group_event((Log_event_type)(uchar)
+ buf[EVENT_TYPE_OFFSET])))
{
/*
Do not write it to the relay log.
@@ -5899,7 +6035,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
}
else
{
- if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
+ if (likely(!rli->relay_log.write_event_buffer((uchar*)buf, event_len)))
{
mi->master_log_pos+= inc_pos;
DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
@@ -5938,16 +6074,20 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
}
skip_relay_logging:
-
+
err:
if (unlock_data_lock)
mysql_mutex_unlock(&mi->data_lock);
DBUG_PRINT("info", ("error: %d", error));
- if (error)
- mi->report(ERROR_LEVEL, error, NULL, ER(error),
- (error == ER_SLAVE_RELAY_LOG_WRITE_FAILURE)?
- "could not queue event from master" :
+
+ /*
+ Do not print ER_SLAVE_RELAY_LOG_WRITE_FAILURE error here, as the caller
+ handle_slave_io() prints it on return.
+ */
+ if (error && error != ER_SLAVE_RELAY_LOG_WRITE_FAILURE)
+ mi->report(ERROR_LEVEL, error, NULL, ER_DEFAULT(error),
error_msg.ptr());
+
DBUG_RETURN(error);
}
@@ -6044,7 +6184,6 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
int slave_was_killed;
int last_errno= -2; // impossible error
ulong err_count=0;
- char llbuff[22];
my_bool my_true= 1;
DBUG_ENTER("connect_to_master");
set_slave_max_allowed_packet(thd, mysql);
@@ -6106,7 +6245,7 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
if (mi->user == NULL || mi->user[0] == 0)
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
- ER(ER_SLAVE_FATAL_ERROR),
+ ER_THD(thd, ER_SLAVE_FATAL_ERROR),
"Invalid (empty) username when attempting to "
"connect to the master server. Connection attempt "
"terminated.");
@@ -6152,11 +6291,10 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
if (reconnect)
{
if (!suppress_warnings && global_system_variables.log_warnings)
- sql_print_information("Slave: connected to master '%s@%s:%d',\
-replication resumed in log '%s' at position %s", mi->user,
- mi->host, mi->port,
- IO_RPL_LOG_NAME,
- llstr(mi->master_log_pos,llbuff));
+ sql_print_information("Slave: connected to master '%s@%s:%d',"
+ "replication resumed in log '%s' at "
+ "position %llu", mi->user, mi->host, mi->port,
+ IO_RPL_LOG_NAME, mi->master_log_pos);
}
else
{
@@ -6439,12 +6577,10 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
#ifndef DBUG_OFF
{
/* This is an assertion which sometimes fails, let's try to track it */
- char llbuf1[22], llbuf2[22];
- DBUG_PRINT("info", ("my_b_tell(cur_log)=%s rli->event_relay_log_pos=%s",
- llstr(my_b_tell(cur_log),llbuf1),
- llstr(rli->event_relay_log_pos,llbuf2)));
+ DBUG_PRINT("info", ("my_b_tell(cur_log)=%llu rli->event_relay_log_pos=%llu",
+ my_b_tell(cur_log), rli->event_relay_log_pos));
DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
- DBUG_ASSERT(opt_slave_parallel_threads > 0 ||
+ DBUG_ASSERT(rli->mi->using_parallel() ||
my_b_tell(cur_log) == rli->event_relay_log_pos);
}
#endif
@@ -6649,6 +6785,7 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
mysql_file_close(rli->cur_log_fd, MYF(MY_WME));
rli->cur_log_fd = -1;
rli->last_inuse_relaylog->completed= true;
+ rli->relay_log.description_event_for_exec->reset_crypto();
if (relay_log_purge)
{