diff options
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 945 |
1 files changed, 945 insertions, 0 deletions
diff --git a/sql/slave.cc b/sql/slave.cc new file mode 100644 index 00000000000..fb28922c74f --- /dev/null +++ b/sql/slave.cc @@ -0,0 +1,945 @@ +/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + + +#include "mysql_priv.h" +#include <mysql.h> +#include "mini_client.h" +#include <thr_alarm.h> +#include <my_dir.h> + +pthread_handler_decl(handle_slave,arg); +extern bool volatile abort_loop, abort_slave; + +// the master variables are defaults read from my.cnf or command line +extern uint master_port, master_connect_retry; +extern my_string master_user, master_password, master_host, + master_info_file; + +extern I_List<i_string> replicate_do_db, replicate_ignore_db; +extern I_List<THD> threads; +bool slave_running = 0; +pthread_t slave_real_id; +MASTER_INFO glob_mi; + + +extern bool opt_log_slave_updates ; + +static inline bool slave_killed(THD* thd); +static int init_slave_thread(THD* thd); +static int init_master_info(MASTER_INFO* mi); +static void safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); +static void safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); +static int safe_sleep(THD* thd, int sec); +static int request_table_dump(MYSQL* mysql, char* db, char* table); +static int create_table_from_dump(THD* thd, NET* net, const char* db, + const char* table_name); + +static inline bool slave_killed(THD* thd) +{ + return abort_slave || abort_loop || thd->killed; +} + +int db_ok(const char* db, I_List<i_string> &do_list, + I_List<i_string> &ignore_list ) +{ + if(do_list.is_empty() && ignore_list.is_empty()) + return 1; // ok to replicate if the user puts no constraints + + if(!db) + return 0; // if the user has specified restrictions on which databases to replicate + // and db was not selected, do not replicate + + if(!do_list.is_empty()) // if the do's are not empty + { + I_List_iterator<i_string> it(do_list); + i_string* tmp; + + while((tmp=it++)) + { + if(!strcmp(tmp->ptr, db)) + return 1; // match + } + return 0; + } + else // there are some elements in the don't, otherwise we cannot get here + { + I_List_iterator<i_string> it(ignore_list); + i_string* tmp; + + while((tmp=it++)) + { + if(!strcmp(tmp->ptr, db)) + return 0; // match + } + + return 1; + + } + + // impossible + return 0; +} + +static void init_strvar_from_file(char* var, int max_size, FILE* f, + char* default_val) +{ + + if(fgets(var, max_size, f)) + { + *(strend(var)-1) = 0; + } + else if(default_val) + strmake(var, default_val, max_size); +} + +static void init_intvar_from_file(int* var, FILE* f, + int default_val) +{ + char buf[32]; + + if(fgets(buf, sizeof(buf), f)) + { + *var = atoi(buf); + } + else if(default_val) + *var = default_val; +} + + +static int create_table_from_dump(THD* thd, NET* net, const char* db, + const char* table_name) +{ + uint packet_len = my_net_read(net); // read create table statement + TABLE_LIST tables; + int error = 0; + + if(packet_len == packet_error) + { + send_error(&thd->net, ER_MASTER_NET_READ); + return 1; + } + if(net->read_pos[0] == 255) // error from master + { + net->read_pos[packet_len] = 0; + net_printf(&thd->net, ER_MASTER, net->read_pos + 3); + return 1; + } + thd->command = COM_TABLE_DUMP; + thd->query = sql_alloc(packet_len + 1); + if(!thd->query) + { + sql_print_error("create_table_from_dump: out of memory"); + net_printf(&thd->net, ER_GET_ERRNO, "Out of memory"); + return 1; + } + memcpy(thd->query, net->read_pos, packet_len); + thd->query[packet_len] = 0; + thd->current_tablenr = 0; + thd->query_error = 0; + thd->net.no_send_ok = 1; + thd->proc_info = "Creating table from master dump"; + char* save_db = thd->db; + thd->db = thd->last_nx_db; // in case we are creating in a different + // database + mysql_parse(thd, thd->query, packet_len); // run create table + thd->db = save_db; // leave things the way the were before + + if(thd->query_error) + { + close_thread_tables(thd); // mysql_parse takes care of the error send + return 1; + } + + bzero((char*) &tables,sizeof(tables)); + tables.db = (char*)db; + tables.name = tables.real_name = (char*)table_name; + tables.lock_type = TL_WRITE; + thd->proc_info = "Opening master dump table"; + if(open_tables(thd, &tables) || !tables.table) + { + // open tables will send the error + sql_print_error("create_table_from_dump: could not open created table"); + close_thread_tables(thd); + return 1; + } + + handler *file = tables.table->file; + thd->proc_info = "Reading master dump table data"; + if(file->net_read_dump(net)) + { + net_printf(&thd->net, ER_MASTER_NET_READ); + sql_print_error("create_table_from_dump::failed in\ + handler::net_read_dump()"); + close_thread_tables(thd); + return 1; + } + + HA_CHECK_OPT check_opt; + check_opt.init(); + check_opt.quick = 1; + thd->proc_info = "rebuilding the index on master dump table"; + Vio* save_vio = thd->net.vio; + thd->net.vio = 0; // we do not want repair() to spam us with messages + // just send them to the error log, and report the failure in case of + // problems + if(file->repair(thd,&check_opt )) + { + net_printf(&thd->net, ER_INDEX_REBUILD,tables.table->real_name ); + error = 1; + } + thd->net.vio = save_vio; + close_thread_tables(thd); + + thd->net.no_send_ok = 0; + return error; +} + +int fetch_nx_table(THD* thd, MASTER_INFO* mi) +{ + MYSQL* mysql = mc_mysql_init(NULL); + int error = 1; + int nx_errno = 0; + if(!mysql) + { + sql_print_error("fetch_nx_table: Error in mysql_init()"); + nx_errno = ER_GET_ERRNO; + goto err; + } + + safe_connect(thd, mysql, mi); + if(slave_killed(thd)) + goto err; + + if(request_table_dump(mysql, thd->last_nx_db, thd->last_nx_table)) + { + nx_errno = ER_GET_ERRNO; + sql_print_error("fetch_nx_table: failed on table dump request "); + goto err; + } + + if(create_table_from_dump(thd, &mysql->net, thd->last_nx_db, + thd->last_nx_table)) + { + // create_table_from_dump will have sent the error alread + sql_print_error("fetch_nx_table: failed on create table "); + goto err; + } + + error = 0; + err: + if(mysql) + mc_mysql_close(mysql); + if(nx_errno && thd->net.vio) + send_error(&thd->net, nx_errno, "Error in fetch_nx_table"); + + return error; +} + +static int init_master_info(MASTER_INFO* mi) +{ + FILE* file; + MY_STAT stat_area; + char fname[FN_REFLEN]; + fn_format(fname, master_info_file, mysql_data_home, "", 4+16+32); + + if(!mi->inited) + pthread_mutex_init(&mi->lock, NULL); + + // we need a mutex while we are changing master info parameters to + // keep other threads from reading bogus info + + pthread_mutex_lock(&mi->lock); + + + if(!my_stat(fname, &stat_area, MYF(0))) // we do not want any messages + // if the file does not exist + { + file = my_fopen(fname, O_CREAT|O_RDWR, MYF(MY_WME)); + if(!file) + return 1; + mi->log_file_name[0] = 0; + mi->pos = 0; + mi->file = file; + + if(master_host) + strmake(mi->host, master_host, sizeof(mi->host)); + if(master_user) + strmake(mi->user, master_user, sizeof(mi->user)); + if(master_password) + strmake(mi->password, master_password, sizeof(mi->password)); + mi->port = master_port; + mi->connect_retry = master_connect_retry; + + if(flush_master_info(mi)) + return 1; + } + else + { + file = my_fopen(fname, O_RDWR, MYF(MY_WME)); + if(!file) + return 1; + + if(!fgets(mi->log_file_name, sizeof(mi->log_file_name), file)) + { + sql_print_error("Error reading log file name from master info file "); + return 1; + } + + *(strend(mi->log_file_name) - 1) = 0; // kill \n + char buf[FN_REFLEN]; + if(!fgets(buf, sizeof(buf), file)) + { + sql_print_error("Error reading log file position from master info file"); + return 1; + } + + mi->pos = atoi(buf); + mi->file = file; + init_strvar_from_file(mi->host, sizeof(mi->host), file, master_host); + init_strvar_from_file(mi->user, sizeof(mi->user), file, master_user); + init_strvar_from_file(mi->password, sizeof(mi->password), file, + master_password); + + init_intvar_from_file((int*)&mi->port, file, master_port); + init_intvar_from_file((int*)&mi->connect_retry, file, + master_connect_retry); + + } + + mi->inited = 1; + pthread_mutex_unlock(&mi->lock); + + return 0; +} + +int show_master_info(THD* thd) +{ + DBUG_ENTER("show_master_info"); + List<Item> field_list; + field_list.push_back(new Item_empty_string("Master_Host", + sizeof(glob_mi.host))); + field_list.push_back(new Item_empty_string("Master_User", + sizeof(glob_mi.user))); + field_list.push_back(new Item_empty_string("Master_Port", 6)); + field_list.push_back(new Item_empty_string("Connect_retry", 6)); + field_list.push_back( new Item_empty_string("Log_File", + FN_REFLEN)); + field_list.push_back(new Item_empty_string("Pos", 12)); + field_list.push_back(new Item_empty_string("Slave_Running", 3)); + field_list.push_back(new Item_empty_string("Replicate_do_db", 20)); + field_list.push_back(new Item_empty_string("Replicate_ignore_db", 20)); + if(send_fields(thd, field_list, 1)) + DBUG_RETURN(-1); + + String* packet = &thd->packet; + packet->length(0); + + pthread_mutex_lock(&glob_mi.lock); + net_store_data(packet, glob_mi.host); + net_store_data(packet, glob_mi.user); + net_store_data(packet, (uint32) glob_mi.port); + net_store_data(packet, (uint32) glob_mi.connect_retry); + net_store_data(packet, glob_mi.log_file_name); + net_store_data(packet, (longlong)glob_mi.pos); + pthread_mutex_unlock(&glob_mi.lock); + pthread_mutex_lock(&LOCK_slave); + net_store_data(packet, slave_running ? "Yes":"No"); + pthread_mutex_unlock(&LOCK_slave); + net_store_data(packet, &replicate_do_db); + net_store_data(packet, &replicate_ignore_db); + + if(my_net_write(&thd->net, (char*)thd->packet.ptr(), packet->length())) + DBUG_RETURN(-1); + + send_eof(&thd->net); + DBUG_RETURN(0); +} + +int flush_master_info(MASTER_INFO* mi) +{ + FILE* file = mi->file; + if(my_fseek(file, 0L, MY_SEEK_SET, MYF(MY_WME)) == MY_FILEPOS_ERROR || + fprintf(file, "%s\n%ld\n%s\n%s\n%s\n%d\n%d\n", + mi->log_file_name, mi->pos, mi->host, mi->user, mi->password, + mi->port, mi->connect_retry) < 0 || + fflush(file)) + { + sql_print_error("Write error flushing master_info: %d", errno); + return 1; + } + + return 0; +} + + +static int init_slave_thread(THD* thd) +{ + DBUG_ENTER("init_slave_thread"); + thd->system_thread = thd->bootstrap = 1; + thd->client_capabilities = 0; + my_net_init(&thd->net, 0); + thd->max_packet_length=thd->net.max_packet; + thd->master_access= ~0; + thd->priv_user = 0; + thd->options = (((opt_log_slave_updates) ? OPTION_BIN_LOG:0) + | OPTION_AUTO_COMMIT | OPTION_AUTO_IS_NULL) ; + thd->system_thread = 1; + thd->client_capabilities = CLIENT_LOCAL_FILES; + slave_real_id=thd->real_id=pthread_self(); + pthread_mutex_lock(&LOCK_thread_count); + thd->thread_id = thread_id++; + pthread_mutex_unlock(&LOCK_thread_count); + + if (init_thr_lock() || + my_pthread_setspecific_ptr(THR_THD, thd) || + my_pthread_setspecific_ptr(THR_MALLOC, &thd->alloc) || + my_pthread_setspecific_ptr(THR_NET, &thd->net)) + { + close_connection(&thd->net,ER_OUT_OF_RESOURCES); // is this needed? + end_thread(thd,0); + DBUG_RETURN(-1); + } + + thd->mysys_var=my_thread_var; + thd->dbug_thread_id=my_thread_id(); +#ifndef __WIN__ + sigset_t set; + VOID(sigemptyset(&set)); // Get mask in use + VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals)); +#endif + + thd->alloc.free=thd->alloc.used=0; + if (thd->max_join_size == (ulong) ~0L) + thd->options |= OPTION_BIG_SELECTS; + + thd->proc_info="Waiting for master update"; + thd->version=refresh_version; + thd->set_time(); + + DBUG_RETURN(0); +} + +static int safe_sleep(THD* thd, int sec) +{ + thr_alarm_t alarmed; + thr_alarm_init(&alarmed); + time_t start_time= time((time_t*) 0); + time_t end_time= start_time+sec; + ALARM alarm_buff; + + while (start_time < end_time) + { + int nap_time = (int) (end_time - start_time); + thr_alarm(&alarmed, 2 * nap_time,&alarm_buff); // the only reason we are asking for alarm is so that + // we will be woken up in case of murder, so if we do not get killed, set the alarm + // so it goes off after we wake up naturally + sleep(nap_time); + if (thr_alarm_in_use(&alarmed)) // if we wake up before the alarm goes off, hit the button + thr_end_alarm(&alarmed); // so it will not wake up the wife and kids :-) + + if (slave_killed(thd)) + return 1; + start_time=time((time_t*) 0); + } + return 0; +} + + +static int request_dump(MYSQL* mysql, MASTER_INFO* mi) +{ + char buf[FN_REFLEN + 6]; + int len; + int binlog_flags = 0; // for now + char* logname = mi->log_file_name; + int4store(buf, mi->pos); + int2store(buf + 4, binlog_flags); + len = strlen(logname); + memcpy(buf + 6, logname,len); + if(mc_simple_command(mysql, COM_BINLOG_DUMP, buf, len + 6, 1)) + // something went wrong, so we will just reconnect and retry later + // in the future, we should do a better error analysis, but for + // now we just fill up the error log :-) + { + sql_print_error("Error on COM_BINLOG_DUMP: %s, will retry in %d secs", + mc_mysql_error(mysql), master_connect_retry); + return 1; + } + + return 0; +} + +static int request_table_dump(MYSQL* mysql, char* db, char* table) +{ + char buf[1024]; + char * p = buf; + uint table_len = strlen(table); + uint db_len = strlen(db); + if(table_len + db_len > sizeof(buf) - 2) + { + sql_print_error("request_table_dump: Buffer overrun"); + return 1; + } + + *p++ = db_len; + memcpy(p, db, db_len); + p += db_len; + *p++ = table_len; + memcpy(p, table, table_len); + + if(mc_simple_command(mysql, COM_TABLE_DUMP, buf, p - buf + table_len, 1)) + { + sql_print_error("request_table_dump: Error sending the table dump \ +command"); + return 1; + } + + return 0; +} + + +static uint read_event(MYSQL* mysql, MASTER_INFO *mi) +{ + uint len = packet_error; + NET* net = &mysql->net; + int read_errno = EINTR; // for convinience lets think we start by + // being in the interrupted state :-) + // my_real_read() will time us out + // we check if we were told to die, and if not, try reading again + while (!abort_loop && !abort_slave && len == packet_error && read_errno == EINTR ) + { + len = mc_net_safe_read(mysql); + read_errno = errno; + } + if(abort_loop || abort_slave) + return packet_error; + if (len == packet_error || (int) len < 1) + { + sql_print_error("Error reading packet from server: %s (%d)", + mc_mysql_error(mysql), read_errno); + return packet_error; + } + + if(len == 1) + { + sql_print_error("Received 0 length packet from server, looks like master shutdown: %s (%d)", + mc_mysql_error(mysql), read_errno); + return packet_error; + } + + DBUG_PRINT("info",( "len=%u, net->read_pos[4] = %d\n", + len, net->read_pos[4])); + + return len - 1; +} + +static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) +{ + Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1 , event_len); + if(ev) + { + switch(ev->get_type_code()) + { + case QUERY_EVENT: + { + Query_log_event* qev = (Query_log_event*)ev; + int q_len = qev->q_len; + init_sql_alloc(&thd->alloc, 8192); + thd->db = (char*)qev->db; + if(db_ok(thd->db, replicate_do_db, replicate_ignore_db)) + { + thd->query = (char*)qev->query; + thd->set_time((time_t)qev->when); + thd->current_tablenr = 0; + VOID(pthread_mutex_lock(&LOCK_thread_count)); + thd->query_id = query_id++; + VOID(pthread_mutex_unlock(&LOCK_thread_count)); + thd->last_nx_table = thd->last_nx_db = 0; + for(;;) + { + thd->query_error = 0; // clear error + thd->last_nx_table = thd->last_nx_db = 0; + thd->net.last_errno = 0; + thd->net.last_error[0] = 0; + mysql_parse(thd, thd->query, q_len); // try query + if(!thd->query_error || slave_killed(thd)) // break if ok + break; + // if not ok + if(thd->last_nx_table && thd->last_nx_db) + { + // for now, let's just fail if the table is not + // there, and not try to be a smart alec... + + // if table was not there + //if(fetch_nx_table(thd,&glob_mi)) + // try to to fetch from master + break; // if we can't, just break + } + else + break; // if failed for some other reason, bail out + + // if fetched the table from master successfully + // we need to restore query info in thd because + // fetch_nx_table executes create table + thd->query = (char*)qev->query; + thd->set_time((time_t)qev->when); + thd->current_tablenr = 0; + } + } + thd->db = 0;// prevent db from being freed + thd->query = 0; // just to be sure + close_thread_tables(thd); + free_root(&thd->alloc); + if(thd->query_error) + { + sql_print_error("Slave: error running query '%s' ", + qev->query); + return 1; + } + delete ev; + + if(thd->fatal_error) + { + sql_print_error("Slave: Fatal error running query '%s' ", + thd->query); + return 1; + } + + mi->inc_pos(event_len); + flush_master_info(mi); + break; + } + + case LOAD_EVENT: + { + Load_log_event* lev = (Load_log_event*)ev; + init_sql_alloc(&thd->alloc, 8192); + thd->db = (char*)lev->db; + thd->query = 0; + thd->query_error = 0; + + if(db_ok(thd->db, replicate_do_db, replicate_ignore_db)) + { + thd->set_time((time_t)lev->when); + thd->current_tablenr = 0; + VOID(pthread_mutex_lock(&LOCK_thread_count)); + thd->query_id = query_id++; + VOID(pthread_mutex_unlock(&LOCK_thread_count)); + + enum enum_duplicates handle_dup = DUP_IGNORE; + if(lev->sql_ex.opt_flags && REPLACE_FLAG) + handle_dup = DUP_REPLACE; + sql_exchange ex((char*)lev->fname, lev->sql_ex.opt_flags && DUMPFILE_FLAG ); + String field_term(&lev->sql_ex.field_term, 1), + enclosed(&lev->sql_ex.enclosed, 1), line_term(&lev->sql_ex.line_term,1), + escaped(&lev->sql_ex.escaped, 1), line_start(&lev->sql_ex.line_start, 1); + + ex.field_term = &field_term; + if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY) + ex.field_term->length(0); + + ex.enclosed = &enclosed; + if(lev->sql_ex.empty_flags & ENCLOSED_EMPTY) + ex.enclosed->length(0); + + ex.line_term = &line_term; + if(lev->sql_ex.empty_flags & LINE_TERM_EMPTY) + ex.line_term->length(0); + + ex.line_start = &line_start; + if(lev->sql_ex.empty_flags & LINE_START_EMPTY) + ex.line_start->length(0); + + ex.escaped = &escaped; + if(lev->sql_ex.empty_flags & ESCAPED_EMPTY) + ex.escaped->length(0); + + ex.opt_enclosed = (lev->sql_ex.opt_flags & OPT_ENCLOSED_FLAG); + if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY) + ex.field_term->length(0); + + ex.skip_lines = lev->skip_lines; + + TABLE_LIST tables; + bzero((char*) &tables,sizeof(tables)); + tables.db = thd->db; + tables.name = tables.real_name = (char*)lev->table_name; + tables.lock_type = TL_WRITE; + + if (open_tables(thd, &tables)) + { + sql_print_error("Slave: error opening table %s ", + tables.name); + delete ev; + return 1; + } + + List<Item> fields; + lev->set_fields(fields); + thd->net.vio = net->vio; + // mysql_load will use thd->net to read the file + thd->net.pkt_nr = net->pkt_nr; + // make sure the client does get confused + // about the packet sequence + if(mysql_load(thd, &ex, &tables, fields, handle_dup, 1, + TL_WRITE)) + thd->query_error = 1; + net->pkt_nr = thd->net.pkt_nr; + } + else // we will just ask the master to send us /dev/null if we do not want to + // load the data :-) + { + (void)my_net_write(net, "\xfb/dev/null", 10); + (void)net_flush(net); + (void)my_net_read(net); // discard response + send_ok(net); // the master expects it + } + + thd->net.vio = 0; + thd->db = 0;// prevent db from being freed + close_thread_tables(thd); + if(thd->query_error) + { + int sql_error = thd->net.last_errno; + if(!sql_error) + sql_error = ER_UNKNOWN_ERROR; + + sql_print_error("Slave: error '%s' running load data infile ", + ER(sql_error)); + delete ev; + return 1; + } + delete ev; + + if(thd->fatal_error) + { + sql_print_error("Slave: Fatal error running query '%s' ", + thd->query); + return 1; + } + + mi->inc_pos(event_len); + flush_master_info(mi); + break; + } + + case START_EVENT: + mi->inc_pos(event_len); + flush_master_info(mi); + break; + + case STOP_EVENT: + mi->inc_pos(event_len); + flush_master_info(mi); + break; + case ROTATE_EVENT: + { + Rotate_log_event* rev = (Rotate_log_event*)ev; + int ident_len = rev->ident_len; + memcpy(mi->log_file_name, rev->new_log_ident,ident_len ); + mi->log_file_name[ident_len] = 0; + mi->pos = 0; + break; + } + + case INTVAR_EVENT: + { + Intvar_log_event* iev = (Intvar_log_event*)ev; + switch(iev->type) + { + case LAST_INSERT_ID_EVENT: + thd->last_insert_id_used = 1; + thd->last_insert_id = iev->val; + break; + case INSERT_ID_EVENT: + thd->next_insert_id = iev->val; + break; + + } + mi->inc_pos(event_len); + flush_master_info(mi); + break; + } + } + + } + else + { + sql_print_error("Could not parse log event entry, check the master for binlog corruption\ + This may also be a network problem, or just a bug in the master or slave code"); + return 1; + } + + + +return 0; +} + +// slave thread + +pthread_handler_decl(handle_slave,arg __attribute__((unused))) +{ + THD *thd;; // needs to be first for thread_stack + MYSQL *mysql = NULL ; + + pthread_mutex_lock(&LOCK_slave); + if(slave_running) + { + pthread_mutex_unlock(&LOCK_slave); + return 0; // safety just in case + } + slave_running = 1; + abort_slave = 0; + pthread_mutex_unlock(&LOCK_slave); + + int error = 1; + + my_thread_init(); // needs to be up here, otherwise we get a coredump + // trying to use DBUG_ stuff + thd = new THD; // note that contructor of THD uses DBUG_ ! + DBUG_ENTER("handle_slave"); + + pthread_detach_this_thread(); + if(init_slave_thread(thd) || init_master_info(&glob_mi)) + goto err; + thd->thread_stack = (char*)&thd; // remember where our stack is + + threads.append(thd); + + DBUG_PRINT("info",("master info: log_file_name=%s, position=%d", + glob_mi.log_file_name, glob_mi.pos)); + + mysql = mc_mysql_init(NULL); + if(!mysql) + { + sql_print_error("Slave thread: error in mc_mysql_init()"); + goto err; + } + + thd->proc_info = "connecting to master"; + safe_connect(thd, mysql, &glob_mi); + + while(!slave_killed(thd)) + { + thd->proc_info = "requesting binlog dump"; + if(request_dump(mysql, &glob_mi)) + { + sql_print_error("Failed on request_dump()"); + if(slave_killed(thd)) + goto err; + + thd->proc_info = "waiting to reconnect after a failed dump request"; + safe_sleep(thd, glob_mi.connect_retry); + if(slave_killed(thd)) + goto err; + + thd->proc_info = "reconnecting after a failed dump request"; + + safe_reconnect(thd, mysql, &glob_mi); + if(slave_killed(thd)) + goto err; + + continue; + } + + + while(!slave_killed(thd)) + { + bool reset = 0; + thd->proc_info = "reading master update"; + uint event_len = read_event(mysql, &glob_mi); + if(slave_killed(thd)) + goto err; + + if (event_len == packet_error) + { + thd->proc_info = "waiting to reconnect after a failed read"; + safe_sleep(thd, glob_mi.connect_retry); + if(slave_killed(thd)) + goto err; + thd->proc_info = "reconnecting after a failed read"; + safe_reconnect(thd, mysql, &glob_mi); + if(slave_killed(thd)) + goto err; + reset = 1; + } + + if(reset) + break; + + thd->proc_info = "processing master log event"; + if(exec_event(thd, &mysql->net, &glob_mi, event_len)) + { + sql_print_error("Error running query, slave aborted. Fix the problem, and re-start\ + the slave thread with mysqladmin start-slave"); + goto err; // there was an error running the query + // abort the slave thread, when the problem is fixed, the user + // should restart the slave with mysqladmin start-slave + } + + } + } + + error = 0; + err: + thd->query = thd->db = 0; // extra safety + if(mysql) + mc_mysql_close(mysql); + thd->proc_info = "waiting for slave mutex on exit"; + pthread_mutex_lock(&LOCK_slave); + slave_running = 0; + abort_slave = 0; + pthread_cond_broadcast(&COND_slave_stopped); // tell the world we are done + pthread_mutex_unlock(&LOCK_slave); + delete thd; + my_thread_end(); + pthread_exit(0); + DBUG_RETURN(0); // Can't return anything here +} + +static void safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) + // will try to connect until successful +{ + while(!slave_killed(thd) && + !mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0, + mi->port, 0, 0)) + { + sql_print_error( + "Slave thread: error connecting to slave:%s, retry in %d sec", + mc_mysql_error(mysql), mi->connect_retry); + safe_sleep(thd, mi->connect_retry); + } + +} + +// will try to connect until successful + +static void safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) +{ + while(!slave_killed(thd) && mc_mysql_reconnect(mysql)) + { + sql_print_error( + "Slave thread: error connecting to slave:%s, retry in %d sec", + mc_mysql_error(mysql), mi->connect_retry); + safe_sleep(thd, mi->connect_retry); + } + +} + +#ifdef __GNUC__ +template class I_List_iterator<i_string>; +#endif + |