summaryrefslogtreecommitdiff
path: root/sql/log_event.cc
diff options
context:
space:
mode:
authorunknown <sasha@mysql.sashanet.com>2002-01-19 19:16:52 -0700
committerunknown <sasha@mysql.sashanet.com>2002-01-19 19:16:52 -0700
commit6291b7b591b0d5239dfac890209fe30dfc3dc8f5 (patch)
tree87da2fd65f79c28f4b97c4619f95b07797107d82 /sql/log_event.cc
parentdf62018f540c9028fd43050650436431636a710f (diff)
downloadmariadb-git-6291b7b591b0d5239dfac890209fe30dfc3dc8f5.tar.gz
Here comes a nasty patch, although I am not ready to push it yet. I will
first pull, merge,test, and get it to work. The main change is the new replication code - now we have two slave threads SQL thread and I/O thread. I have also re-written a lot of the code to prepare for multi-master implementation. I also documented IO_CACHE quite extensively and to some extend, THD class. Makefile.am: moved tags target script into a separate file include/my_sys.h: fixes in IO_CACHE for SEQ_READ_APPEND + some documentation libmysqld/lib_sql.cc: updated replication locks, but now I see I did it wrong and it won't compile. Will fix before the push. mysql-test/r/rpl000014.result: test result update mysql-test/r/rpl000015.result: test result update mysql-test/r/rpl000016.result: test result update mysql-test/r/rpl_log.result: test result update mysql-test/t/rpl000016-slave.sh: remove relay logs mysql-test/t/rpl000017-slave.sh: remove relay logs mysql-test/t/rpl_log.test: updated test mysys/mf_iocache.c: IO_CACHE updates to make replication work mysys/mf_iocache2.c: IO_CACHE update to make replication work mysys/thr_mutex.c: cosmetic change sql/item_func.cc: new replication code sql/lex.h: new replication sql/log.cc: new replication sql/log_event.cc: new replication sql/log_event.h: new replication sql/mini_client.cc: new replication sql/mini_client.h: new replication sql/mysql_priv.h: new replication sql/mysqld.cc: new replication sql/repl_failsafe.cc: new replication sql/slave.cc: new replication sql/slave.h: new replication sql/sql_class.cc: new replication sql/sql_class.h: new replication sql/sql_lex.h: new replication sql/sql_parse.cc: new replication sql/sql_repl.cc: new replication sql/sql_repl.h: new replication sql/sql_show.cc: new replication sql/sql_yacc.yy: new replication sql/stacktrace.c: more robust stack tracing sql/structs.h: new replication code BitKeeper/etc/ignore: Added mysql-test/r/rpl000002.eval mysql-test/r/rpl000014.eval mysql-test/r/rpl000015.eval mysql-test/r/rpl000016.eval mysql-test/r/slave-running.eval mysql-test/r/slave-stopped.eval to the ignore list
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r--sql/log_event.cc194
1 files changed, 105 insertions, 89 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 72198038a07..7eb7c57ae40 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -24,6 +24,8 @@
#include <my_dir.h>
#endif /* MYSQL_CLIENT */
+#include <assert.h>
+
#ifdef MYSQL_CLIENT
static void pretty_print_str(FILE* file, char* str, int len)
{
@@ -118,14 +120,14 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg):
if (thd)
{
server_id = thd->server_id;
- log_seq = thd->log_seq;
when = thd->start_time;
+ log_pos = thd->log_pos;
}
else
{
server_id = ::server_id;
- log_seq = 0;
when = time(NULL);
+ log_pos=0;
}
}
@@ -156,12 +158,12 @@ Log_event::Log_event(const char* buf, bool old_format):
server_id = uint4korr(buf + SERVER_ID_OFFSET);
if (old_format)
{
- log_seq=0;
+ log_pos=0;
flags=0;
}
else
{
- log_seq = uint4korr(buf + LOG_SEQ_OFFSET);
+ log_pos = uint4korr(buf + LOG_POS_OFFSET);
flags = uint2korr(buf + FLAGS_OFFSET);
}
#ifndef MYSQL_CLIENT
@@ -172,13 +174,13 @@ Log_event::Log_event(const char* buf, bool old_format):
#ifndef MYSQL_CLIENT
-int Log_event::exec_event(struct st_master_info* mi)
+int Log_event::exec_event(struct st_relay_log_info* rli)
{
- if (mi)
+ if (rli)
{
- thd->log_seq = 0;
- mi->inc_pos(get_event_len(), log_seq);
- flush_master_info(mi);
+ rli->inc_pos(get_event_len(),log_pos);
+ DBUG_ASSERT(rli->sql_thd != 0);
+ flush_relay_log_info(rli);
}
return 0;
}
@@ -193,14 +195,14 @@ void Query_log_event::pack_info(String* packet)
char buf[256];
String tmp(buf, sizeof(buf));
tmp.length(0);
- if(db && db_len)
+ if (db && db_len)
{
tmp.append("use ");
tmp.append(db, db_len);
tmp.append("; ", 2);
}
- if(query && q_len)
+ if (query && q_len)
tmp.append(query, q_len);
net_store_data(packet, (char*)tmp.ptr(), tmp.length());
}
@@ -345,7 +347,7 @@ void Log_event::init_show_field_list(List<Item>* field_list)
field_list->push_back(new Item_empty_string("Pos", 20));
field_list->push_back(new Item_empty_string("Event_type", 20));
field_list->push_back(new Item_empty_string("Server_id", 20));
- field_list->push_back(new Item_empty_string("Log_seq", 20));
+ field_list->push_back(new Item_empty_string("Orig_log_pos", 20));
field_list->push_back(new Item_empty_string("Info", 20));
}
@@ -363,7 +365,7 @@ int Log_event::net_send(THD* thd, const char* log_name, my_off_t pos)
event_type = get_type_str();
net_store_data(packet, event_type, strlen(event_type));
net_store_data(packet, server_id);
- net_store_data(packet, log_seq);
+ net_store_data(packet, log_pos);
pack_info(packet);
return my_net_write(&thd->net, (char*)packet->ptr(), packet->length());
}
@@ -392,7 +394,7 @@ int Log_event::write_header(IO_CACHE* file)
long tmp=get_data_size() + LOG_EVENT_HEADER_LEN;
int4store(pos, tmp);
pos += 4;
- int4store(pos, log_seq);
+ int4store(pos, log_pos);
pos += 4;
int2store(pos, flags);
pos += 2;
@@ -456,7 +458,6 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
#define LOCK_MUTEX
#endif
-
// allocates memory - the caller is responsible for clean-up
#ifndef MYSQL_CLIENT
Log_event* Log_event::read_log_event(IO_CACHE* file,
@@ -501,7 +502,7 @@ Log_event* Log_event::read_log_event(IO_CACHE* file, bool old_format)
}
buf[data_len] = 0;
memcpy(buf, head, header_size);
- if(my_b_read(file, (byte*) buf + header_size,
+ if (my_b_read(file, (byte*) buf + header_size,
data_len - header_size))
{
error = "read error";
@@ -511,9 +512,10 @@ Log_event* Log_event::read_log_event(IO_CACHE* file, bool old_format)
res->register_temp_buf(buf);
err:
UNLOCK_MUTEX;
- if(error)
+ if (error)
{
- sql_print_error(error);
+ sql_print_error("Error in Log_event::read_log_event(): '%s', \
+data_len=%d,event_type=%d",error,data_len,head[EVENT_TYPE_OFFSET]);
my_free(buf, MYF(MY_ALLOW_ZERO_PTR));
}
return res;
@@ -581,9 +583,11 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len,
#ifdef MYSQL_CLIENT
void Log_event::print_header(FILE* file)
{
+ char llbuff[22];
fputc('#', file);
print_timestamp(file);
- fprintf(file, " server id %d ", server_id);
+ fprintf(file, " server id %d log_pos %s ", server_id,
+ llstr(log_pos,llbuff));
}
void Log_event::print_timestamp(FILE* file, time_t* ts)
@@ -1187,12 +1191,12 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db)
#ifndef MYSQL_CLIENT
-void Log_event::set_log_seq(THD* thd, MYSQL_LOG* log)
+void Log_event::set_log_pos(MYSQL_LOG* log)
{
- log_seq = (thd && thd->log_seq) ? thd->log_seq++ : log->log_seq++;
+ if (!log_pos)
+ log_pos = my_b_tell(&log->log_file);
}
-
void Load_log_event::set_fields(List<Item> &fields)
{
uint i;
@@ -1205,14 +1209,20 @@ void Load_log_event::set_fields(List<Item> &fields)
}
-Slave_log_event::Slave_log_event(THD* thd_arg,struct st_master_info* mi):
+Slave_log_event::Slave_log_event(THD* thd_arg,
+ struct st_relay_log_info* rli):
Log_event(thd_arg),mem_pool(0),master_host(0)
{
- if(!mi->inited)
+ if(!rli->inited)
return;
- pthread_mutex_lock(&mi->lock);
+
+ MASTER_INFO* mi = rli->mi;
+ // TODO: re-write this better without holding both
+ // locks at the same time
+ pthread_mutex_lock(&mi->data_lock);
+ pthread_mutex_lock(&rli->data_lock);
master_host_len = strlen(mi->host);
- master_log_len = strlen(mi->log_file_name);
+ master_log_len = strlen(rli->master_log_name);
// on OOM, just do not initialize the structure and print the error
if((mem_pool = (char*)my_malloc(get_data_size() + 1,
MYF(MY_WME))))
@@ -1220,13 +1230,14 @@ Slave_log_event::Slave_log_event(THD* thd_arg,struct st_master_info* mi):
master_host = mem_pool + SL_MASTER_HOST_OFFSET ;
memcpy(master_host, mi->host, master_host_len + 1);
master_log = master_host + master_host_len + 1;
- memcpy(master_log, mi->log_file_name, master_log_len + 1);
+ memcpy(master_log, rli->master_log_name, master_log_len + 1);
master_port = mi->port;
- master_pos = mi->pos;
+ master_pos = rli->master_log_pos;
}
else
sql_print_error("Out of memory while recording slave event");
- pthread_mutex_unlock(&mi->lock);
+ pthread_mutex_unlock(&rli->data_lock);
+ pthread_mutex_unlock(&mi->data_lock);
}
@@ -1533,7 +1544,7 @@ void Execute_load_log_event::pack_info(String* packet)
#endif
#ifndef MYSQL_CLIENT
-int Query_log_event::exec_event(struct st_master_info* mi)
+int Query_log_event::exec_event(struct st_relay_log_info* rli)
{
int expected_error,actual_error = 0;
init_sql_alloc(&thd->mem_root, 8192,0);
@@ -1553,7 +1564,7 @@ int Query_log_event::exec_event(struct st_master_info* mi)
// sanity check to make sure the master did not get a really bad
// error on the query
- if (!check_expected_error(thd, (expected_error = error_code)))
+ if (!check_expected_error(thd,rli,(expected_error = error_code)))
{
mysql_parse(thd, thd->query, q_len);
if (expected_error !=
@@ -1570,8 +1581,8 @@ int Query_log_event::exec_event(struct st_master_info* mi)
else if (expected_error == actual_error)
{
thd->query_error = 0;
- *last_slave_error = 0;
- last_slave_errno = 0;
+ *rli->last_slave_error = 0;
+ rli->last_slave_errno = 0;
}
}
else
@@ -1592,17 +1603,17 @@ int Query_log_event::exec_event(struct st_master_info* mi)
if (thd->query_error || thd->fatal_error)
{
- slave_print_error(actual_error, "error '%s' on query '%s'",
+ slave_print_error(rli,actual_error, "error '%s' on query '%s'",
actual_error ? thd->net.last_error :
"unexpected success or fatal error", query);
free_root(&thd->mem_root,0);
return 1;
}
free_root(&thd->mem_root,0);
- return Log_event::exec_event(mi);
+ return Log_event::exec_event(rli);
}
-int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
+int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli)
{
init_sql_alloc(&thd->mem_root, 8192,0);
thd->db = rewrite_db((char*)db);
@@ -1625,6 +1636,7 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
// the table will be opened in mysql_load
if(table_rules_on && !tables_ok(thd, &tables))
{
+ // TODO: this is a bug - this needs to be moved to the I/O thread
if (net)
skip_load_data_infile(net);
}
@@ -1632,7 +1644,7 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
{
char llbuff[22];
enum enum_duplicates handle_dup = DUP_IGNORE;
- if(sql_ex.opt_flags && REPLACE_FLAG)
+ if (sql_ex.opt_flags && REPLACE_FLAG)
handle_dup = DUP_REPLACE;
sql_exchange ex((char*)fname, sql_ex.opt_flags &&
DUMPFILE_FLAG );
@@ -1663,7 +1675,7 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
thd->query_error = 1;
if(thd->cuted_fields)
sql_print_error("Slave: load data infile at position %s in log \
-'%s' produced %d warning(s)", llstr(mi->pos,llbuff), RPL_LOG_NAME,
+'%s' produced %d warning(s)", llstr(rli->master_log_pos,llbuff), RPL_LOG_NAME,
thd->cuted_fields );
if(net)
net->pkt_nr = thd->net.pkt_nr;
@@ -1673,6 +1685,7 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
{
// we will just ask the master to send us /dev/null if we do not
// want to load the data
+ // TODO: this a bug - needs to be done in I/O thread
if (net)
skip_load_data_infile(net);
}
@@ -1683,10 +1696,11 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
if(thd->query_error)
{
int sql_error = thd->net.last_errno;
- if(!sql_error)
+ if (!sql_error)
sql_error = ER_UNKNOWN_ERROR;
- slave_print_error(sql_error, "Slave: Error '%s' running load data infile ",
+ slave_print_error(rli,sql_error,
+ "Slave: Error '%s' running load data infile ",
ER_SAFE(sql_error));
free_root(&thd->mem_root,0);
return 1;
@@ -1699,38 +1713,43 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
return 1;
}
- return Log_event::exec_event(mi);
+ return Log_event::exec_event(rli);
}
-int Start_log_event::exec_event(struct st_master_info* mi)
+int Start_log_event::exec_event(struct st_relay_log_info* rli)
{
- if (!mi->old_format)
+ if (!rli->mi->old_format)
{
close_temporary_tables(thd);
cleanup_load_tmpdir();
}
- return Log_event::exec_event(mi);
+ return Log_event::exec_event(rli);
}
-int Stop_log_event::exec_event(struct st_master_info* mi)
+int Stop_log_event::exec_event(struct st_relay_log_info* rli)
{
- if (mi->pos > 4) // stop event should be ignored after rotate event
+ // do not clean up immediately after rotate event
+ if (rli->master_log_pos > 4)
{
close_temporary_tables(thd);
cleanup_load_tmpdir();
- mi->inc_pos(get_event_len(), log_seq);
- flush_master_info(mi);
}
- thd->log_seq = 0;
+ // we do not want to update master_log pos because we get a rotate event
+ // before stop, so by now master_log_name is set to the next log
+ // if we updated it, we will have incorrect master coordinates and this
+ // could give false triggers in MASTER_POS_WAIT() that we have reached
+ // the targed position when in fact we have not
+ rli->inc_pos(get_event_len(), 0);
+ flush_relay_log_info(rli);
return 0;
}
-int Rotate_log_event::exec_event(struct st_master_info* mi)
+int Rotate_log_event::exec_event(struct st_relay_log_info* rli)
{
bool rotate_binlog = 0, write_slave_event = 0;
- char* log_name = mi->log_file_name;
- pthread_mutex_lock(&mi->lock);
-
+ char* log_name = rli->master_log_name;
+ pthread_mutex_lock(&rli->data_lock);
+ // TODO: probably needs re-write
// rotate local binlog only if the name of remote has changed
if (!*log_name || !(log_name[ident_len] == 0 &&
!memcmp(log_name, new_log_ident, ident_len)))
@@ -1738,41 +1757,38 @@ int Rotate_log_event::exec_event(struct st_master_info* mi)
write_slave_event = (!(flags & LOG_EVENT_FORCED_ROTATE_F)
&& mysql_bin_log.is_open());
rotate_binlog = (*log_name && write_slave_event);
- memcpy(log_name, new_log_ident,ident_len );
+ if (ident_len >= sizeof(rli->master_log_name))
+ return 1;
+ memcpy(log_name, new_log_ident,ident_len);
log_name[ident_len] = 0;
}
- mi->pos = pos;
- mi->last_log_seq = log_seq;
-#ifndef DBUG_OFF
- if (abort_slave_event_count)
- ++events_till_abort;
-#endif
+ rli->master_log_pos = pos;
+ rli->relay_log_pos += get_event_len();
if (rotate_binlog)
{
mysql_bin_log.new_file();
- mi->last_log_seq = 0;
+ rli->master_log_pos = 4;
}
- pthread_cond_broadcast(&mi->cond);
- pthread_mutex_unlock(&mi->lock);
- flush_master_info(mi);
+ pthread_cond_broadcast(&rli->data_cond);
+ pthread_mutex_unlock(&rli->data_lock);
+ flush_relay_log_info(rli);
if (write_slave_event)
{
- Slave_log_event s(thd, mi);
+ Slave_log_event s(thd, rli);
if (s.master_host)
{
- s.set_log_seq(0, &mysql_bin_log);
+ s.set_log_pos(&mysql_bin_log);
s.server_id = ::server_id;
mysql_bin_log.write(&s);
}
}
- thd->log_seq = 0;
return 0;
}
-int Intvar_log_event::exec_event(struct st_master_info* mi)
+int Intvar_log_event::exec_event(struct st_relay_log_info* rli)
{
- switch(type)
+ switch (type)
{
case LAST_INSERT_ID_EVENT:
thd->last_insert_id_used = 1;
@@ -1782,18 +1798,18 @@ int Intvar_log_event::exec_event(struct st_master_info* mi)
thd->next_insert_id = val;
break;
}
- mi->inc_pending(get_event_len());
+ rli->inc_pending(get_event_len());
return 0;
}
-int Slave_log_event::exec_event(struct st_master_info* mi)
+int Slave_log_event::exec_event(struct st_relay_log_info* rli)
{
if(mysql_bin_log.is_open())
mysql_bin_log.write(this);
- return Log_event::exec_event(mi);
+ return Log_event::exec_event(rli);
}
-int Create_file_log_event::exec_event(struct st_master_info* mi)
+int Create_file_log_event::exec_event(struct st_relay_log_info* rli)
{
char fname_buf[FN_REFLEN+10];
char *p;
@@ -1809,7 +1825,7 @@ int Create_file_log_event::exec_event(struct st_master_info* mi)
init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0,
MYF(MY_WME|MY_NABP)))
{
- slave_print_error(my_errno, "Could not open file '%s'", fname_buf);
+ slave_print_error(rli,my_errno, "Could not open file '%s'", fname_buf);
goto err;
}
@@ -1820,7 +1836,7 @@ int Create_file_log_event::exec_event(struct st_master_info* mi)
if (write_base(&file))
{
strmov(p, ".info"); // to have it right in the error message
- slave_print_error(my_errno, "Could not write to file '%s'", fname_buf);
+ slave_print_error(rli,my_errno, "Could not write to file '%s'", fname_buf);
goto err;
}
end_io_cache(&file);
@@ -1830,12 +1846,12 @@ int Create_file_log_event::exec_event(struct st_master_info* mi)
if ((fd = my_open(fname_buf, O_WRONLY|O_CREAT|O_BINARY|O_TRUNC,
MYF(MY_WME))) < 0)
{
- slave_print_error(my_errno, "Could not open file '%s'", fname_buf);
+ slave_print_error(rli,my_errno, "Could not open file '%s'", fname_buf);
goto err;
}
if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP)))
{
- slave_print_error(my_errno, "Write to '%s' failed", fname_buf);
+ slave_print_error(rli,my_errno, "Write to '%s' failed", fname_buf);
goto err;
}
if (mysql_bin_log.is_open())
@@ -1846,10 +1862,10 @@ err:
end_io_cache(&file);
if (fd >= 0)
my_close(fd, MYF(0));
- return error ? 1 : Log_event::exec_event(mi);
+ return error ? 1 : Log_event::exec_event(rli);
}
-int Delete_file_log_event::exec_event(struct st_master_info* mi)
+int Delete_file_log_event::exec_event(struct st_relay_log_info* rli)
{
char fname[FN_REFLEN+10];
char* p;
@@ -1860,10 +1876,10 @@ int Delete_file_log_event::exec_event(struct st_master_info* mi)
(void)my_delete(fname, MYF(MY_WME));
if (mysql_bin_log.is_open())
mysql_bin_log.write(this);
- return Log_event::exec_event(mi);
+ return Log_event::exec_event(rli);
}
-int Append_block_log_event::exec_event(struct st_master_info* mi)
+int Append_block_log_event::exec_event(struct st_relay_log_info* rli)
{
char fname[FN_REFLEN+10];
char* p;
@@ -1873,12 +1889,12 @@ int Append_block_log_event::exec_event(struct st_master_info* mi)
memcpy(p, ".data", 6);
if ((fd = my_open(fname, O_WRONLY|O_APPEND|O_BINARY, MYF(MY_WME))) < 0)
{
- slave_print_error(my_errno, "Could not open file '%s'", fname);
+ slave_print_error(rli,my_errno, "Could not open file '%s'", fname);
goto err;
}
if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP)))
{
- slave_print_error(my_errno, "Write to '%s' failed", fname);
+ slave_print_error(rli,my_errno, "Write to '%s' failed", fname);
goto err;
}
if (mysql_bin_log.is_open())
@@ -1887,10 +1903,10 @@ int Append_block_log_event::exec_event(struct st_master_info* mi)
err:
if (fd >= 0)
my_close(fd, MYF(0));
- return error ? error : Log_event::exec_event(mi);
+ return error ? error : Log_event::exec_event(rli);
}
-int Execute_load_log_event::exec_event(struct st_master_info* mi)
+int Execute_load_log_event::exec_event(struct st_relay_log_info* rli)
{
char fname[FN_REFLEN+10];
char* p;
@@ -1906,7 +1922,7 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi)
init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
MYF(MY_WME|MY_NABP)))
{
- slave_print_error(my_errno, "Could not open file '%s'", fname);
+ slave_print_error(rli,my_errno, "Could not open file '%s'", fname);
goto err;
}
if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
@@ -1914,7 +1930,7 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi)
(bool)0))
|| lev->get_type_code() != NEW_LOAD_EVENT)
{
- slave_print_error(0, "File '%s' appears corrupted", fname);
+ slave_print_error(rli,0, "File '%s' appears corrupted", fname);
goto err;
}
// we want to disable binary logging in slave thread
@@ -1927,7 +1943,7 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi)
lev->thd = thd;
if (lev->exec_event(0,0))
{
- slave_print_error(my_errno, "Failed executing load from '%s'", fname);
+ slave_print_error(rli,my_errno, "Failed executing load from '%s'", fname);
thd->options = save_options;
goto err;
}
@@ -1943,7 +1959,7 @@ err:
end_io_cache(&file);
if (fd >= 0)
my_close(fd, MYF(0));
- return error ? error : Log_event::exec_event(mi);
+ return error ? error : Log_event::exec_event(rli);
}