summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc541
1 files changed, 325 insertions, 216 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index e6215356ad1..be95cc16d90 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -14,8 +14,10 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
-
#include "mysql_priv.h"
+
+#ifdef HAVE_REPLICATION
+
#include <mysql.h>
#include <myisam.h>
#include "mini_client.h"
@@ -24,7 +26,6 @@
#include "repl_failsafe.h"
#include <thr_alarm.h>
#include <my_dir.h>
-#include <assert.h>
bool use_slave_mask = 0;
MY_BITMAP slave_error_mask;
@@ -55,7 +56,6 @@ static int events_till_disconnect = -1;
typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
-void skip_load_data_infile(NET* net);
static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev);
static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev);
static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli);
@@ -78,8 +78,21 @@ char* rewrite_db(char* db);
/*
- Get a bit mask for which threads are running so that we later can
- restart these threads
+ Find out which replications threads are running
+
+ SYNOPSIS
+ init_thread_mask()
+ mask Return value here
+ mi master_info for slave
+ inverse If set, returns which threads are not running
+
+ IMPLEMENTATION
+ Get a bit mask for which threads are running so that we can later restart
+ these threads.
+
+ RETURN
+ mask If inverse == 0, running threads
+ If inverse == 1, stopped threads
*/
void init_thread_mask(int* mask,MASTER_INFO* mi,bool inverse)
@@ -96,6 +109,10 @@ void init_thread_mask(int* mask,MASTER_INFO* mi,bool inverse)
}
+/*
+ lock_slave_threads()
+*/
+
void lock_slave_threads(MASTER_INFO* mi)
{
//TODO: see if we can do this without dual mutex
@@ -103,6 +120,11 @@ void lock_slave_threads(MASTER_INFO* mi)
pthread_mutex_lock(&mi->rli.run_lock);
}
+
+/*
+ unlock_slave_threads()
+*/
+
void unlock_slave_threads(MASTER_INFO* mi)
{
//TODO: see if we can do this without dual mutex
@@ -111,6 +133,8 @@ void unlock_slave_threads(MASTER_INFO* mi)
}
+/* Initialize slave structures */
+
int init_slave()
{
DBUG_ENTER("init_slave");
@@ -166,6 +190,7 @@ static void free_table_ent(TABLE_RULE_ENT* e)
my_free((gptr) e, MYF(0));
}
+
static byte* get_table_key(TABLE_RULE_ENT* e, uint* len,
my_bool not_used __attribute__((unused)))
{
@@ -192,11 +217,7 @@ static byte* get_table_key(TABLE_RULE_ENT* e, uint* len,
- If not, open the 'log' binary file.
TODO
- - check proper initialization of master_log_name/master_log_pos
- - We may always want to delete all logs before 'log'.
- Currently if we are not calling this with 'log' as NULL or the first
- log we will never delete relay logs.
- If we want this we should not set skip_log_purge to 1.
+ - check proper initialization of group_master_log_name/group_master_log_pos
RETURN VALUES
0 ok
@@ -223,7 +244,7 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
rli->cur_log_fd = -1;
}
- rli->relay_log_pos = pos;
+ rli->group_relay_log_pos = rli->event_relay_log_pos = pos;
/*
Test to see if the previous run was with the skip of purging
@@ -235,18 +256,15 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
goto err;
}
- if (log) // If not first log
+ if (log && rli->relay_log.find_log_pos(&rli->linfo, log, 1))
{
- if (strcmp(log, rli->linfo.log_file_name))
- rli->skip_log_purge= 1; // Different name; Don't purge
- if (rli->relay_log.find_log_pos(&rli->linfo, log, 1))
- {
- *errmsg="Could not find target log during relay log initialization";
- goto err;
- }
+ *errmsg="Could not find target log during relay log initialization";
+ goto err;
}
- strmake(rli->relay_log_name,rli->linfo.log_file_name,
- sizeof(rli->relay_log_name)-1);
+ strmake(rli->group_relay_log_name,rli->linfo.log_file_name,
+ sizeof(rli->group_relay_log_name)-1);
+ strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
+ sizeof(rli->event_relay_log_name)-1);
if (rli->relay_log.is_active(rli->linfo.log_file_name))
{
/*
@@ -277,7 +295,7 @@ err:
If we don't purge, we can't honour relay_log_space_limit ;
silently discard it
*/
- if (rli->skip_log_purge)
+ if (!relay_log_purge)
rli->log_space_limit= 0;
pthread_cond_broadcast(&rli->data_cond);
if (need_data_lock)
@@ -287,7 +305,16 @@ err:
}
-/* called from get_options() in mysqld.cc on start-up */
+/*
+ Init functio to set up array for errors that should be skipped for slave
+
+ SYNOPSIS
+ init_slave_skip_errors()
+ arg List of errors numbers to skip, separated with ','
+
+ NOTES
+ Called from get_options() in mysqld.cc on start-up
+*/
void init_slave_skip_errors(const char* arg)
{
@@ -298,9 +325,9 @@ void init_slave_skip_errors(const char* arg)
exit(1);
}
use_slave_mask = 1;
- for (;isspace(*arg);++arg)
+ for (;my_isspace(system_charset_info,*arg);++arg)
/* empty */;
- if (!my_casecmp(arg,"all",3))
+ if (!my_strnncoll(system_charset_info,(uchar*)arg,4,(const uchar*)"all",4))
{
bitmap_set_all(&slave_error_mask);
return;
@@ -312,15 +339,17 @@ void init_slave_skip_errors(const char* arg)
break;
if (err_code < MAX_SLAVE_ERROR)
bitmap_set_bit(&slave_error_mask,(uint)err_code);
- while (!isdigit(*p) && *p)
+ while (!my_isdigit(system_charset_info,*p) && *p)
p++;
}
}
/*
- We assume we have a run lock on rli and that both slave thread
- are not running
+ purge_relay_logs()
+
+ NOTES
+ Assumes to have a run lock on rli and that no slave thread are running.
*/
int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset,
@@ -347,9 +376,8 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset,
to display fine in any case.
*/
- rli->master_log_name[0]= 0;
- rli->master_log_pos= 0;
- rli->pending= 0;
+ rli->group_master_log_name[0]= 0;
+ rli->group_master_log_pos= 0;
if (!rli->inited)
DBUG_RETURN(0);
@@ -366,16 +394,18 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset,
goto err;
}
/* Save name of used relay log file */
- strmake(rli->relay_log_name, rli->relay_log.get_log_fname(),
- sizeof(rli->relay_log_name)-1);
+ strmake(rli->group_relay_log_name, rli->relay_log.get_log_fname(),
+ sizeof(rli->group_relay_log_name)-1);
+ strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(),
+ sizeof(rli->event_relay_log_name)-1);
// Just first log with magic number and nothing else
rli->log_space_total= BIN_LOG_HEADER_SIZE;
- rli->relay_log_pos= BIN_LOG_HEADER_SIZE;
+ rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
rli->relay_log.reset_bytes_written();
if (!just_reset)
- error= init_relay_log_pos(rli, rli->relay_log_name, rli->relay_log_pos,
- 0 /* do not need data lock */, errmsg);
-
+ error= init_relay_log_pos(rli, rli->group_relay_log_name, rli->group_relay_log_pos,
+ 0 /* do not need data lock */, errmsg);
+
err:
#ifndef DBUG_OFF
char buf[22];
@@ -472,7 +502,8 @@ int start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
pthread_cond_t *start_cond,
volatile bool *slave_running,
volatile ulong *slave_run_id,
- MASTER_INFO* mi)
+ MASTER_INFO* mi,
+ bool high_priority)
{
pthread_t th;
ulong start_id;
@@ -501,6 +532,8 @@ int start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
}
start_id= *slave_run_id;
DBUG_PRINT("info",("Creating new slave thread"));
+ if (high_priority)
+ my_pthread_attr_setprio(&connection_attrib,CONNECT_PRIOR);
if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
{
if (start_lock)
@@ -531,9 +564,12 @@ int start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
/*
- SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
- sense to do that for starting a slave - we always care if it actually
- started the threads that were not previously running
+ start_slave_threads()
+
+ NOTES
+ SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
+ sense to do that for starting a slave--we always care if it actually
+ started the threads that were not previously running
*/
int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
@@ -562,13 +598,13 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
error=start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
cond_io,
&mi->slave_running, &mi->slave_run_id,
- mi);
+ mi, 1); //high priority, to read the most possible
if (!error && (thread_mask & SLAVE_SQL))
{
error=start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
cond_sql,
&mi->rli.slave_running, &mi->rli.slave_run_id,
- mi);
+ mi, 0);
if (error)
terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
}
@@ -578,12 +614,13 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
void init_table_rule_hash(HASH* h, bool* h_inited)
{
- hash_init(h, TABLE_RULE_HASH_SIZE,0,0,
+ hash_init(h, system_charset_info,TABLE_RULE_HASH_SIZE,0,0,
(hash_get_key) get_table_key,
(hash_free_key) free_table_ent, 0);
*h_inited = 1;
}
+
void init_table_rule_array(DYNAMIC_ARRAY* a, bool* a_inited)
{
my_init_dynamic_array(a, sizeof(TABLE_RULE_ENT*), TABLE_RULE_ARR_SIZE,
@@ -591,6 +628,7 @@ void init_table_rule_array(DYNAMIC_ARRAY* a, bool* a_inited)
*a_inited = 1;
}
+
static TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len)
{
uint i;
@@ -600,8 +638,10 @@ static TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len)
{
TABLE_RULE_ENT* e ;
get_dynamic(a, (gptr)&e, i);
- if (!wild_case_compare(key, key_end, (const char*)e->db,
- (const char*)(e->db + e->key_len),'\\'))
+ if (!my_wildcmp(system_charset_info, key, key_end,
+ (const char*)e->db,
+ (const char*)(e->db + e->key_len),
+ '\\',wild_one,wild_many))
return e;
}
@@ -740,6 +780,11 @@ int add_table_rule(HASH* h, const char* table_spec)
return 0;
}
+
+/*
+ Add table expression with wildcards to dynamic array
+*/
+
int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec)
{
const char* dot = strchr(table_spec, '.');
@@ -756,6 +801,7 @@ int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec)
return 0;
}
+
static void free_string_array(DYNAMIC_ARRAY *a)
{
uint i;
@@ -768,8 +814,8 @@ static void free_string_array(DYNAMIC_ARRAY *a)
delete_dynamic(a);
}
-#ifdef NOT_USED_YET
+#ifdef NOT_USED_YET
static int end_slave_on_walk(MASTER_INFO* mi, gptr /*unused*/)
{
end_master_info(mi);
@@ -778,6 +824,13 @@ static int end_slave_on_walk(MASTER_INFO* mi, gptr /*unused*/)
#endif
+/*
+ Free all resources used by slave
+
+ SYNOPSIS
+ end_slave()
+*/
+
void end_slave()
{
if (active_mi)
@@ -830,13 +883,25 @@ void slave_print_error(RELAY_LOG_INFO* rli, int err_code, const char* msg, ...)
rli->last_slave_errno = err_code;
}
+/*
+ skip_load_data_infile()
-void skip_load_data_infile(NET* net)
+ NOTES
+ This is used to tell a 3.23 master to break send_file()
+*/
+
+void skip_load_data_infile(NET *net)
+{
+ (void)net_request_file(net, "/dev/null");
+ (void)my_net_read(net); // discard response
+ (void)net_write_command(net, 0, "", 0, "", 0); // Send ok
+}
+
+
+bool net_request_file(NET* net, const char* fname)
{
- (void)my_net_write(net, "\xfb/dev/null", 10);
- (void)net_flush(net);
- (void)my_net_read(net); // discard response
- send_ok(net); // the master expects it
+ DBUG_ENTER("net_request_file");
+ DBUG_RETURN(net_write_command(net, 251, fname, strlen(fname), "", 0));
}
@@ -998,13 +1063,13 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db,
if (packet_len == packet_error)
{
- send_error(&thd->net, ER_MASTER_NET_READ);
+ send_error(thd, ER_MASTER_NET_READ);
return 1;
}
if (net->read_pos[0] == 255) // error from master
{
net->read_pos[packet_len] = 0;
- net_printf(&thd->net, ER_MASTER, net->read_pos + 3);
+ net_printf(thd, ER_MASTER, net->read_pos + 3);
return 1;
}
thd->command = COM_TABLE_DUMP;
@@ -1012,7 +1077,7 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db,
if (!(query = sql_alloc(packet_len + 1)))
{
sql_print_error("create_table_from_dump: out of memory");
- net_printf(&thd->net, ER_GET_ERRNO, "Out of memory");
+ net_printf(thd, ER_GET_ERRNO, "Out of memory");
return 1;
}
memcpy(query, net->read_pos, packet_len);
@@ -1050,7 +1115,7 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db,
thd->proc_info = "Opening master dump table";
if (!open_ltable(thd, &tables, TL_WRITE))
{
- send_error(&thd->net,0,0); // Send error from open_ltable
+ send_error(thd,0,0); // Send error from open_ltable
sql_print_error("create_table_from_dump: could not open created table");
goto err;
}
@@ -1059,7 +1124,7 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db,
thd->proc_info = "Reading master dump table data";
if (file->net_read_dump(net))
{
- net_printf(&thd->net, ER_MASTER_NET_READ);
+ net_printf(thd, ER_MASTER_NET_READ);
sql_print_error("create_table_from_dump::failed in\
handler::net_read_dump()");
goto err;
@@ -1078,7 +1143,7 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db,
error=file->repair(thd,&check_opt) != 0;
thd->net.vio = save_vio;
if (error)
- net_printf(&thd->net, ER_INDEX_REBUILD,tables.table->real_name);
+ net_printf(thd, ER_INDEX_REBUILD,tables.table->real_name);
err:
close_thread_tables(thd);
@@ -1086,6 +1151,7 @@ err:
return error;
}
+
int fetch_master_table(THD *thd, const char *db_name, const char *table_name,
MASTER_INFO *mi, MYSQL *mysql)
{
@@ -1100,12 +1166,12 @@ int fetch_master_table(THD *thd, const char *db_name, const char *table_name,
{
if (!(mysql = mc_mysql_init(NULL)))
{
- send_error(&thd->net); // EOM
+ send_error(thd); // EOM
DBUG_RETURN(1);
}
if (connect_to_master(thd, mysql, mi))
{
- net_printf(&thd->net, ER_CONNECT_TO_MASTER, mc_mysql_error(mysql));
+ net_printf(thd, ER_CONNECT_TO_MASTER, mc_mysql_error(mysql));
mc_mysql_close(mysql);
DBUG_RETURN(1);
}
@@ -1129,7 +1195,7 @@ int fetch_master_table(THD *thd, const char *db_name, const char *table_name,
if (!called_connected)
mc_mysql_close(mysql);
if (errmsg && thd->net.vio)
- send_error(&thd->net, error, errmsg);
+ send_error(thd, error, errmsg);
DBUG_RETURN(test(error)); // Return 1 on error
}
@@ -1166,13 +1232,11 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
fn_format(fname, info_fname, mysql_data_home, "", 4+32);
pthread_mutex_lock(&rli->data_lock);
info_fd = rli->info_fd;
- rli->pending = 0;
rli->cur_log_fd = -1;
rli->slave_skip_counter=0;
rli->abort_pos_wait=0;
- rli->skip_log_purge=0;
- rli->log_space_limit = relay_log_space_limit;
- rli->log_space_total = 0;
+ rli->log_space_limit= relay_log_space_limit;
+ rli->log_space_total= 0;
// TODO: make this work with multi-master
if (!opt_relay_logname)
@@ -1213,8 +1277,8 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
&msg))
goto err;
- rli->master_log_name[0]= 0;
- rli->master_log_pos= 0;
+ rli->group_master_log_name[0]= 0;
+ rli->group_master_log_pos= 0;
rli->info_fd= info_fd;
}
else // file exists
@@ -1235,31 +1299,33 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
rli->info_fd = info_fd;
int relay_log_pos, master_log_pos;
- if (init_strvar_from_file(rli->relay_log_name,
- sizeof(rli->relay_log_name), &rli->info_file,
+ if (init_strvar_from_file(rli->group_relay_log_name,
+ sizeof(rli->group_relay_log_name), &rli->info_file,
"") ||
init_intvar_from_file(&relay_log_pos,
&rli->info_file, BIN_LOG_HEADER_SIZE) ||
- init_strvar_from_file(rli->master_log_name,
- sizeof(rli->master_log_name), &rli->info_file,
+ init_strvar_from_file(rli->group_master_log_name,
+ sizeof(rli->group_master_log_name), &rli->info_file,
"") ||
init_intvar_from_file(&master_log_pos, &rli->info_file, 0))
{
msg="Error reading slave log configuration";
goto err;
}
- rli->relay_log_pos= relay_log_pos;
- rli->master_log_pos= master_log_pos;
+ strmake(rli->event_relay_log_name,rli->group_relay_log_name,
+ sizeof(rli->event_relay_log_name)-1);
+ rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos;
+ rli->group_master_log_pos= master_log_pos;
if (init_relay_log_pos(rli,
- rli->relay_log_name,
- rli->relay_log_pos,
+ rli->group_relay_log_name,
+ rli->group_relay_log_pos,
0 /* no data lock*/,
&msg))
goto err;
}
- DBUG_ASSERT(rli->relay_log_pos >= BIN_LOG_HEADER_SIZE);
- DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos);
+ DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
+ DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
/*
Now change the cache from READ to WRITE - must do this
before flush_relay_log_info
@@ -1314,9 +1380,11 @@ static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli)
THD* thd = mi->io_thd;
DBUG_ENTER("wait_for_relay_log_space");
+
pthread_mutex_lock(&rli->log_space_lock);
save_proc_info = thd->proc_info;
thd->proc_info = "Waiting for relay log space to free";
+
while (rli->log_space_limit < rli->log_space_total &&
!(slave_killed=io_slave_killed(thd,mi)) &&
!rli->ignore_log_space_limit)
@@ -1333,7 +1401,7 @@ static int count_relay_log_space(RELAY_LOG_INFO* rli)
{
LOG_INFO linfo;
DBUG_ENTER("count_relay_log_space");
- rli->log_space_total = 0;
+ rli->log_space_total= 0;
if (rli->relay_log.find_log_pos(&linfo, NullS, 1))
{
sql_print_error("Could not find first log while counting relay log space");
@@ -1468,109 +1536,115 @@ err:
int register_slave_on_master(MYSQL* mysql)
{
- String packet;
- char buf[4];
+ char buf[1024], *pos= buf;
+ uint report_host_len, report_user_len=0, report_password_len=0;
if (!report_host)
return 0;
-
- int4store(buf, server_id);
- packet.append(buf, 4);
-
- net_store_data(&packet, report_host);
+ report_host_len= strlen(report_host);
if (report_user)
- net_store_data(&packet, report_user);
- else
- packet.append((char)0);
-
+ report_user_len= strlen(report_user);
if (report_password)
- net_store_data(&packet, report_user);
- else
- packet.append((char)0);
-
- int2store(buf, (uint16)report_port);
- packet.append(buf, 2);
- int4store(buf, rpl_recovery_rank);
- packet.append(buf, 4);
- int4store(buf, 0); /* tell the master will fill in master_id */
- packet.append(buf, 4);
-
- if (mc_simple_command(mysql, COM_REGISTER_SLAVE, (char*)packet.ptr(),
- packet.length(), 0))
+ report_password_len= strlen(report_password);
+ /* 30 is a good safety margin */
+ if (report_host_len + report_user_len + report_password_len + 30 >
+ sizeof(buf))
+ return 0; // safety
+
+ int4store(pos, server_id); pos+= 4;
+ pos= net_store_data(pos, report_host, report_host_len);
+ pos= net_store_data(pos, report_user, report_user_len);
+ pos= net_store_data(pos, report_password, report_password_len);
+ int2store(pos, (uint16) report_port); pos+= 2;
+ int4store(pos, rpl_recovery_rank); pos+= 4;
+ /* The master will fill in master_id */
+ int4store(pos, 0); pos+= 4;
+
+ if (mc_simple_command(mysql, COM_REGISTER_SLAVE, (char*) buf,
+ (uint) (pos- buf), 0))
{
sql_print_error("Error on COM_REGISTER_SLAVE: %d '%s'",
mc_mysql_errno(mysql),
mc_mysql_error(mysql));
return 1;
}
-
return 0;
}
+
int show_master_info(THD* thd, MASTER_INFO* mi)
{
// TODO: fix this for multi-master
- DBUG_ENTER("show_master_info");
List<Item> field_list;
+ Protocol *protocol= thd->protocol;
+ DBUG_ENTER("show_master_info");
+
field_list.push_back(new Item_empty_string("Master_Host",
sizeof(mi->host)));
field_list.push_back(new Item_empty_string("Master_User",
sizeof(mi->user)));
- field_list.push_back(new Item_empty_string("Master_Port", 6));
- field_list.push_back(new Item_empty_string("Connect_retry", 6));
+ field_list.push_back(new Item_return_int("Master_Port", 7,
+ MYSQL_TYPE_LONG));
+ field_list.push_back(new Item_return_int("Connect_retry", 10,
+ MYSQL_TYPE_LONG));
field_list.push_back(new Item_empty_string("Master_Log_File",
- FN_REFLEN));
- field_list.push_back(new Item_empty_string("Read_Master_Log_Pos", 12));
+ FN_REFLEN));
+ field_list.push_back(new Item_return_int("Read_Master_Log_Pos", 10,
+ MYSQL_TYPE_LONGLONG));
field_list.push_back(new Item_empty_string("Relay_Log_File",
- FN_REFLEN));
- field_list.push_back(new Item_empty_string("Relay_Log_Pos", 12));
+ FN_REFLEN));
+ field_list.push_back(new Item_return_int("Relay_Log_Pos", 10,
+ MYSQL_TYPE_LONGLONG));
field_list.push_back(new Item_empty_string("Relay_Master_Log_File",
- FN_REFLEN));
+ FN_REFLEN));
field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));
field_list.push_back(new Item_empty_string("Replicate_do_db", 20));
field_list.push_back(new Item_empty_string("Replicate_ignore_db", 20));
- field_list.push_back(new Item_empty_string("Last_errno", 4));
+ field_list.push_back(new Item_return_int("Last_errno", 4, MYSQL_TYPE_LONG));
field_list.push_back(new Item_empty_string("Last_error", 20));
- field_list.push_back(new Item_empty_string("Skip_counter", 12));
- field_list.push_back(new Item_empty_string("Exec_master_log_pos", 12));
- field_list.push_back(new Item_empty_string("Relay_log_space", 12));
- if (send_fields(thd, field_list, 1))
+ field_list.push_back(new Item_return_int("Skip_counter", 10,
+ MYSQL_TYPE_LONG));
+ field_list.push_back(new Item_return_int("Exec_master_log_pos", 10,
+ MYSQL_TYPE_LONGLONG));
+ field_list.push_back(new Item_return_int("Relay_log_space", 10,
+ MYSQL_TYPE_LONGLONG));
+ if (protocol->send_fields(&field_list, 1))
DBUG_RETURN(-1);
if (mi->host[0])
{
String *packet= &thd->packet;
- packet->length(0);
+ protocol->prepare_for_resend();
pthread_mutex_lock(&mi->data_lock);
pthread_mutex_lock(&mi->rli.data_lock);
- net_store_data(packet, mi->host);
- net_store_data(packet, mi->user);
- net_store_data(packet, (uint32) mi->port);
- net_store_data(packet, (uint32) mi->connect_retry);
- net_store_data(packet, mi->master_log_name);
- net_store_data(packet, (longlong) mi->master_log_pos);
- net_store_data(packet, mi->rli.relay_log_name +
- dirname_length(mi->rli.relay_log_name));
- net_store_data(packet, (longlong) mi->rli.relay_log_pos);
- net_store_data(packet, mi->rli.master_log_name);
- net_store_data(packet, mi->slave_running ? "Yes":"No");
- net_store_data(packet, mi->rli.slave_running ? "Yes":"No");
- net_store_data(packet, &replicate_do_db);
- net_store_data(packet, &replicate_ignore_db);
- net_store_data(packet, (uint32)mi->rli.last_slave_errno);
- net_store_data(packet, mi->rli.last_slave_error);
- net_store_data(packet, mi->rli.slave_skip_counter);
- net_store_data(packet, (longlong) mi->rli.master_log_pos);
- net_store_data(packet, (longlong) mi->rli.log_space_total);
+ protocol->store(mi->host, &my_charset_bin);
+ protocol->store(mi->user, &my_charset_bin);
+ protocol->store((uint32) mi->port);
+ protocol->store((uint32) mi->connect_retry);
+ protocol->store(mi->master_log_name, &my_charset_bin);
+ protocol->store((ulonglong) mi->master_log_pos);
+ protocol->store(mi->rli.group_relay_log_name +
+ dirname_length(mi->rli.group_relay_log_name), &my_charset_bin);
+ protocol->store((ulonglong) mi->rli.group_relay_log_pos);
+ protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
+ protocol->store(mi->slave_running ? "Yes":"No", &my_charset_bin);
+ protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
+ protocol->store(&replicate_do_db);
+ protocol->store(&replicate_ignore_db);
+ protocol->store((uint32) mi->rli.last_slave_errno);
+ protocol->store(mi->rli.last_slave_error, &my_charset_bin);
+ protocol->store((uint32) mi->rli.slave_skip_counter);
+ protocol->store((ulonglong) mi->rli.group_master_log_pos);
+ protocol->store((ulonglong) mi->rli.log_space_total);
pthread_mutex_unlock(&mi->rli.data_lock);
pthread_mutex_unlock(&mi->data_lock);
if (my_net_write(&thd->net, (char*)thd->packet.ptr(), packet->length()))
DBUG_RETURN(-1);
}
- send_eof(&thd->net);
+ send_eof(thd);
DBUG_RETURN(0);
}
@@ -1586,25 +1660,22 @@ bool flush_master_info(MASTER_INFO* mi)
my_b_printf(file, "%s\n%s\n%s\n%s\n%s\n%d\n%d\n",
mi->master_log_name, llstr(mi->master_log_pos, lbuf),
mi->host, mi->user,
- mi->password, mi->port, mi->connect_retry
- );
+ mi->password, mi->port, mi->connect_retry);
flush_io_cache(file);
DBUG_RETURN(0);
}
st_relay_log_info::st_relay_log_info()
- :info_fd(-1), cur_log_fd(-1), master_log_pos(0), save_temporary_tables(0),
- cur_log_old_open_count(0), log_space_total(0), ignore_log_space_limit(0),
- slave_skip_counter(0), abort_pos_wait(0), slave_run_id(0),
- sql_thd(0), last_slave_errno(0), inited(0), abort_slave(0),
- slave_running(0), skip_log_purge(0),
- inside_transaction(0) /* the default is autocommit=1 */
-{
- relay_log_name[0] = master_log_name[0] = 0;
+ :info_fd(-1), cur_log_fd(-1), save_temporary_tables(0),
+ cur_log_old_open_count(0), group_master_log_pos(0), log_space_total(0),
+ ignore_log_space_limit(0), slave_skip_counter(0), abort_pos_wait(0),
+ slave_run_id(0), sql_thd(0), last_slave_errno(0), inited(0), abort_slave(0),
+ slave_running(0)
+{
+ group_relay_log_name[0]= event_relay_log_name[0]= group_master_log_name[0]= 0;
last_slave_error[0]=0;
-
bzero(&info_file,sizeof(info_file));
bzero(&cache_buf, sizeof(cache_buf));
pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
@@ -1666,8 +1737,8 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
set_timespec(abstime,timeout);
DBUG_ENTER("wait_for_pos");
- DBUG_PRINT("enter",("master_log_name: '%s' pos: %lu timeout: %ld",
- master_log_name, (ulong) master_log_pos,
+ DBUG_PRINT("enter",("group_master_log_name: '%s' pos: %lu timeout: %ld",
+ group_master_log_name, (ulong) group_master_log_pos,
(long) timeout));
pthread_mutex_lock(&data_lock);
@@ -1717,10 +1788,10 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
{
bool pos_reached;
int cmp_result= 0;
- DBUG_ASSERT(*master_log_name || master_log_pos == 0);
- if (*master_log_name)
+ DBUG_ASSERT(*group_master_log_name || group_master_log_pos == 0);
+ if (*group_master_log_name)
{
- char *basename= master_log_name + dirname_length(master_log_name);
+ char *basename= group_master_log_name + dirname_length(group_master_log_name);
/*
First compare the parts before the extension.
Find the dot in the master's log basename,
@@ -1735,13 +1806,13 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
}
// Now compare extensions.
char *q_end;
- ulong master_log_name_extension= strtoul(q, &q_end, 10);
- if (master_log_name_extension < log_name_extension)
+ ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
+ if (group_master_log_name_extension < log_name_extension)
cmp_result = -1 ;
else
- cmp_result= (master_log_name_extension > log_name_extension) ? 1 : 0 ;
+ cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
}
- pos_reached = ((!cmp_result && master_log_pos >= (ulonglong)log_pos) ||
+ pos_reached = ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
cmp_result > 0);
if (pos_reached || thd->killed)
break;
@@ -1796,6 +1867,10 @@ improper_arguments: %d timed_out: %d",
}
+/*
+ init_slave_thread()
+*/
+
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
{
DBUG_ENTER("init_slave_thread");
@@ -1934,7 +2009,7 @@ command");
/*
- read one event from the master
+ Read one event from the master
SYNOPSIS
read_event()
@@ -1948,7 +2023,6 @@ command");
RETURN VALUES
'packet_error' Error
number Length of packet
-
*/
static ulong read_event(MYSQL* mysql, MASTER_INFO *mi, bool* suppress_warnings)
@@ -2024,7 +2098,6 @@ point. If you are sure that your master is ok, run this query manually on the\
static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
{
- DBUG_ASSERT(rli->sql_thd==thd);
Log_event * ev = next_event(rli);
DBUG_ASSERT(rli->sql_thd==thd);
if (sql_slave_killed(thd,rli))
@@ -2046,7 +2119,7 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
(rli->slave_skip_counter && type_code != ROTATE_EVENT))
{
/* TODO: I/O thread should not even log events with the same server id */
- rli->inc_pos(ev->get_event_len(),
+ rli->inc_group_relay_log_pos(ev->get_event_len(),
type_code != STOP_EVENT ? ev->log_pos : LL(0),
1/* skip lock*/);
flush_relay_log_info(rli);
@@ -2087,7 +2160,8 @@ This may also be a network problem, or just a bug in the master or slave code.\
}
-/* slave I/O thread */
+/* Slave I/O Thread entry point */
+
extern "C" pthread_handler_decl(handle_slave_io,arg)
{
THD *thd; // needs to be first for thread_stack
@@ -2360,7 +2434,7 @@ err:
}
-/* slave SQL logic thread */
+/* Slave SQL Thread entry point */
extern "C" pthread_handler_decl(handle_slave_sql,arg)
{
@@ -2386,7 +2460,8 @@ slave_begin:
#endif
thd = new THD; // note that contructor of THD uses DBUG_ !
- THD_CHECK_SENTRY(thd);
+ thd->thread_stack = (char*)&thd; // remember where our stack is
+
/* Inform waiting threads that slave has started */
rli->slave_run_id++;
@@ -2402,9 +2477,9 @@ slave_begin:
sql_print_error("Failed during slave thread initialization");
goto err;
}
+ thd->init_for_queries();
rli->sql_thd= thd;
thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
- thd->thread_stack = (char*)&thd; // remember where our stack is
pthread_mutex_lock(&LOCK_thread_count);
threads.append(thd);
pthread_mutex_unlock(&LOCK_thread_count);
@@ -2412,15 +2487,13 @@ slave_begin:
rli->abort_slave = 0;
pthread_mutex_unlock(&rli->run_lock);
pthread_cond_broadcast(&rli->start_cond);
- // This should always be set to 0 when the slave thread is started
- rli->pending = 0;
//tell the I/O thread to take relay_log_space_limit into account from now on
rli->ignore_log_space_limit= 0;
if (init_relay_log_pos(rli,
- rli->relay_log_name,
- rli->relay_log_pos,
+ rli->group_relay_log_name,
+ rli->group_relay_log_pos,
1 /*need data lock*/, &errmsg))
{
sql_print_error("Error initializing relay log position: %s",
@@ -2428,18 +2501,18 @@ slave_begin:
goto err;
}
THD_CHECK_SENTRY(thd);
- DBUG_ASSERT(rli->relay_log_pos >= BIN_LOG_HEADER_SIZE);
- DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos);
+ DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
+ DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
DBUG_ASSERT(rli->sql_thd == thd);
DBUG_PRINT("master_info",("log_file_name: %s position: %s",
- rli->master_log_name,
- llstr(rli->master_log_pos,llbuff)));
+ rli->group_master_log_name,
+ llstr(rli->group_master_log_pos,llbuff)));
if (global_system_variables.log_warnings)
sql_print_error("Slave SQL thread initialized, starting replication in \
log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
- llstr(rli->master_log_pos,llbuff),rli->relay_log_name,
- llstr(rli->relay_log_pos,llbuff1));
+ llstr(rli->group_master_log_pos,llbuff),rli->group_relay_log_name,
+ llstr(rli->group_relay_log_pos,llbuff1));
/* Read queries from the IO/THREAD until this thread is killed */
@@ -2456,7 +2529,7 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
Error running query, slave SQL thread aborted. Fix the problem, and restart \
the slave SQL thread with \"SLAVE START\". We stopped at log \
'%s' position %s",
- RPL_LOG_NAME, llstr(rli->master_log_pos, llbuff));
+ RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff));
goto err;
}
}
@@ -2464,7 +2537,7 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
/* Thread stopped. Print the current replication position to the log */
sql_print_error("Slave SQL thread exiting, replication stopped in log \
'%s' at position %s",
- RPL_LOG_NAME, llstr(rli->master_log_pos,llbuff));
+ RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff));
err:
VOID(pthread_mutex_lock(&LOCK_thread_count));
@@ -2497,19 +2570,23 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
if (abort_slave_event_count && !rli->events_till_abort)
goto slave_begin;
#endif
- my_thread_end(); // clean-up before broadcasting termination
+ my_thread_end();
pthread_exit(0);
DBUG_RETURN(0); // Can't return anything here
}
+/*
+ process_io_create_file()
+*/
+
static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev)
{
int error = 1;
ulong num_bytes;
bool cev_not_written;
- THD* thd;
- NET* net = &mi->mysql->net;
+ THD *thd = mi->io_thd;
+ NET *net = &mi->mysql->net;
DBUG_ENTER("process_io_create_file");
if (unlikely(!cev->is_valid()))
@@ -2523,7 +2600,6 @@ static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev)
DBUG_RETURN(0);
}
DBUG_ASSERT(cev->inited_from_old);
- thd = mi->io_thd;
thd->file_id = cev->file_id = mi->file_id++;
thd->server_id = cev->server_id;
cev_not_written = 1;
@@ -2553,7 +2629,7 @@ static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev)
}
if (unlikely(!num_bytes)) /* eof */
{
- send_ok(net); /* 3.23 master wants it */
+ net_write_command(net, 0, "", 0, "", 0);/* 3.23 master wants it */
Execute_load_log_event xev(thd,0);
xev.log_pos = mi->master_log_pos;
if (unlikely(mi->rli.relay_log.append(&xev)))
@@ -2599,6 +2675,7 @@ err:
DBUG_RETURN(error);
}
+
/*
Start using a new binary log on the master
@@ -2608,7 +2685,7 @@ err:
rev The rotate log event read from the binary log
DESCRIPTION
- Updates the master info and relay data with the place in the next binary
+ Updates the master info with the place in the next binary
log where we should start reading.
NOTES
@@ -2617,6 +2694,7 @@ err:
RETURN VALUES
0 ok
1 Log event is illegal
+
*/
static int process_io_rotate(MASTER_INFO *mi, Rotate_log_event *rev)
@@ -2643,7 +2721,10 @@ static int process_io_rotate(MASTER_INFO *mi, Rotate_log_event *rev)
DBUG_RETURN(0);
}
+
/*
+ queue_old_event()
+
TODO:
Test this code before release - it has to be tested on a separate
setup with 3.23 master
@@ -2736,7 +2817,10 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
DBUG_RETURN(0);
}
+
/*
+ queue_event()
+
TODO: verify the issue with stop events, see if we need them at all
in the relay log
*/
@@ -2817,7 +2901,20 @@ void end_relay_log_info(RELAY_LOG_INFO* rli)
DBUG_VOID_RETURN;
}
-/* try to connect until successful or slave killed */
+/*
+ Try to connect until successful or slave killed
+
+ SYNPOSIS
+ safe_connect()
+ thd Thread handler for slave
+ mysql MySQL connection handle
+ mi Replication handle
+
+ RETURN
+ 0 ok
+ # Error
+*/
+
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
{
return connect_to_master(thd, mysql, mi, 0, 0);
@@ -2825,8 +2922,12 @@ static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
/*
- Try to connect until successful or slave killed or we have retried
- master_retry_count times
+ SYNPOSIS
+ connect_to_master()
+
+ IMPLEMENTATION
+ Try to connect until successful or slave killed or we have retried
+ master_retry_count times
*/
static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
@@ -2909,8 +3010,11 @@ replication resumed in log '%s' at position %s", mi->user,
/*
- Try to connect until successful or slave killed or we have retried
- master_retry_count times
+ safe_reconnect()
+
+ IMPLEMENTATION
+ Try to connect until successful or slave killed or we have retried
+ master_retry_count times
*/
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
@@ -2955,18 +3059,14 @@ bool flush_relay_log_info(RELAY_LOG_INFO* rli)
IO_CACHE *file = &rli->info_file;
char buff[FN_REFLEN*2+22*2+4], *pos;
- /* sql_thd is not set when calling from init_slave() */
- if ((rli->sql_thd && rli->sql_thd->options & OPTION_BEGIN))
- return 0; // Wait for COMMIT
-
my_b_seek(file, 0L);
- pos=strmov(buff, rli->relay_log_name);
+ pos=strmov(buff, rli->group_relay_log_name);
*pos++='\n';
- pos=longlong2str(rli->relay_log_pos, pos, 10);
+ pos=longlong2str(rli->group_relay_log_pos, pos, 10);
*pos++='\n';
- pos=strmov(pos, rli->master_log_name);
+ pos=strmov(pos, rli->group_master_log_name);
*pos++='\n';
- pos=longlong2str(rli->master_log_pos, pos, 10);
+ pos=longlong2str(rli->group_master_log_pos, pos, 10);
*pos='\n';
if (my_b_write(file, (byte*) buff, (ulong) (pos-buff)+1))
error=1;
@@ -2979,8 +3079,7 @@ bool flush_relay_log_info(RELAY_LOG_INFO* rli)
/*
- This function is called when we notice that the current "hot" log
- got rotated under our feet.
+ Called when we notice that the current "hot" log got rotated under our feet.
*/
static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg)
@@ -2990,7 +3089,7 @@ static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg)
DBUG_ENTER("reopen_relay_log");
IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
- if ((rli->cur_log_fd=open_binlog(cur_log,rli->relay_log_name,
+ if ((rli->cur_log_fd=open_binlog(cur_log,rli->event_relay_log_name,
errmsg)) <0)
DBUG_RETURN(0);
/*
@@ -2998,7 +3097,7 @@ static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg)
relay_log_pos Current log pos
pending Number of bytes already processed from the event
*/
- my_b_seek(cur_log,rli->relay_log_pos + rli->pending);
+ my_b_seek(cur_log,rli->event_relay_log_pos);
DBUG_RETURN(cur_log);
}
@@ -3007,7 +3106,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
{
Log_event* ev;
IO_CACHE* cur_log = rli->cur_log;
- pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
+ pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
const char* errmsg=0;
THD* thd = rli->sql_thd;
DBUG_ENTER("next_event");
@@ -3056,7 +3155,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
}
}
DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
- DBUG_ASSERT(my_b_tell(cur_log) == rli->relay_log_pos + rli->pending);
+ DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos);
/*
Relay log is always in new format - if the master is 3.23, the
I/O thread will convert the format for us
@@ -3123,8 +3222,8 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
// prevent the I/O thread from blocking next times
rli->ignore_log_space_limit= 1;
// If the I/O thread is blocked, unblock it
- pthread_cond_broadcast(&rli->log_space_cond);
pthread_mutex_unlock(&rli->log_space_lock);
+ pthread_cond_broadcast(&rli->log_space_cond);
// Note that wait_for_update unlocks lock_log !
rli->relay_log.wait_for_update(rli->sql_thd);
// re-acquire data lock since we released it earlier
@@ -3141,16 +3240,25 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
my_close(rli->cur_log_fd, MYF(MY_WME));
rli->cur_log_fd = -1;
- /*
- TODO: make skip_log_purge a start-up option. At this point this
- is not critical priority
- */
- if (!rli->skip_log_purge)
+ if (relay_log_purge)
{
- // purge_first_log will properly set up relay log coordinates in rli
- if (rli->relay_log.purge_first_log(rli))
+ /*
+ purge_first_log will properly set up relay log coordinates in rli.
+ If the group's coordinates are equal to the event's coordinates
+ (i.e. the relay log was not rotated in the middle of a group),
+ we can purge this relay log too.
+ We do ulonglong and string comparisons, this may be slow but
+ - purging the last relay log is nice (it can save 1GB of disk), so we
+ like to detect the case where we can do it, and given this,
+ - I see no better detection method
+ - purge_first_log is not called that often
+ */
+ if (rli->relay_log.purge_first_log
+ (rli,
+ rli->group_relay_log_pos == rli->event_relay_log_pos
+ && !strcmp(rli->group_relay_log_name,rli->event_relay_log_name)))
{
- errmsg = "Error purging processed log";
+ errmsg = "Error purging processed logs";
goto err;
}
}
@@ -3168,10 +3276,9 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
errmsg = "error switching to the next log";
goto err;
}
- rli->relay_log_pos = BIN_LOG_HEADER_SIZE;
- rli->pending=0;
- strmake(rli->relay_log_name,rli->linfo.log_file_name,
- sizeof(rli->relay_log_name)-1);
+ rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
+ strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
+ sizeof(rli->event_relay_log_name)-1);
flush_relay_log_info(rli);
}
@@ -3219,7 +3326,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
event(errno: %d cur_log->error: %d)",
my_errno,cur_log->error);
// set read position to the beginning of the event
- my_b_seek(cur_log,rli->relay_log_pos+rli->pending);
+ my_b_seek(cur_log,rli->event_relay_log_pos);
/* otherwise, we have had a partial read */
errmsg = "Aborting slave SQL thread because of partial event read";
break; // To end of function
@@ -3240,3 +3347,5 @@ err:
template class I_List_iterator<i_string>;
template class I_List_iterator<i_string_pair>;
#endif
+
+#endif /* HAVE_REPLICATION */