summaryrefslogtreecommitdiff
path: root/sql/sql_repl.cc
diff options
context:
space:
mode:
authorunknown <sasha@mysql.sashanet.com>2001-11-10 22:24:12 -0700
committerunknown <sasha@mysql.sashanet.com>2001-11-10 22:24:12 -0700
commit99c6f2feb91cf196fa7cb407d440b81039e8eac5 (patch)
tree9303005b745dbcfba11b4ee43cd2971c956dc974 /sql/sql_repl.cc
parente667649323a8b227d94d1b5efa15cc0fbaf3cb3d (diff)
downloadmariadb-git-99c6f2feb91cf196fa7cb407d440b81039e8eac5.tar.gz
work to enable reading 3.23 logs - not yet finished
moved fail-safe replication routines from sql_repl.cc to repl_failsafe.cc write start event only in the first log client/mysqlbinlog.cc: work to enable reading 3.23 logs libmysql/Makefile.shared: added mf_iocache2 to libmysqlclient - needed for mysqlbinlog mysql-test/mysql-test-run.sh: added --start-and-exit mysql-test/r/rpl000002.result: result clean-up mysql-test/r/rpl000016.result: result update mysql-test/r/rpl_log.result: result update mysql-test/t/rpl000016.test: test cleanup mysys/mf_iocache.c: fixed new bug sql/log.cc: write start event only on server start or after reset master sql/log_event.cc: work to enable reading 3.23 log format sql/log_event.h: work to enable reading 3.23 format sql/repl_failsafe.cc: code restructuring sql/repl_failsafe.h: re-organized code sql/slave.cc: check master version sql/slave.h: old_format member sql/sql_class.h: allow user to specify io cache type need_start_event member to allow writing start event only in the first log sql/sql_parse.cc: code re-organization sql/sql_repl.cc: code reorganization sql/sql_repl.h: reorganized code
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r--sql/sql_repl.cc628
1 files changed, 11 insertions, 617 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 684c084ece3..3542b288087 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -24,58 +24,15 @@
#include <thr_alarm.h>
#include <my_dir.h>
-#define SLAVE_LIST_CHUNK 128
-#define SLAVE_ERRMSG_SIZE (FN_REFLEN+64)
-
extern const char* any_db;
extern pthread_handler_decl(handle_slave,arg);
-HASH slave_list;
-
#ifndef DBUG_OFF
int max_binlog_dump_events = 0; // unlimited
bool opt_sporadic_binlog_dump_fail = 0;
static int binlog_dump_count = 0;
#endif
-#ifdef SIGNAL_WITH_VIO_CLOSE
-#define KICK_SLAVE { slave_thd->close_active_vio(); \
- thr_alarm_kill(slave_real_id); }
-#else
-#define KICK_SLAVE thr_alarm_kill(slave_real_id);
-#endif
-
-static Slave_log_event* find_slave_event(IO_CACHE* log,
- const char* log_file_name,
- char* errmsg);
-
-static uint32* slave_list_key(SLAVE_INFO* si, uint* len,
- my_bool not_used __attribute__((unused)))
-{
- *len = 4;
- return &si->server_id;
-}
-
-static void slave_info_free(void *s)
-{
- my_free((gptr) s, MYF(MY_WME));
-}
-
-void init_slave_list()
-{
- hash_init(&slave_list, SLAVE_LIST_CHUNK, 0, 0,
- (hash_get_key) slave_list_key, slave_info_free, 0);
- pthread_mutex_init(&LOCK_slave_list, MY_MUTEX_INIT_FAST);
-}
-
-void end_slave_list()
-{
- pthread_mutex_lock(&LOCK_slave_list);
- hash_free(&slave_list);
- pthread_mutex_unlock(&LOCK_slave_list);
- pthread_mutex_destroy(&LOCK_slave_list);
-}
-
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
const char**errmsg)
{
@@ -104,69 +61,6 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
return 0;
}
-#define get_object(p, obj) \
-{\
- uint len = (uint)*p++; \
- if (p + len > p_end || len >= sizeof(obj)) \
- goto err; \
- strmake(obj,(char*) p,len); \
- p+= len; \
-}\
-
-void unregister_slave(THD* thd, bool only_mine, bool need_mutex)
-{
- if (need_mutex)
- pthread_mutex_lock(&LOCK_slave_list);
- if (thd->server_id)
- {
- SLAVE_INFO* old_si;
- if ((old_si = (SLAVE_INFO*)hash_search(&slave_list,
- (byte*)&thd->server_id, 4)) &&
- (!only_mine || old_si->thd == thd))
- hash_delete(&slave_list, (byte*)old_si);
- }
- if (need_mutex)
- pthread_mutex_unlock(&LOCK_slave_list);
-}
-
-int register_slave(THD* thd, uchar* packet, uint packet_length)
-{
- SLAVE_INFO *si;
- int res = 1;
- uchar* p = packet, *p_end = packet + packet_length;
-
- if (check_access(thd, FILE_ACL, any_db))
- return 1;
-
- if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
- goto err;
-
- thd->server_id = si->server_id = uint4korr(p);
- p += 4;
- get_object(p,si->host);
- get_object(p,si->user);
- get_object(p,si->password);
- si->port = uint2korr(p);
- p += 2;
- si->rpl_recovery_rank = uint4korr(p);
- p += 4;
- if (!(si->master_id = uint4korr(p)))
- si->master_id = server_id;
- si->thd = thd;
- pthread_mutex_lock(&LOCK_slave_list);
-
- unregister_slave(thd,0,0);
- res = hash_insert(&slave_list, (byte*) si);
- pthread_mutex_unlock(&LOCK_slave_list);
- return res;
-
-err:
- if (si)
- my_free((gptr) si, MYF(MY_WME));
- return res;
-}
-
-
static int send_file(THD *thd)
{
NET* net = &thd->net;
@@ -252,6 +146,8 @@ File open_binlog(IO_CACHE *log, const char *log_file_name,
if (my_b_read(log, (byte*) magic, sizeof(magic)))
{
*errmsg = "I/O error reading the header from the binary log";
+ sql_print_error("%s, errno=%d, io cache code=%d", *errmsg, my_errno,
+ log->error);
goto err;
}
if (memcmp(magic, BINLOG_MAGIC, sizeof(magic)))
@@ -887,8 +783,13 @@ void reset_master()
}
LOG_INFO linfo;
+ pthread_mutex_t* log_lock = mysql_bin_log.get_log_lock();
+ pthread_mutex_lock(log_lock);
if (mysql_bin_log.find_first_log(&linfo, ""))
+ {
+ pthread_mutex_unlock(log_lock);
return;
+ }
for(;;)
{
@@ -898,7 +799,9 @@ void reset_master()
}
mysql_bin_log.close(1); // exiting close
my_delete(mysql_bin_log.get_index_fname(), MYF(MY_WME));
+ mysql_bin_log.set_need_start_event();
mysql_bin_log.open(opt_bin_logname,LOG_BIN);
+ pthread_mutex_unlock(log_lock);
}
@@ -915,242 +818,6 @@ int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
return -1;
}
-
-static inline int cmp_master_pos(Slave_log_event* sev, LEX_MASTER_INFO* mi)
-{
- return cmp_master_pos(sev->master_log, sev->master_pos, mi->log_file_name,
- mi->pos);
-}
-
-static int find_target_pos(LEX_MASTER_INFO* mi, IO_CACHE* log, char* errmsg)
-{
- uint32 log_seq = mi->last_log_seq;
- uint32 target_server_id = mi->server_id;
-
- for (;;)
- {
- Log_event* ev;
- if (!(ev = Log_event::read_log_event(log, 0)))
- {
- if (log->error > 0)
- strmov(errmsg, "Binary log truncated in the middle of event");
- else if (log->error < 0)
- strmov(errmsg, "I/O error reading binary log");
- else
- strmov(errmsg, "Could not find target event in the binary log");
- return 1;
- }
-
- if (ev->log_seq == log_seq && ev->server_id == target_server_id)
- {
- delete ev;
- mi->pos = my_b_tell(log);
- return 0;
- }
-
- delete ev;
- }
-}
-
-
-int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg)
-{
- LOG_INFO linfo;
- char search_file_name[FN_REFLEN],last_log_name[FN_REFLEN];
- IO_CACHE log;
- File file = -1, last_file = -1;
- pthread_mutex_t *log_lock;
- const char* errmsg_p;
- Slave_log_event* sev = 0;
- my_off_t last_pos = 0;
- int error = 1;
- int cmp_res;
- LINT_INIT(cmp_res);
-
- if (!mysql_bin_log.is_open())
- {
- strmov(errmsg,"Binary log is not open");
- return 1;
- }
-
- if (!server_id_supplied)
- {
- strmov(errmsg, "Misconfigured master - server id was not set");
- return 1;
- }
-
- linfo.index_file_offset = 0;
-
-
- search_file_name[0] = 0;
-
- if (mysql_bin_log.find_first_log(&linfo, search_file_name))
- {
- strmov(errmsg,"Could not find first log");
- return 1;
- }
- thd->current_linfo = &linfo;
-
- bzero((char*) &log,sizeof(log));
- log_lock = mysql_bin_log.get_log_lock();
- pthread_mutex_lock(log_lock);
-
- for (;;)
- {
- if ((file=open_binlog(&log, linfo.log_file_name, &errmsg_p)) < 0)
- {
- strmov(errmsg, errmsg_p);
- goto err;
- }
-
- if (!(sev = find_slave_event(&log, linfo.log_file_name, errmsg)))
- goto err;
-
- cmp_res = cmp_master_pos(sev, mi);
- delete sev;
-
- if (!cmp_res)
- {
- /* Copy basename */
- fn_format(mi->log_file_name, linfo.log_file_name, "","",1);
- mi->pos = my_b_tell(&log);
- goto mi_inited;
- }
- else if (cmp_res > 0)
- {
- if (!last_pos)
- {
- strmov(errmsg,
- "Slave event in first log points past the target position");
- goto err;
- }
- end_io_cache(&log);
- (void) my_close(file, MYF(MY_WME));
- if (init_io_cache(&log, (file = last_file), IO_SIZE, READ_CACHE, 0, 0,
- MYF(MY_WME)))
- {
- errmsg[0] = 0;
- goto err;
- }
- break;
- }
-
- strmov(last_log_name, linfo.log_file_name);
- last_pos = my_b_tell(&log);
-
- switch (mysql_bin_log.find_next_log(&linfo)) {
- case LOG_INFO_EOF:
- if (last_file >= 0)
- (void)my_close(last_file, MYF(MY_WME));
- last_file = -1;
- goto found_log;
- case 0:
- break;
- default:
- strmov(errmsg, "Error reading log index");
- goto err;
- }
-
- end_io_cache(&log);
- if (last_file >= 0)
- (void) my_close(last_file, MYF(MY_WME));
- last_file = file;
- }
-
-found_log:
- my_b_seek(&log, last_pos);
- if (find_target_pos(mi,&log,errmsg))
- goto err;
- fn_format(mi->log_file_name, last_log_name, "","",1); /* Copy basename */
-
-mi_inited:
- error = 0;
-err:
- pthread_mutex_unlock(log_lock);
- end_io_cache(&log);
- pthread_mutex_lock(&LOCK_thread_count);
- thd->current_linfo = 0;
- pthread_mutex_unlock(&LOCK_thread_count);
- if (file >= 0)
- (void) my_close(file, MYF(MY_WME));
- if (last_file >= 0 && last_file != file)
- (void) my_close(last_file, MYF(MY_WME));
-
- return error;
-}
-
-// caller must delete result when done
-static Slave_log_event* find_slave_event(IO_CACHE* log,
- const char* log_file_name,
- char* errmsg)
-{
- Log_event* ev;
- if (!(ev = Log_event::read_log_event(log, 0)))
- {
- my_snprintf(errmsg, SLAVE_ERRMSG_SIZE,
- "Error reading start event in log '%s'",
- (char*)log_file_name);
- return 0;
- }
- delete ev;
-
- if (!(ev = Log_event::read_log_event(log, 0)))
- {
- my_snprintf(errmsg, SLAVE_ERRMSG_SIZE,
- "Error reading slave event in log '%s'",
- (char*)log_file_name);
- return 0;
- }
-
- if (ev->get_type_code() != SLAVE_EVENT)
- {
- my_snprintf(errmsg, SLAVE_ERRMSG_SIZE,
- "Second event in log '%s' is not slave event",
- (char*)log_file_name);
- delete ev;
- return 0;
- }
-
- return (Slave_log_event*)ev;
-}
-
-
-int show_new_master(THD* thd)
-{
- DBUG_ENTER("show_new_master");
- List<Item> field_list;
- char errmsg[SLAVE_ERRMSG_SIZE];
- LEX_MASTER_INFO* lex_mi = &thd->lex.mi;
-
- errmsg[0]=0; // Safety
- if (translate_master(thd, lex_mi, errmsg))
- {
- if (errmsg[0])
- net_printf(&thd->net, ER_ERROR_WHEN_EXECUTING_COMMAND,
- "SHOW NEW MASTER", errmsg);
- else
- send_error(&thd->net, 0);
-
- DBUG_RETURN(1);
- }
- else
- {
- String* packet = &thd->packet;
- field_list.push_back(new Item_empty_string("Log_name", 20));
- field_list.push_back(new Item_empty_string("Log_pos", 20));
- if (send_fields(thd, field_list, 1))
- DBUG_RETURN(-1);
- packet->length(0);
- net_store_data(packet, lex_mi->log_file_name);
- net_store_data(packet, (longlong)lex_mi->pos);
- if (my_net_write(&thd->net, packet->ptr(), packet->length()))
- DBUG_RETURN(-1);
- send_eof(&thd->net);
- DBUG_RETURN(0);
- }
-
-}
-
int show_binlog_events(THD* thd)
{
DBUG_ENTER("show_binlog_events");
@@ -1202,7 +869,8 @@ int show_binlog_events(THD* thd)
pthread_mutex_lock(mysql_bin_log.get_log_lock());
my_b_seek(&log, pos);
- for (event_count = 0; (ev = Log_event::read_log_event(&log, 0)); )
+ for (event_count = 0;
+ (ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,0)); )
{
if (event_count >= limit_start &&
ev->net_send(thd, linfo.log_file_name, pos))
@@ -1247,56 +915,6 @@ err:
DBUG_RETURN(0);
}
-
-int show_slave_hosts(THD* thd)
-{
- List<Item> field_list;
- NET* net = &thd->net;
- String* packet = &thd->packet;
- DBUG_ENTER("show_slave_hosts");
-
- field_list.push_back(new Item_empty_string("Server_id", 20));
- field_list.push_back(new Item_empty_string("Host", 20));
- if (opt_show_slave_auth_info)
- {
- field_list.push_back(new Item_empty_string("User",20));
- field_list.push_back(new Item_empty_string("Password",20));
- }
- field_list.push_back(new Item_empty_string("Port",20));
- field_list.push_back(new Item_empty_string("Rpl_recovery_rank", 20));
- field_list.push_back(new Item_empty_string("Master_id", 20));
-
- if (send_fields(thd, field_list, 1))
- DBUG_RETURN(-1);
-
- pthread_mutex_lock(&LOCK_slave_list);
-
- for (uint i = 0; i < slave_list.records; ++i)
- {
- SLAVE_INFO* si = (SLAVE_INFO*) hash_element(&slave_list, i);
- packet->length(0);
- net_store_data(packet, si->server_id);
- net_store_data(packet, si->host);
- if (opt_show_slave_auth_info)
- {
- net_store_data(packet, si->user);
- net_store_data(packet, si->password);
- }
- net_store_data(packet, (uint32) si->port);
- net_store_data(packet, si->rpl_recovery_rank);
- net_store_data(packet, si->master_id);
- if (my_net_write(net, (char*)packet->ptr(), packet->length()))
- {
- pthread_mutex_unlock(&LOCK_slave_list);
- DBUG_RETURN(-1);
- }
- }
- pthread_mutex_unlock(&LOCK_slave_list);
- send_eof(net);
- DBUG_RETURN(0);
-}
-
-
int show_binlog_info(THD* thd)
{
DBUG_ENTER("show_binlog_info");
@@ -1402,230 +1020,6 @@ err:
return 1;
}
-
-int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi)
-{
- if (!mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
- mi->port, 0, 0))
- {
- sql_print_error("Connection to master failed: %s",
- mc_mysql_error(mysql));
- return 1;
- }
- return 0;
-}
-
-
-static inline void cleanup_mysql_results(MYSQL_RES* db_res,
- MYSQL_RES** cur, MYSQL_RES** start)
-{
- for( ; cur >= start; --cur)
- {
- if (*cur)
- mc_mysql_free_result(*cur);
- }
- mc_mysql_free_result(db_res);
-}
-
-
-static inline int fetch_db_tables(THD* thd, MYSQL* mysql, const char* db,
- MYSQL_RES* table_res)
-{
- MYSQL_ROW row;
-
- for( row = mc_mysql_fetch_row(table_res); row;
- row = mc_mysql_fetch_row(table_res))
- {
- TABLE_LIST table;
- const char* table_name = row[0];
- int error;
- if (table_rules_on)
- {
- table.next = 0;
- table.db = (char*)db;
- table.real_name = (char*)table_name;
- table.updating = 1;
- if (!tables_ok(thd, &table))
- continue;
- }
-
- if ((error = fetch_nx_table(thd, db, table_name, &glob_mi, mysql)))
- return error;
- }
-
- return 0;
-}
-
-
-int load_master_data(THD* thd)
-{
- MYSQL mysql;
- MYSQL_RES* master_status_res = 0;
- bool slave_was_running = 0;
- int error = 0;
-
- mc_mysql_init(&mysql);
-
- // we do not want anyone messing with the slave at all for the entire
- // duration of the data load;
- pthread_mutex_lock(&LOCK_slave);
-
- // first, kill the slave
- if ((slave_was_running = slave_running))
- {
- abort_slave = 1;
- KICK_SLAVE;
- thd->proc_info = "waiting for slave to die";
- while (slave_running)
- pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done
- }
-
-
- if (connect_to_master(thd, &mysql, &glob_mi))
- {
- net_printf(&thd->net, error = ER_CONNECT_TO_MASTER,
- mc_mysql_error(&mysql));
- goto err;
- }
-
- // now that we are connected, get all database and tables in each
- {
- MYSQL_RES *db_res, **table_res, **table_res_end, **cur_table_res;
- uint num_dbs;
-
- if (mc_mysql_query(&mysql, "show databases", 0) ||
- !(db_res = mc_mysql_store_result(&mysql)))
- {
- net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
- mc_mysql_error(&mysql));
- goto err;
- }
-
- if (!(num_dbs = (uint) mc_mysql_num_rows(db_res)))
- goto err;
- // in theory, the master could have no databases at all
- // and run with skip-grant
-
- if (!(table_res = (MYSQL_RES**)thd->alloc(num_dbs * sizeof(MYSQL_RES*))))
- {
- net_printf(&thd->net, error = ER_OUTOFMEMORY);
- goto err;
- }
-
- // this is a temporary solution until we have online backup
- // capabilities - to be replaced once online backup is working
- // we wait to issue FLUSH TABLES WITH READ LOCK for as long as we
- // can to minimize the lock time
- if (mc_mysql_query(&mysql, "FLUSH TABLES WITH READ LOCK", 0) ||
- mc_mysql_query(&mysql, "SHOW MASTER STATUS",0) ||
- !(master_status_res = mc_mysql_store_result(&mysql)))
- {
- net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
- mc_mysql_error(&mysql));
- goto err;
- }
-
- // go through every table in every database, and if the replication
- // rules allow replicating it, get it
-
- table_res_end = table_res + num_dbs;
-
- for(cur_table_res = table_res; cur_table_res < table_res_end;
- cur_table_res++)
- {
- // since we know how many rows we have, this can never be NULL
- MYSQL_ROW row = mc_mysql_fetch_row(db_res);
- char* db = row[0];
-
- /*
- Do not replicate databases excluded by rules
- also skip mysql database - in most cases the user will
- mess up and not exclude mysql database with the rules when
- he actually means to - in this case, he is up for a surprise if
- his priv tables get dropped and downloaded from master
- TO DO - add special option, not enabled
- by default, to allow inclusion of mysql database into load
- data from master
- */
-
- if (!db_ok(db, replicate_do_db, replicate_ignore_db) ||
- !strcmp(db,"mysql"))
- {
- *cur_table_res = 0;
- continue;
- }
-
- if (mysql_rm_db(thd, db, 1,1) ||
- mysql_create_db(thd, db, 0, 1))
- {
- send_error(&thd->net, 0, 0);
- cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
- goto err;
- }
-
- if (mc_mysql_select_db(&mysql, db) ||
- mc_mysql_query(&mysql, "show tables", 0) ||
- !(*cur_table_res = mc_mysql_store_result(&mysql)))
- {
- net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
- mc_mysql_error(&mysql));
- cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
- goto err;
- }
-
- if ((error = fetch_db_tables(thd, &mysql, db, *cur_table_res)))
- {
- // we do not report the error - fetch_db_tables handles it
- cleanup_mysql_results(db_res, cur_table_res, table_res);
- goto err;
- }
- }
-
- cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
-
- // adjust position in the master
- if (master_status_res)
- {
- MYSQL_ROW row = mc_mysql_fetch_row(master_status_res);
-
- /*
- We need this check because the master may not be running with
- log-bin, but it will still allow us to do all the steps
- of LOAD DATA FROM MASTER - no reason to forbid it, really,
- although it does not make much sense for the user to do it
- */
- if (row[0] && row[1])
- {
- strmake(glob_mi.log_file_name, row[0], sizeof(glob_mi.log_file_name));
- glob_mi.pos = atoi(row[1]); // atoi() is ok, since offset is <= 1GB
- if (glob_mi.pos < 4)
- glob_mi.pos = 4; // don't hit the magic number
- glob_mi.pending = 0;
- flush_master_info(&glob_mi);
- }
-
- mc_mysql_free_result(master_status_res);
- }
-
- if (mc_mysql_query(&mysql, "UNLOCK TABLES", 0))
- {
- net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
- mc_mysql_error(&mysql));
- goto err;
- }
- }
-
-err:
- pthread_mutex_unlock(&LOCK_slave);
- if (slave_was_running)
- start_slave(0, 0);
- mc_mysql_close(&mysql); // safe to call since we always do mc_mysql_init()
- if (!error)
- send_ok(&thd->net);
-
- return error;
-}
-
int log_loaded_block(IO_CACHE* file)
{
LOAD_FILE_INFO* lf_info;