summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorsasha@mysql.sashanet.com <>2001-01-17 05:47:33 -0700
committersasha@mysql.sashanet.com <>2001-01-17 05:47:33 -0700
commitd0f4235a2efc4a31456c7ec6a6e4e32d830724bd (patch)
tree6d77ea3fb9cae4b29069ce7d9ad832a29cd4e0b3 /sql
parent1d25808fba138004572a8b11141540781300bd64 (diff)
downloadmariadb-git-d0f4235a2efc4a31456c7ec6a6e4e32d830724bd.tar.gz
rpl000016.test sync
rpl000001.result BitKeeper file /home/sasha/src/bk/mysql/mysql-test/r/rpl000001.result ignore Added BitKeeper/tmp/bkr3sAHD to the ignore list slave.h MASTER_POS_WAIT lex.h MASTER_POS_WAIT slave.cc MASTER_POS_WAIT, do automagic restart on debugging abort, skip rotate events in slave.cc debug abort count sql_repl.cc announce the log name at the start of the log with a fake rotate event item_create.h MASTER_POS_WAIT item_func.cc MASTER_POS_WAIT item_func.h MASTER_POS_WAIT sql_class.h enter_cond(), exit_cond() helper inliners item_create.cc added MASTER_POS_WAIT mysql-test-run.sh speed improvement fixes rpl000007.test sync rpl000003.test sleep -> sync rpl000004.test sleep -> sync, fixed clean up bug rpl000014.test sync rpl000009.test sync rpl000013.test sync rpl000001.test sleep -> sync rpl000008.test sync rpl000006.test sync on cleanup rpl000011.test sync rpl000012.test sync rpl000005.test sleep -> sync rpl000010.test sync rpl000015.test sync rpl000002.test sleep -> sync rpl000014.result we now know the master log name as soon as we connect mysql.cc added optional agrument to --wait mysqltest.c added save_master_pos and sync_with_master commands
Diffstat (limited to 'sql')
-rw-r--r--sql/item_create.cc5
-rw-r--r--sql/item_create.h1
-rw-r--r--sql/item_func.cc23
-rw-r--r--sql/item_func.h12
-rw-r--r--sql/lex.h2
-rw-r--r--sql/slave.cc58
-rw-r--r--sql/slave.h6
-rw-r--r--sql/sql_class.h19
-rw-r--r--sql/sql_repl.cc74
9 files changed, 170 insertions, 30 deletions
diff --git a/sql/item_create.cc b/sql/item_create.cc
index 8e6332d4e16..ef9f5f2d38b 100644
--- a/sql/item_create.cc
+++ b/sql/item_create.cc
@@ -376,3 +376,8 @@ Item *create_load_file(Item* a)
{
return new Item_load_file(a);
}
+
+Item *create_wait_for_master_pos(Item* a, Item* b)
+{
+ return new Item_master_pos_wait(a, b);
+}
diff --git a/sql/item_create.h b/sql/item_create.h
index de2726b32b0..cc7497b0183 100644
--- a/sql/item_create.h
+++ b/sql/item_create.h
@@ -85,3 +85,4 @@ Item *create_func_ucase(Item* a);
Item *create_func_version(void);
Item *create_func_weekday(Item* a);
Item *create_load_file(Item* a);
+Item *create_wait_for_master_pos(Item* a, Item* b);
diff --git a/sql/item_func.cc b/sql/item_func.cc
index bad789479b2..592ef9ae3d4 100644
--- a/sql/item_func.cc
+++ b/sql/item_func.cc
@@ -26,6 +26,7 @@
#include <hash.h>
#include <time.h>
#include <ft_global.h>
+#include "slave.h" // for wait_for_master_pos
/* return TRUE if item is a constant */
@@ -1388,6 +1389,28 @@ void item_user_lock_release(ULL *ull)
}
/*
+ Wait until we are at or past the given position in the master binlog
+ on the slave
+ */
+
+longlong Item_master_pos_wait::val_int()
+{
+ THD* thd = current_thd;
+ String *log_name = args[0]->val_str(&value);
+ int event_count;
+
+ if(thd->slave_thread || !log_name || !log_name->length())
+ {
+ null_value = 1;
+ return 0;
+ }
+ ulong pos = (ulong)args[1]->val_int();
+ if((event_count = glob_mi.wait_for_pos(thd, log_name, pos)) == -1)
+ null_value = 1;;
+ return event_count;
+}
+
+/*
Get a user level lock. If the thread has an old lock this is first released.
Returns 1: Got lock
Returns 0: Timeout
diff --git a/sql/item_func.h b/sql/item_func.h
index 90400b4181a..5810307e81c 100644
--- a/sql/item_func.h
+++ b/sql/item_func.h
@@ -778,6 +778,18 @@ class Item_func_release_lock :public Item_int_func
void fix_length_and_dec() { decimals=0; max_length=1; maybe_null=1;}
};
+/* replication functions */
+
+class Item_master_pos_wait :public Item_int_func
+{
+ String value;
+ public:
+ Item_master_pos_wait(Item *a,Item *b) :Item_int_func(a,b) {}
+ longlong val_int();
+ const char *func_name() const { return "master_pos_wait"; }
+ void fix_length_and_dec() { decimals=0; max_length=1; maybe_null=1;}
+};
+
/* Handling of user definiable variables */
diff --git a/sql/lex.h b/sql/lex.h
index 399106bfd77..bff45eb0f06 100644
--- a/sql/lex.h
+++ b/sql/lex.h
@@ -397,6 +397,8 @@ static SYMBOL sql_functions[] = {
{ "LOWER", SYM(FUNC_ARG1),0,CREATE_FUNC(create_func_lcase)},
{ "LPAD", SYM(FUNC_ARG3),0,CREATE_FUNC(create_func_lpad)},
{ "LTRIM", SYM(FUNC_ARG1),0,CREATE_FUNC(create_func_ltrim)},
+ { "MASTER_POS_WAIT", SYM(FUNC_ARG2),0,
+ CREATE_FUNC(create_wait_for_master_pos)},
{ "MAKE_SET", SYM(MAKE_SET_SYM),0,0},
{ "MAX", SYM(MAX_SYM),0,0},
{ "MD5", SYM(FUNC_ARG1),0,CREATE_FUNC(create_func_md5)},
diff --git a/sql/slave.cc b/sql/slave.cc
index 246dbdde93d..d27bb76c065 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -616,6 +616,46 @@ int flush_master_info(MASTER_INFO* mi)
return 0;
}
+int st_master_info::wait_for_pos(THD* thd, String* log_name, ulong log_pos)
+{
+ if(!inited) return -1;
+ bool pos_reached = 0;
+ int event_count = 0;
+ for(;!pos_reached && !thd->killed;)
+ {
+ int cmp_result;
+ char* basename;
+ pthread_mutex_lock(&lock);
+ if(*log_file_name)
+ {
+ basename = strrchr(log_file_name, FN_LIBCHAR);
+ if(basename)
+ ++basename;
+ else
+ basename = log_file_name;
+ cmp_result = strncmp(basename, log_name->ptr(),
+ log_name->length());
+ }
+ else
+ cmp_result = 0;
+
+ pos_reached = ((!cmp_result && pos >= log_pos) || cmp_result > 0);
+ if(!pos_reached && !thd->killed)
+ {
+ const char* msg = thd->enter_cond(&cond, &lock,
+ "Waiting for master update");
+ pthread_cond_wait(&cond, &lock);
+ thd->exit_cond(msg);
+ event_count++;
+ }
+ pthread_mutex_unlock(&lock);
+ if(thd->killed)
+ return -1;
+ }
+
+ return event_count;
+}
+
static int init_slave_thread(THD* thd)
{
@@ -1003,10 +1043,17 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
{
Rotate_log_event* rev = (Rotate_log_event*)ev;
int ident_len = rev->ident_len;
+ pthread_mutex_lock(&mi->lock);
memcpy(mi->log_file_name, rev->new_log_ident,ident_len );
mi->log_file_name[ident_len] = 0;
mi->pos = 4; // skip magic number
+ pthread_cond_broadcast(&mi->cond);
+ pthread_mutex_unlock(&mi->lock);
flush_master_info(mi);
+#ifndef DBUG_OFF
+ if(abort_slave_event_count)
+ ++events_till_abort;
+#endif
delete ev;
break;
}
@@ -1045,6 +1092,9 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
pthread_handler_decl(handle_slave,arg __attribute__((unused)))
{
+#ifndef DBUG_OFF
+ slave_begin:
+#endif
THD *thd; // needs to be first for thread_stack
MYSQL *mysql = NULL ;
@@ -1068,8 +1118,8 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
#ifndef DBUG_OFF
events_till_abort = abort_slave_event_count;
#endif
- pthread_cond_broadcast(&COND_slave_start);
- pthread_mutex_unlock(&LOCK_slave);
+ pthread_cond_broadcast(&COND_slave_start);
+ pthread_mutex_unlock(&LOCK_slave);
int error = 1;
bool retried_once = 0;
@@ -1241,6 +1291,10 @@ position %ld",
net_end(&thd->net); // destructor will not free it, because we are weird
delete thd;
my_thread_end();
+#ifndef DBUG_OFF
+ if(abort_slave_event_count && !events_till_abort)
+ goto slave_begin;
+#endif
pthread_exit(0);
DBUG_RETURN(0); // Can't return anything here
}
diff --git a/sql/slave.h b/sql/slave.h
index c26be966fc3..b7e2d783749 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -14,17 +14,20 @@ typedef struct st_master_info
uint port;
uint connect_retry;
pthread_mutex_t lock;
+ pthread_cond_t cond;
bool inited;
st_master_info():pending(0),fd(-1),inited(0)
{
host[0] = 0; user[0] = 0; password[0] = 0;
pthread_mutex_init(&lock, NULL);
+ pthread_cond_init(&cond, NULL);
}
~st_master_info()
{
pthread_mutex_destroy(&lock);
+ pthread_cond_destroy(&cond);
}
inline void inc_pending(ulonglong val)
{
@@ -35,6 +38,7 @@ typedef struct st_master_info
pthread_mutex_lock(&lock);
pos += val + pending;
pending = 0;
+ pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&lock);
}
// thread safe read of position - not needed if we are in the slave thread,
@@ -45,6 +49,8 @@ typedef struct st_master_info
var = pos;
pthread_mutex_unlock(&lock);
}
+
+ int wait_for_pos(THD* thd, String* log_name, ulong log_pos);
} MASTER_INFO;
typedef struct st_table_rule_ent
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 46c37e1b57f..ef42eaf594c 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -281,6 +281,25 @@ public:
THD();
~THD();
bool store_globals();
+ inline const char* enter_cond(pthread_cond_t *cond, pthread_mutex_t* mutex,
+ const char* msg)
+ {
+ const char* old_msg = proc_info;
+ pthread_mutex_lock(&mysys_var->mutex);
+ mysys_var->current_mutex = mutex;
+ mysys_var->current_cond = cond;
+ proc_info = msg;
+ pthread_mutex_unlock(&mysys_var->mutex);
+ return old_msg;
+ }
+ inline void exit_cond(const char* old_msg)
+ {
+ pthread_mutex_lock(&mysys_var->mutex);
+ mysys_var->current_mutex = 0;
+ mysys_var->current_cond = 0;
+ proc_info = old_msg;
+ pthread_mutex_unlock(&mysys_var->mutex);
+ }
inline time_t query_start() { query_start_used=1; return start_time; }
inline void set_time() { if (user_time) start_time=time_after_lock=user_time; else time_after_lock=time(&start_time); }
inline void end_time() { time(&start_time); }
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 0aaaf027ea3..5e37d590c47 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -26,6 +26,36 @@
extern const char* any_db;
extern pthread_handler_decl(handle_slave,arg);
+static int fake_rotate_event(NET* net, String* packet,
+ const char* log_file_name);
+
+static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
+ const char**errmsg)
+{
+ char header[LOG_EVENT_HEADER_LEN];
+ memset(header, 0, 4); // when does not matter
+ header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
+ char* p = strrchr(log_file_name, FN_LIBCHAR);
+ // find the last slash
+ if(p)
+ p++;
+ else
+ p = log_file_name;
+
+ uint ident_len = (uint) strlen(p);
+ ulong event_len = ident_len + sizeof(header);
+ int4store(header + EVENT_TYPE_OFFSET + 1, server_id);
+ int4store(header + EVENT_LEN_OFFSET, event_len);
+ packet->append(header, sizeof(header));
+ packet->append(p,ident_len);
+ if(my_net_write(net, (char*)packet->ptr(), packet->length()))
+ {
+ *errmsg = "failed on my_net_write()";
+ return -1;
+ }
+ return 0;
+}
+
static int send_file(THD *thd)
{
@@ -281,6 +311,15 @@ sweepstakes if you report the bug";
// we need to start a packet with something other than 255
// to distiquish it from error
+ if(pos == 4) // tell the client log name with a fake rotate_event
+ // if we are at the start of the log
+ {
+ if(fake_rotate_event(net, packet, log_file_name, &errmsg))
+ goto err;
+ packet->length(0);
+ packet->append("\0", 1);
+ }
+
while(!net->error && net->vio != 0 && !thd->killed)
{
pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
@@ -437,36 +476,15 @@ sweepstakes if you report the bug";
end_io_cache(&log);
(void) my_close(file, MYF(MY_WME));
- if ((file=open_log(&log, log_file_name, &errmsg)) < 0)
- goto err;
-
+
// fake Rotate_log event just in case it did not make it to the log
// otherwise the slave make get confused about the offset
- {
- char header[LOG_EVENT_HEADER_LEN];
- memset(header, 0, 4); // when does not matter
- header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
- char* p = strrchr(log_file_name, FN_LIBCHAR);
- // find the last slash
- if(p)
- p++;
- else
- p = log_file_name;
-
- uint ident_len = (uint) strlen(p);
- ulong event_len = ident_len + sizeof(header);
- int4store(header + EVENT_TYPE_OFFSET + 1, server_id);
- int4store(header + EVENT_LEN_OFFSET, event_len);
- packet->append(header, sizeof(header));
- packet->append(p,ident_len);
- if(my_net_write(net, (char*)packet->ptr(), packet->length()))
- {
- errmsg = "failed on my_net_write()";
- goto err;
- }
- packet->length(0);
- packet->append("\0",1);
- }
+ if ((file=open_log(&log, log_file_name, &errmsg)) < 0 ||
+ fake_rotate_event(net, packet, log_file_name, &errmsg))
+ goto err;
+
+ packet->length(0);
+ packet->append("\0",1);
}
}