diff options
author | unknown <sasha@mysql.sashanet.com> | 2002-01-19 19:16:52 -0700 |
---|---|---|
committer | unknown <sasha@mysql.sashanet.com> | 2002-01-19 19:16:52 -0700 |
commit | 6291b7b591b0d5239dfac890209fe30dfc3dc8f5 (patch) | |
tree | 87da2fd65f79c28f4b97c4619f95b07797107d82 /sql/log_event.cc | |
parent | df62018f540c9028fd43050650436431636a710f (diff) | |
download | mariadb-git-6291b7b591b0d5239dfac890209fe30dfc3dc8f5.tar.gz |
Here comes a nasty patch, although I am not ready to push it yet. I will
first pull, merge,test, and get it to work.
The main change is the new replication code - now we have two slave threads
SQL thread and I/O thread. I have also re-written a lot of the code to
prepare for multi-master implementation.
I also documented IO_CACHE quite extensively and to some extend, THD class.
Makefile.am:
moved tags target script into a separate file
include/my_sys.h:
fixes in IO_CACHE for SEQ_READ_APPEND + some documentation
libmysqld/lib_sql.cc:
updated replication locks, but now I see I did it wrong and it won't compile. Will fix
before the push.
mysql-test/r/rpl000014.result:
test result update
mysql-test/r/rpl000015.result:
test result update
mysql-test/r/rpl000016.result:
test result update
mysql-test/r/rpl_log.result:
test result update
mysql-test/t/rpl000016-slave.sh:
remove relay logs
mysql-test/t/rpl000017-slave.sh:
remove relay logs
mysql-test/t/rpl_log.test:
updated test
mysys/mf_iocache.c:
IO_CACHE updates to make replication work
mysys/mf_iocache2.c:
IO_CACHE update to make replication work
mysys/thr_mutex.c:
cosmetic change
sql/item_func.cc:
new replication code
sql/lex.h:
new replication
sql/log.cc:
new replication
sql/log_event.cc:
new replication
sql/log_event.h:
new replication
sql/mini_client.cc:
new replication
sql/mini_client.h:
new replication
sql/mysql_priv.h:
new replication
sql/mysqld.cc:
new replication
sql/repl_failsafe.cc:
new replication
sql/slave.cc:
new replication
sql/slave.h:
new replication
sql/sql_class.cc:
new replication
sql/sql_class.h:
new replication
sql/sql_lex.h:
new replication
sql/sql_parse.cc:
new replication
sql/sql_repl.cc:
new replication
sql/sql_repl.h:
new replication
sql/sql_show.cc:
new replication
sql/sql_yacc.yy:
new replication
sql/stacktrace.c:
more robust stack tracing
sql/structs.h:
new replication code
BitKeeper/etc/ignore:
Added mysql-test/r/rpl000002.eval mysql-test/r/rpl000014.eval mysql-test/r/rpl000015.eval mysql-test/r/rpl000016.eval mysql-test/r/slave-running.eval mysql-test/r/slave-stopped.eval to the ignore list
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r-- | sql/log_event.cc | 194 |
1 files changed, 105 insertions, 89 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index 72198038a07..7eb7c57ae40 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -24,6 +24,8 @@ #include <my_dir.h> #endif /* MYSQL_CLIENT */ +#include <assert.h> + #ifdef MYSQL_CLIENT static void pretty_print_str(FILE* file, char* str, int len) { @@ -118,14 +120,14 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg): if (thd) { server_id = thd->server_id; - log_seq = thd->log_seq; when = thd->start_time; + log_pos = thd->log_pos; } else { server_id = ::server_id; - log_seq = 0; when = time(NULL); + log_pos=0; } } @@ -156,12 +158,12 @@ Log_event::Log_event(const char* buf, bool old_format): server_id = uint4korr(buf + SERVER_ID_OFFSET); if (old_format) { - log_seq=0; + log_pos=0; flags=0; } else { - log_seq = uint4korr(buf + LOG_SEQ_OFFSET); + log_pos = uint4korr(buf + LOG_POS_OFFSET); flags = uint2korr(buf + FLAGS_OFFSET); } #ifndef MYSQL_CLIENT @@ -172,13 +174,13 @@ Log_event::Log_event(const char* buf, bool old_format): #ifndef MYSQL_CLIENT -int Log_event::exec_event(struct st_master_info* mi) +int Log_event::exec_event(struct st_relay_log_info* rli) { - if (mi) + if (rli) { - thd->log_seq = 0; - mi->inc_pos(get_event_len(), log_seq); - flush_master_info(mi); + rli->inc_pos(get_event_len(),log_pos); + DBUG_ASSERT(rli->sql_thd != 0); + flush_relay_log_info(rli); } return 0; } @@ -193,14 +195,14 @@ void Query_log_event::pack_info(String* packet) char buf[256]; String tmp(buf, sizeof(buf)); tmp.length(0); - if(db && db_len) + if (db && db_len) { tmp.append("use "); tmp.append(db, db_len); tmp.append("; ", 2); } - if(query && q_len) + if (query && q_len) tmp.append(query, q_len); net_store_data(packet, (char*)tmp.ptr(), tmp.length()); } @@ -345,7 +347,7 @@ void Log_event::init_show_field_list(List<Item>* field_list) field_list->push_back(new Item_empty_string("Pos", 20)); field_list->push_back(new Item_empty_string("Event_type", 20)); field_list->push_back(new Item_empty_string("Server_id", 20)); - field_list->push_back(new Item_empty_string("Log_seq", 20)); + field_list->push_back(new Item_empty_string("Orig_log_pos", 20)); field_list->push_back(new Item_empty_string("Info", 20)); } @@ -363,7 +365,7 @@ int Log_event::net_send(THD* thd, const char* log_name, my_off_t pos) event_type = get_type_str(); net_store_data(packet, event_type, strlen(event_type)); net_store_data(packet, server_id); - net_store_data(packet, log_seq); + net_store_data(packet, log_pos); pack_info(packet); return my_net_write(&thd->net, (char*)packet->ptr(), packet->length()); } @@ -392,7 +394,7 @@ int Log_event::write_header(IO_CACHE* file) long tmp=get_data_size() + LOG_EVENT_HEADER_LEN; int4store(pos, tmp); pos += 4; - int4store(pos, log_seq); + int4store(pos, log_pos); pos += 4; int2store(pos, flags); pos += 2; @@ -456,7 +458,6 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet, #define LOCK_MUTEX #endif - // allocates memory - the caller is responsible for clean-up #ifndef MYSQL_CLIENT Log_event* Log_event::read_log_event(IO_CACHE* file, @@ -501,7 +502,7 @@ Log_event* Log_event::read_log_event(IO_CACHE* file, bool old_format) } buf[data_len] = 0; memcpy(buf, head, header_size); - if(my_b_read(file, (byte*) buf + header_size, + if (my_b_read(file, (byte*) buf + header_size, data_len - header_size)) { error = "read error"; @@ -511,9 +512,10 @@ Log_event* Log_event::read_log_event(IO_CACHE* file, bool old_format) res->register_temp_buf(buf); err: UNLOCK_MUTEX; - if(error) + if (error) { - sql_print_error(error); + sql_print_error("Error in Log_event::read_log_event(): '%s', \ +data_len=%d,event_type=%d",error,data_len,head[EVENT_TYPE_OFFSET]); my_free(buf, MYF(MY_ALLOW_ZERO_PTR)); } return res; @@ -581,9 +583,11 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len, #ifdef MYSQL_CLIENT void Log_event::print_header(FILE* file) { + char llbuff[22]; fputc('#', file); print_timestamp(file); - fprintf(file, " server id %d ", server_id); + fprintf(file, " server id %d log_pos %s ", server_id, + llstr(log_pos,llbuff)); } void Log_event::print_timestamp(FILE* file, time_t* ts) @@ -1187,12 +1191,12 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db) #ifndef MYSQL_CLIENT -void Log_event::set_log_seq(THD* thd, MYSQL_LOG* log) +void Log_event::set_log_pos(MYSQL_LOG* log) { - log_seq = (thd && thd->log_seq) ? thd->log_seq++ : log->log_seq++; + if (!log_pos) + log_pos = my_b_tell(&log->log_file); } - void Load_log_event::set_fields(List<Item> &fields) { uint i; @@ -1205,14 +1209,20 @@ void Load_log_event::set_fields(List<Item> &fields) } -Slave_log_event::Slave_log_event(THD* thd_arg,struct st_master_info* mi): +Slave_log_event::Slave_log_event(THD* thd_arg, + struct st_relay_log_info* rli): Log_event(thd_arg),mem_pool(0),master_host(0) { - if(!mi->inited) + if(!rli->inited) return; - pthread_mutex_lock(&mi->lock); + + MASTER_INFO* mi = rli->mi; + // TODO: re-write this better without holding both + // locks at the same time + pthread_mutex_lock(&mi->data_lock); + pthread_mutex_lock(&rli->data_lock); master_host_len = strlen(mi->host); - master_log_len = strlen(mi->log_file_name); + master_log_len = strlen(rli->master_log_name); // on OOM, just do not initialize the structure and print the error if((mem_pool = (char*)my_malloc(get_data_size() + 1, MYF(MY_WME)))) @@ -1220,13 +1230,14 @@ Slave_log_event::Slave_log_event(THD* thd_arg,struct st_master_info* mi): master_host = mem_pool + SL_MASTER_HOST_OFFSET ; memcpy(master_host, mi->host, master_host_len + 1); master_log = master_host + master_host_len + 1; - memcpy(master_log, mi->log_file_name, master_log_len + 1); + memcpy(master_log, rli->master_log_name, master_log_len + 1); master_port = mi->port; - master_pos = mi->pos; + master_pos = rli->master_log_pos; } else sql_print_error("Out of memory while recording slave event"); - pthread_mutex_unlock(&mi->lock); + pthread_mutex_unlock(&rli->data_lock); + pthread_mutex_unlock(&mi->data_lock); } @@ -1533,7 +1544,7 @@ void Execute_load_log_event::pack_info(String* packet) #endif #ifndef MYSQL_CLIENT -int Query_log_event::exec_event(struct st_master_info* mi) +int Query_log_event::exec_event(struct st_relay_log_info* rli) { int expected_error,actual_error = 0; init_sql_alloc(&thd->mem_root, 8192,0); @@ -1553,7 +1564,7 @@ int Query_log_event::exec_event(struct st_master_info* mi) // sanity check to make sure the master did not get a really bad // error on the query - if (!check_expected_error(thd, (expected_error = error_code))) + if (!check_expected_error(thd,rli,(expected_error = error_code))) { mysql_parse(thd, thd->query, q_len); if (expected_error != @@ -1570,8 +1581,8 @@ int Query_log_event::exec_event(struct st_master_info* mi) else if (expected_error == actual_error) { thd->query_error = 0; - *last_slave_error = 0; - last_slave_errno = 0; + *rli->last_slave_error = 0; + rli->last_slave_errno = 0; } } else @@ -1592,17 +1603,17 @@ int Query_log_event::exec_event(struct st_master_info* mi) if (thd->query_error || thd->fatal_error) { - slave_print_error(actual_error, "error '%s' on query '%s'", + slave_print_error(rli,actual_error, "error '%s' on query '%s'", actual_error ? thd->net.last_error : "unexpected success or fatal error", query); free_root(&thd->mem_root,0); return 1; } free_root(&thd->mem_root,0); - return Log_event::exec_event(mi); + return Log_event::exec_event(rli); } -int Load_log_event::exec_event(NET* net, struct st_master_info* mi) +int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli) { init_sql_alloc(&thd->mem_root, 8192,0); thd->db = rewrite_db((char*)db); @@ -1625,6 +1636,7 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi) // the table will be opened in mysql_load if(table_rules_on && !tables_ok(thd, &tables)) { + // TODO: this is a bug - this needs to be moved to the I/O thread if (net) skip_load_data_infile(net); } @@ -1632,7 +1644,7 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi) { char llbuff[22]; enum enum_duplicates handle_dup = DUP_IGNORE; - if(sql_ex.opt_flags && REPLACE_FLAG) + if (sql_ex.opt_flags && REPLACE_FLAG) handle_dup = DUP_REPLACE; sql_exchange ex((char*)fname, sql_ex.opt_flags && DUMPFILE_FLAG ); @@ -1663,7 +1675,7 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi) thd->query_error = 1; if(thd->cuted_fields) sql_print_error("Slave: load data infile at position %s in log \ -'%s' produced %d warning(s)", llstr(mi->pos,llbuff), RPL_LOG_NAME, +'%s' produced %d warning(s)", llstr(rli->master_log_pos,llbuff), RPL_LOG_NAME, thd->cuted_fields ); if(net) net->pkt_nr = thd->net.pkt_nr; @@ -1673,6 +1685,7 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi) { // we will just ask the master to send us /dev/null if we do not // want to load the data + // TODO: this a bug - needs to be done in I/O thread if (net) skip_load_data_infile(net); } @@ -1683,10 +1696,11 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi) if(thd->query_error) { int sql_error = thd->net.last_errno; - if(!sql_error) + if (!sql_error) sql_error = ER_UNKNOWN_ERROR; - slave_print_error(sql_error, "Slave: Error '%s' running load data infile ", + slave_print_error(rli,sql_error, + "Slave: Error '%s' running load data infile ", ER_SAFE(sql_error)); free_root(&thd->mem_root,0); return 1; @@ -1699,38 +1713,43 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi) return 1; } - return Log_event::exec_event(mi); + return Log_event::exec_event(rli); } -int Start_log_event::exec_event(struct st_master_info* mi) +int Start_log_event::exec_event(struct st_relay_log_info* rli) { - if (!mi->old_format) + if (!rli->mi->old_format) { close_temporary_tables(thd); cleanup_load_tmpdir(); } - return Log_event::exec_event(mi); + return Log_event::exec_event(rli); } -int Stop_log_event::exec_event(struct st_master_info* mi) +int Stop_log_event::exec_event(struct st_relay_log_info* rli) { - if (mi->pos > 4) // stop event should be ignored after rotate event + // do not clean up immediately after rotate event + if (rli->master_log_pos > 4) { close_temporary_tables(thd); cleanup_load_tmpdir(); - mi->inc_pos(get_event_len(), log_seq); - flush_master_info(mi); } - thd->log_seq = 0; + // we do not want to update master_log pos because we get a rotate event + // before stop, so by now master_log_name is set to the next log + // if we updated it, we will have incorrect master coordinates and this + // could give false triggers in MASTER_POS_WAIT() that we have reached + // the targed position when in fact we have not + rli->inc_pos(get_event_len(), 0); + flush_relay_log_info(rli); return 0; } -int Rotate_log_event::exec_event(struct st_master_info* mi) +int Rotate_log_event::exec_event(struct st_relay_log_info* rli) { bool rotate_binlog = 0, write_slave_event = 0; - char* log_name = mi->log_file_name; - pthread_mutex_lock(&mi->lock); - + char* log_name = rli->master_log_name; + pthread_mutex_lock(&rli->data_lock); + // TODO: probably needs re-write // rotate local binlog only if the name of remote has changed if (!*log_name || !(log_name[ident_len] == 0 && !memcmp(log_name, new_log_ident, ident_len))) @@ -1738,41 +1757,38 @@ int Rotate_log_event::exec_event(struct st_master_info* mi) write_slave_event = (!(flags & LOG_EVENT_FORCED_ROTATE_F) && mysql_bin_log.is_open()); rotate_binlog = (*log_name && write_slave_event); - memcpy(log_name, new_log_ident,ident_len ); + if (ident_len >= sizeof(rli->master_log_name)) + return 1; + memcpy(log_name, new_log_ident,ident_len); log_name[ident_len] = 0; } - mi->pos = pos; - mi->last_log_seq = log_seq; -#ifndef DBUG_OFF - if (abort_slave_event_count) - ++events_till_abort; -#endif + rli->master_log_pos = pos; + rli->relay_log_pos += get_event_len(); if (rotate_binlog) { mysql_bin_log.new_file(); - mi->last_log_seq = 0; + rli->master_log_pos = 4; } - pthread_cond_broadcast(&mi->cond); - pthread_mutex_unlock(&mi->lock); - flush_master_info(mi); + pthread_cond_broadcast(&rli->data_cond); + pthread_mutex_unlock(&rli->data_lock); + flush_relay_log_info(rli); if (write_slave_event) { - Slave_log_event s(thd, mi); + Slave_log_event s(thd, rli); if (s.master_host) { - s.set_log_seq(0, &mysql_bin_log); + s.set_log_pos(&mysql_bin_log); s.server_id = ::server_id; mysql_bin_log.write(&s); } } - thd->log_seq = 0; return 0; } -int Intvar_log_event::exec_event(struct st_master_info* mi) +int Intvar_log_event::exec_event(struct st_relay_log_info* rli) { - switch(type) + switch (type) { case LAST_INSERT_ID_EVENT: thd->last_insert_id_used = 1; @@ -1782,18 +1798,18 @@ int Intvar_log_event::exec_event(struct st_master_info* mi) thd->next_insert_id = val; break; } - mi->inc_pending(get_event_len()); + rli->inc_pending(get_event_len()); return 0; } -int Slave_log_event::exec_event(struct st_master_info* mi) +int Slave_log_event::exec_event(struct st_relay_log_info* rli) { if(mysql_bin_log.is_open()) mysql_bin_log.write(this); - return Log_event::exec_event(mi); + return Log_event::exec_event(rli); } -int Create_file_log_event::exec_event(struct st_master_info* mi) +int Create_file_log_event::exec_event(struct st_relay_log_info* rli) { char fname_buf[FN_REFLEN+10]; char *p; @@ -1809,7 +1825,7 @@ int Create_file_log_event::exec_event(struct st_master_info* mi) init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0, MYF(MY_WME|MY_NABP))) { - slave_print_error(my_errno, "Could not open file '%s'", fname_buf); + slave_print_error(rli,my_errno, "Could not open file '%s'", fname_buf); goto err; } @@ -1820,7 +1836,7 @@ int Create_file_log_event::exec_event(struct st_master_info* mi) if (write_base(&file)) { strmov(p, ".info"); // to have it right in the error message - slave_print_error(my_errno, "Could not write to file '%s'", fname_buf); + slave_print_error(rli,my_errno, "Could not write to file '%s'", fname_buf); goto err; } end_io_cache(&file); @@ -1830,12 +1846,12 @@ int Create_file_log_event::exec_event(struct st_master_info* mi) if ((fd = my_open(fname_buf, O_WRONLY|O_CREAT|O_BINARY|O_TRUNC, MYF(MY_WME))) < 0) { - slave_print_error(my_errno, "Could not open file '%s'", fname_buf); + slave_print_error(rli,my_errno, "Could not open file '%s'", fname_buf); goto err; } if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP))) { - slave_print_error(my_errno, "Write to '%s' failed", fname_buf); + slave_print_error(rli,my_errno, "Write to '%s' failed", fname_buf); goto err; } if (mysql_bin_log.is_open()) @@ -1846,10 +1862,10 @@ err: end_io_cache(&file); if (fd >= 0) my_close(fd, MYF(0)); - return error ? 1 : Log_event::exec_event(mi); + return error ? 1 : Log_event::exec_event(rli); } -int Delete_file_log_event::exec_event(struct st_master_info* mi) +int Delete_file_log_event::exec_event(struct st_relay_log_info* rli) { char fname[FN_REFLEN+10]; char* p; @@ -1860,10 +1876,10 @@ int Delete_file_log_event::exec_event(struct st_master_info* mi) (void)my_delete(fname, MYF(MY_WME)); if (mysql_bin_log.is_open()) mysql_bin_log.write(this); - return Log_event::exec_event(mi); + return Log_event::exec_event(rli); } -int Append_block_log_event::exec_event(struct st_master_info* mi) +int Append_block_log_event::exec_event(struct st_relay_log_info* rli) { char fname[FN_REFLEN+10]; char* p; @@ -1873,12 +1889,12 @@ int Append_block_log_event::exec_event(struct st_master_info* mi) memcpy(p, ".data", 6); if ((fd = my_open(fname, O_WRONLY|O_APPEND|O_BINARY, MYF(MY_WME))) < 0) { - slave_print_error(my_errno, "Could not open file '%s'", fname); + slave_print_error(rli,my_errno, "Could not open file '%s'", fname); goto err; } if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP))) { - slave_print_error(my_errno, "Write to '%s' failed", fname); + slave_print_error(rli,my_errno, "Write to '%s' failed", fname); goto err; } if (mysql_bin_log.is_open()) @@ -1887,10 +1903,10 @@ int Append_block_log_event::exec_event(struct st_master_info* mi) err: if (fd >= 0) my_close(fd, MYF(0)); - return error ? error : Log_event::exec_event(mi); + return error ? error : Log_event::exec_event(rli); } -int Execute_load_log_event::exec_event(struct st_master_info* mi) +int Execute_load_log_event::exec_event(struct st_relay_log_info* rli) { char fname[FN_REFLEN+10]; char* p; @@ -1906,7 +1922,7 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi) init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0, MYF(MY_WME|MY_NABP))) { - slave_print_error(my_errno, "Could not open file '%s'", fname); + slave_print_error(rli,my_errno, "Could not open file '%s'", fname); goto err; } if (!(lev = (Load_log_event*)Log_event::read_log_event(&file, @@ -1914,7 +1930,7 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi) (bool)0)) || lev->get_type_code() != NEW_LOAD_EVENT) { - slave_print_error(0, "File '%s' appears corrupted", fname); + slave_print_error(rli,0, "File '%s' appears corrupted", fname); goto err; } // we want to disable binary logging in slave thread @@ -1927,7 +1943,7 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi) lev->thd = thd; if (lev->exec_event(0,0)) { - slave_print_error(my_errno, "Failed executing load from '%s'", fname); + slave_print_error(rli,my_errno, "Failed executing load from '%s'", fname); thd->options = save_options; goto err; } @@ -1943,7 +1959,7 @@ err: end_io_cache(&file); if (fd >= 0) my_close(fd, MYF(0)); - return error ? error : Log_event::exec_event(mi); + return error ? error : Log_event::exec_event(rli); } |