diff options
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 541 |
1 files changed, 325 insertions, 216 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index e6215356ad1..be95cc16d90 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -14,8 +14,10 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ - #include "mysql_priv.h" + +#ifdef HAVE_REPLICATION + #include <mysql.h> #include <myisam.h> #include "mini_client.h" @@ -24,7 +26,6 @@ #include "repl_failsafe.h" #include <thr_alarm.h> #include <my_dir.h> -#include <assert.h> bool use_slave_mask = 0; MY_BITMAP slave_error_mask; @@ -55,7 +56,6 @@ static int events_till_disconnect = -1; typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE; -void skip_load_data_infile(NET* net); static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev); static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev); static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli); @@ -78,8 +78,21 @@ char* rewrite_db(char* db); /* - Get a bit mask for which threads are running so that we later can - restart these threads + Find out which replications threads are running + + SYNOPSIS + init_thread_mask() + mask Return value here + mi master_info for slave + inverse If set, returns which threads are not running + + IMPLEMENTATION + Get a bit mask for which threads are running so that we can later restart + these threads. + + RETURN + mask If inverse == 0, running threads + If inverse == 1, stopped threads */ void init_thread_mask(int* mask,MASTER_INFO* mi,bool inverse) @@ -96,6 +109,10 @@ void init_thread_mask(int* mask,MASTER_INFO* mi,bool inverse) } +/* + lock_slave_threads() +*/ + void lock_slave_threads(MASTER_INFO* mi) { //TODO: see if we can do this without dual mutex @@ -103,6 +120,11 @@ void lock_slave_threads(MASTER_INFO* mi) pthread_mutex_lock(&mi->rli.run_lock); } + +/* + unlock_slave_threads() +*/ + void unlock_slave_threads(MASTER_INFO* mi) { //TODO: see if we can do this without dual mutex @@ -111,6 +133,8 @@ void unlock_slave_threads(MASTER_INFO* mi) } +/* Initialize slave structures */ + int init_slave() { DBUG_ENTER("init_slave"); @@ -166,6 +190,7 @@ static void free_table_ent(TABLE_RULE_ENT* e) my_free((gptr) e, MYF(0)); } + static byte* get_table_key(TABLE_RULE_ENT* e, uint* len, my_bool not_used __attribute__((unused))) { @@ -192,11 +217,7 @@ static byte* get_table_key(TABLE_RULE_ENT* e, uint* len, - If not, open the 'log' binary file. TODO - - check proper initialization of master_log_name/master_log_pos - - We may always want to delete all logs before 'log'. - Currently if we are not calling this with 'log' as NULL or the first - log we will never delete relay logs. - If we want this we should not set skip_log_purge to 1. + - check proper initialization of group_master_log_name/group_master_log_pos RETURN VALUES 0 ok @@ -223,7 +244,7 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log, rli->cur_log_fd = -1; } - rli->relay_log_pos = pos; + rli->group_relay_log_pos = rli->event_relay_log_pos = pos; /* Test to see if the previous run was with the skip of purging @@ -235,18 +256,15 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log, goto err; } - if (log) // If not first log + if (log && rli->relay_log.find_log_pos(&rli->linfo, log, 1)) { - if (strcmp(log, rli->linfo.log_file_name)) - rli->skip_log_purge= 1; // Different name; Don't purge - if (rli->relay_log.find_log_pos(&rli->linfo, log, 1)) - { - *errmsg="Could not find target log during relay log initialization"; - goto err; - } + *errmsg="Could not find target log during relay log initialization"; + goto err; } - strmake(rli->relay_log_name,rli->linfo.log_file_name, - sizeof(rli->relay_log_name)-1); + strmake(rli->group_relay_log_name,rli->linfo.log_file_name, + sizeof(rli->group_relay_log_name)-1); + strmake(rli->event_relay_log_name,rli->linfo.log_file_name, + sizeof(rli->event_relay_log_name)-1); if (rli->relay_log.is_active(rli->linfo.log_file_name)) { /* @@ -277,7 +295,7 @@ err: If we don't purge, we can't honour relay_log_space_limit ; silently discard it */ - if (rli->skip_log_purge) + if (!relay_log_purge) rli->log_space_limit= 0; pthread_cond_broadcast(&rli->data_cond); if (need_data_lock) @@ -287,7 +305,16 @@ err: } -/* called from get_options() in mysqld.cc on start-up */ +/* + Init functio to set up array for errors that should be skipped for slave + + SYNOPSIS + init_slave_skip_errors() + arg List of errors numbers to skip, separated with ',' + + NOTES + Called from get_options() in mysqld.cc on start-up +*/ void init_slave_skip_errors(const char* arg) { @@ -298,9 +325,9 @@ void init_slave_skip_errors(const char* arg) exit(1); } use_slave_mask = 1; - for (;isspace(*arg);++arg) + for (;my_isspace(system_charset_info,*arg);++arg) /* empty */; - if (!my_casecmp(arg,"all",3)) + if (!my_strnncoll(system_charset_info,(uchar*)arg,4,(const uchar*)"all",4)) { bitmap_set_all(&slave_error_mask); return; @@ -312,15 +339,17 @@ void init_slave_skip_errors(const char* arg) break; if (err_code < MAX_SLAVE_ERROR) bitmap_set_bit(&slave_error_mask,(uint)err_code); - while (!isdigit(*p) && *p) + while (!my_isdigit(system_charset_info,*p) && *p) p++; } } /* - We assume we have a run lock on rli and that both slave thread - are not running + purge_relay_logs() + + NOTES + Assumes to have a run lock on rli and that no slave thread are running. */ int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset, @@ -347,9 +376,8 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset, to display fine in any case. */ - rli->master_log_name[0]= 0; - rli->master_log_pos= 0; - rli->pending= 0; + rli->group_master_log_name[0]= 0; + rli->group_master_log_pos= 0; if (!rli->inited) DBUG_RETURN(0); @@ -366,16 +394,18 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset, goto err; } /* Save name of used relay log file */ - strmake(rli->relay_log_name, rli->relay_log.get_log_fname(), - sizeof(rli->relay_log_name)-1); + strmake(rli->group_relay_log_name, rli->relay_log.get_log_fname(), + sizeof(rli->group_relay_log_name)-1); + strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(), + sizeof(rli->event_relay_log_name)-1); // Just first log with magic number and nothing else rli->log_space_total= BIN_LOG_HEADER_SIZE; - rli->relay_log_pos= BIN_LOG_HEADER_SIZE; + rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE; rli->relay_log.reset_bytes_written(); if (!just_reset) - error= init_relay_log_pos(rli, rli->relay_log_name, rli->relay_log_pos, - 0 /* do not need data lock */, errmsg); - + error= init_relay_log_pos(rli, rli->group_relay_log_name, rli->group_relay_log_pos, + 0 /* do not need data lock */, errmsg); + err: #ifndef DBUG_OFF char buf[22]; @@ -472,7 +502,8 @@ int start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock, pthread_cond_t *start_cond, volatile bool *slave_running, volatile ulong *slave_run_id, - MASTER_INFO* mi) + MASTER_INFO* mi, + bool high_priority) { pthread_t th; ulong start_id; @@ -501,6 +532,8 @@ int start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock, } start_id= *slave_run_id; DBUG_PRINT("info",("Creating new slave thread")); + if (high_priority) + my_pthread_attr_setprio(&connection_attrib,CONNECT_PRIOR); if (pthread_create(&th, &connection_attrib, h_func, (void*)mi)) { if (start_lock) @@ -531,9 +564,12 @@ int start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock, /* - SLAVE_FORCE_ALL is not implemented here on purpose since it does not make - sense to do that for starting a slave - we always care if it actually - started the threads that were not previously running + start_slave_threads() + + NOTES + SLAVE_FORCE_ALL is not implemented here on purpose since it does not make + sense to do that for starting a slave--we always care if it actually + started the threads that were not previously running */ int start_slave_threads(bool need_slave_mutex, bool wait_for_start, @@ -562,13 +598,13 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start, error=start_slave_thread(handle_slave_io,lock_io,lock_cond_io, cond_io, &mi->slave_running, &mi->slave_run_id, - mi); + mi, 1); //high priority, to read the most possible if (!error && (thread_mask & SLAVE_SQL)) { error=start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql, cond_sql, &mi->rli.slave_running, &mi->rli.slave_run_id, - mi); + mi, 0); if (error) terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0); } @@ -578,12 +614,13 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start, void init_table_rule_hash(HASH* h, bool* h_inited) { - hash_init(h, TABLE_RULE_HASH_SIZE,0,0, + hash_init(h, system_charset_info,TABLE_RULE_HASH_SIZE,0,0, (hash_get_key) get_table_key, (hash_free_key) free_table_ent, 0); *h_inited = 1; } + void init_table_rule_array(DYNAMIC_ARRAY* a, bool* a_inited) { my_init_dynamic_array(a, sizeof(TABLE_RULE_ENT*), TABLE_RULE_ARR_SIZE, @@ -591,6 +628,7 @@ void init_table_rule_array(DYNAMIC_ARRAY* a, bool* a_inited) *a_inited = 1; } + static TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len) { uint i; @@ -600,8 +638,10 @@ static TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len) { TABLE_RULE_ENT* e ; get_dynamic(a, (gptr)&e, i); - if (!wild_case_compare(key, key_end, (const char*)e->db, - (const char*)(e->db + e->key_len),'\\')) + if (!my_wildcmp(system_charset_info, key, key_end, + (const char*)e->db, + (const char*)(e->db + e->key_len), + '\\',wild_one,wild_many)) return e; } @@ -740,6 +780,11 @@ int add_table_rule(HASH* h, const char* table_spec) return 0; } + +/* + Add table expression with wildcards to dynamic array +*/ + int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec) { const char* dot = strchr(table_spec, '.'); @@ -756,6 +801,7 @@ int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec) return 0; } + static void free_string_array(DYNAMIC_ARRAY *a) { uint i; @@ -768,8 +814,8 @@ static void free_string_array(DYNAMIC_ARRAY *a) delete_dynamic(a); } -#ifdef NOT_USED_YET +#ifdef NOT_USED_YET static int end_slave_on_walk(MASTER_INFO* mi, gptr /*unused*/) { end_master_info(mi); @@ -778,6 +824,13 @@ static int end_slave_on_walk(MASTER_INFO* mi, gptr /*unused*/) #endif +/* + Free all resources used by slave + + SYNOPSIS + end_slave() +*/ + void end_slave() { if (active_mi) @@ -830,13 +883,25 @@ void slave_print_error(RELAY_LOG_INFO* rli, int err_code, const char* msg, ...) rli->last_slave_errno = err_code; } +/* + skip_load_data_infile() -void skip_load_data_infile(NET* net) + NOTES + This is used to tell a 3.23 master to break send_file() +*/ + +void skip_load_data_infile(NET *net) +{ + (void)net_request_file(net, "/dev/null"); + (void)my_net_read(net); // discard response + (void)net_write_command(net, 0, "", 0, "", 0); // Send ok +} + + +bool net_request_file(NET* net, const char* fname) { - (void)my_net_write(net, "\xfb/dev/null", 10); - (void)net_flush(net); - (void)my_net_read(net); // discard response - send_ok(net); // the master expects it + DBUG_ENTER("net_request_file"); + DBUG_RETURN(net_write_command(net, 251, fname, strlen(fname), "", 0)); } @@ -998,13 +1063,13 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db, if (packet_len == packet_error) { - send_error(&thd->net, ER_MASTER_NET_READ); + send_error(thd, ER_MASTER_NET_READ); return 1; } if (net->read_pos[0] == 255) // error from master { net->read_pos[packet_len] = 0; - net_printf(&thd->net, ER_MASTER, net->read_pos + 3); + net_printf(thd, ER_MASTER, net->read_pos + 3); return 1; } thd->command = COM_TABLE_DUMP; @@ -1012,7 +1077,7 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db, if (!(query = sql_alloc(packet_len + 1))) { sql_print_error("create_table_from_dump: out of memory"); - net_printf(&thd->net, ER_GET_ERRNO, "Out of memory"); + net_printf(thd, ER_GET_ERRNO, "Out of memory"); return 1; } memcpy(query, net->read_pos, packet_len); @@ -1050,7 +1115,7 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db, thd->proc_info = "Opening master dump table"; if (!open_ltable(thd, &tables, TL_WRITE)) { - send_error(&thd->net,0,0); // Send error from open_ltable + send_error(thd,0,0); // Send error from open_ltable sql_print_error("create_table_from_dump: could not open created table"); goto err; } @@ -1059,7 +1124,7 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db, thd->proc_info = "Reading master dump table data"; if (file->net_read_dump(net)) { - net_printf(&thd->net, ER_MASTER_NET_READ); + net_printf(thd, ER_MASTER_NET_READ); sql_print_error("create_table_from_dump::failed in\ handler::net_read_dump()"); goto err; @@ -1078,7 +1143,7 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db, error=file->repair(thd,&check_opt) != 0; thd->net.vio = save_vio; if (error) - net_printf(&thd->net, ER_INDEX_REBUILD,tables.table->real_name); + net_printf(thd, ER_INDEX_REBUILD,tables.table->real_name); err: close_thread_tables(thd); @@ -1086,6 +1151,7 @@ err: return error; } + int fetch_master_table(THD *thd, const char *db_name, const char *table_name, MASTER_INFO *mi, MYSQL *mysql) { @@ -1100,12 +1166,12 @@ int fetch_master_table(THD *thd, const char *db_name, const char *table_name, { if (!(mysql = mc_mysql_init(NULL))) { - send_error(&thd->net); // EOM + send_error(thd); // EOM DBUG_RETURN(1); } if (connect_to_master(thd, mysql, mi)) { - net_printf(&thd->net, ER_CONNECT_TO_MASTER, mc_mysql_error(mysql)); + net_printf(thd, ER_CONNECT_TO_MASTER, mc_mysql_error(mysql)); mc_mysql_close(mysql); DBUG_RETURN(1); } @@ -1129,7 +1195,7 @@ int fetch_master_table(THD *thd, const char *db_name, const char *table_name, if (!called_connected) mc_mysql_close(mysql); if (errmsg && thd->net.vio) - send_error(&thd->net, error, errmsg); + send_error(thd, error, errmsg); DBUG_RETURN(test(error)); // Return 1 on error } @@ -1166,13 +1232,11 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) fn_format(fname, info_fname, mysql_data_home, "", 4+32); pthread_mutex_lock(&rli->data_lock); info_fd = rli->info_fd; - rli->pending = 0; rli->cur_log_fd = -1; rli->slave_skip_counter=0; rli->abort_pos_wait=0; - rli->skip_log_purge=0; - rli->log_space_limit = relay_log_space_limit; - rli->log_space_total = 0; + rli->log_space_limit= relay_log_space_limit; + rli->log_space_total= 0; // TODO: make this work with multi-master if (!opt_relay_logname) @@ -1213,8 +1277,8 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */, &msg)) goto err; - rli->master_log_name[0]= 0; - rli->master_log_pos= 0; + rli->group_master_log_name[0]= 0; + rli->group_master_log_pos= 0; rli->info_fd= info_fd; } else // file exists @@ -1235,31 +1299,33 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) rli->info_fd = info_fd; int relay_log_pos, master_log_pos; - if (init_strvar_from_file(rli->relay_log_name, - sizeof(rli->relay_log_name), &rli->info_file, + if (init_strvar_from_file(rli->group_relay_log_name, + sizeof(rli->group_relay_log_name), &rli->info_file, "") || init_intvar_from_file(&relay_log_pos, &rli->info_file, BIN_LOG_HEADER_SIZE) || - init_strvar_from_file(rli->master_log_name, - sizeof(rli->master_log_name), &rli->info_file, + init_strvar_from_file(rli->group_master_log_name, + sizeof(rli->group_master_log_name), &rli->info_file, "") || init_intvar_from_file(&master_log_pos, &rli->info_file, 0)) { msg="Error reading slave log configuration"; goto err; } - rli->relay_log_pos= relay_log_pos; - rli->master_log_pos= master_log_pos; + strmake(rli->event_relay_log_name,rli->group_relay_log_name, + sizeof(rli->event_relay_log_name)-1); + rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos; + rli->group_master_log_pos= master_log_pos; if (init_relay_log_pos(rli, - rli->relay_log_name, - rli->relay_log_pos, + rli->group_relay_log_name, + rli->group_relay_log_pos, 0 /* no data lock*/, &msg)) goto err; } - DBUG_ASSERT(rli->relay_log_pos >= BIN_LOG_HEADER_SIZE); - DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos); + DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE); + DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos); /* Now change the cache from READ to WRITE - must do this before flush_relay_log_info @@ -1314,9 +1380,11 @@ static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli) THD* thd = mi->io_thd; DBUG_ENTER("wait_for_relay_log_space"); + pthread_mutex_lock(&rli->log_space_lock); save_proc_info = thd->proc_info; thd->proc_info = "Waiting for relay log space to free"; + while (rli->log_space_limit < rli->log_space_total && !(slave_killed=io_slave_killed(thd,mi)) && !rli->ignore_log_space_limit) @@ -1333,7 +1401,7 @@ static int count_relay_log_space(RELAY_LOG_INFO* rli) { LOG_INFO linfo; DBUG_ENTER("count_relay_log_space"); - rli->log_space_total = 0; + rli->log_space_total= 0; if (rli->relay_log.find_log_pos(&linfo, NullS, 1)) { sql_print_error("Could not find first log while counting relay log space"); @@ -1468,109 +1536,115 @@ err: int register_slave_on_master(MYSQL* mysql) { - String packet; - char buf[4]; + char buf[1024], *pos= buf; + uint report_host_len, report_user_len=0, report_password_len=0; if (!report_host) return 0; - - int4store(buf, server_id); - packet.append(buf, 4); - - net_store_data(&packet, report_host); + report_host_len= strlen(report_host); if (report_user) - net_store_data(&packet, report_user); - else - packet.append((char)0); - + report_user_len= strlen(report_user); if (report_password) - net_store_data(&packet, report_user); - else - packet.append((char)0); - - int2store(buf, (uint16)report_port); - packet.append(buf, 2); - int4store(buf, rpl_recovery_rank); - packet.append(buf, 4); - int4store(buf, 0); /* tell the master will fill in master_id */ - packet.append(buf, 4); - - if (mc_simple_command(mysql, COM_REGISTER_SLAVE, (char*)packet.ptr(), - packet.length(), 0)) + report_password_len= strlen(report_password); + /* 30 is a good safety margin */ + if (report_host_len + report_user_len + report_password_len + 30 > + sizeof(buf)) + return 0; // safety + + int4store(pos, server_id); pos+= 4; + pos= net_store_data(pos, report_host, report_host_len); + pos= net_store_data(pos, report_user, report_user_len); + pos= net_store_data(pos, report_password, report_password_len); + int2store(pos, (uint16) report_port); pos+= 2; + int4store(pos, rpl_recovery_rank); pos+= 4; + /* The master will fill in master_id */ + int4store(pos, 0); pos+= 4; + + if (mc_simple_command(mysql, COM_REGISTER_SLAVE, (char*) buf, + (uint) (pos- buf), 0)) { sql_print_error("Error on COM_REGISTER_SLAVE: %d '%s'", mc_mysql_errno(mysql), mc_mysql_error(mysql)); return 1; } - return 0; } + int show_master_info(THD* thd, MASTER_INFO* mi) { // TODO: fix this for multi-master - DBUG_ENTER("show_master_info"); List<Item> field_list; + Protocol *protocol= thd->protocol; + DBUG_ENTER("show_master_info"); + field_list.push_back(new Item_empty_string("Master_Host", sizeof(mi->host))); field_list.push_back(new Item_empty_string("Master_User", sizeof(mi->user))); - field_list.push_back(new Item_empty_string("Master_Port", 6)); - field_list.push_back(new Item_empty_string("Connect_retry", 6)); + field_list.push_back(new Item_return_int("Master_Port", 7, + MYSQL_TYPE_LONG)); + field_list.push_back(new Item_return_int("Connect_retry", 10, + MYSQL_TYPE_LONG)); field_list.push_back(new Item_empty_string("Master_Log_File", - FN_REFLEN)); - field_list.push_back(new Item_empty_string("Read_Master_Log_Pos", 12)); + FN_REFLEN)); + field_list.push_back(new Item_return_int("Read_Master_Log_Pos", 10, + MYSQL_TYPE_LONGLONG)); field_list.push_back(new Item_empty_string("Relay_Log_File", - FN_REFLEN)); - field_list.push_back(new Item_empty_string("Relay_Log_Pos", 12)); + FN_REFLEN)); + field_list.push_back(new Item_return_int("Relay_Log_Pos", 10, + MYSQL_TYPE_LONGLONG)); field_list.push_back(new Item_empty_string("Relay_Master_Log_File", - FN_REFLEN)); + FN_REFLEN)); field_list.push_back(new Item_empty_string("Slave_IO_Running", 3)); field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3)); field_list.push_back(new Item_empty_string("Replicate_do_db", 20)); field_list.push_back(new Item_empty_string("Replicate_ignore_db", 20)); - field_list.push_back(new Item_empty_string("Last_errno", 4)); + field_list.push_back(new Item_return_int("Last_errno", 4, MYSQL_TYPE_LONG)); field_list.push_back(new Item_empty_string("Last_error", 20)); - field_list.push_back(new Item_empty_string("Skip_counter", 12)); - field_list.push_back(new Item_empty_string("Exec_master_log_pos", 12)); - field_list.push_back(new Item_empty_string("Relay_log_space", 12)); - if (send_fields(thd, field_list, 1)) + field_list.push_back(new Item_return_int("Skip_counter", 10, + MYSQL_TYPE_LONG)); + field_list.push_back(new Item_return_int("Exec_master_log_pos", 10, + MYSQL_TYPE_LONGLONG)); + field_list.push_back(new Item_return_int("Relay_log_space", 10, + MYSQL_TYPE_LONGLONG)); + if (protocol->send_fields(&field_list, 1)) DBUG_RETURN(-1); if (mi->host[0]) { String *packet= &thd->packet; - packet->length(0); + protocol->prepare_for_resend(); pthread_mutex_lock(&mi->data_lock); pthread_mutex_lock(&mi->rli.data_lock); - net_store_data(packet, mi->host); - net_store_data(packet, mi->user); - net_store_data(packet, (uint32) mi->port); - net_store_data(packet, (uint32) mi->connect_retry); - net_store_data(packet, mi->master_log_name); - net_store_data(packet, (longlong) mi->master_log_pos); - net_store_data(packet, mi->rli.relay_log_name + - dirname_length(mi->rli.relay_log_name)); - net_store_data(packet, (longlong) mi->rli.relay_log_pos); - net_store_data(packet, mi->rli.master_log_name); - net_store_data(packet, mi->slave_running ? "Yes":"No"); - net_store_data(packet, mi->rli.slave_running ? "Yes":"No"); - net_store_data(packet, &replicate_do_db); - net_store_data(packet, &replicate_ignore_db); - net_store_data(packet, (uint32)mi->rli.last_slave_errno); - net_store_data(packet, mi->rli.last_slave_error); - net_store_data(packet, mi->rli.slave_skip_counter); - net_store_data(packet, (longlong) mi->rli.master_log_pos); - net_store_data(packet, (longlong) mi->rli.log_space_total); + protocol->store(mi->host, &my_charset_bin); + protocol->store(mi->user, &my_charset_bin); + protocol->store((uint32) mi->port); + protocol->store((uint32) mi->connect_retry); + protocol->store(mi->master_log_name, &my_charset_bin); + protocol->store((ulonglong) mi->master_log_pos); + protocol->store(mi->rli.group_relay_log_name + + dirname_length(mi->rli.group_relay_log_name), &my_charset_bin); + protocol->store((ulonglong) mi->rli.group_relay_log_pos); + protocol->store(mi->rli.group_master_log_name, &my_charset_bin); + protocol->store(mi->slave_running ? "Yes":"No", &my_charset_bin); + protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin); + protocol->store(&replicate_do_db); + protocol->store(&replicate_ignore_db); + protocol->store((uint32) mi->rli.last_slave_errno); + protocol->store(mi->rli.last_slave_error, &my_charset_bin); + protocol->store((uint32) mi->rli.slave_skip_counter); + protocol->store((ulonglong) mi->rli.group_master_log_pos); + protocol->store((ulonglong) mi->rli.log_space_total); pthread_mutex_unlock(&mi->rli.data_lock); pthread_mutex_unlock(&mi->data_lock); if (my_net_write(&thd->net, (char*)thd->packet.ptr(), packet->length())) DBUG_RETURN(-1); } - send_eof(&thd->net); + send_eof(thd); DBUG_RETURN(0); } @@ -1586,25 +1660,22 @@ bool flush_master_info(MASTER_INFO* mi) my_b_printf(file, "%s\n%s\n%s\n%s\n%s\n%d\n%d\n", mi->master_log_name, llstr(mi->master_log_pos, lbuf), mi->host, mi->user, - mi->password, mi->port, mi->connect_retry - ); + mi->password, mi->port, mi->connect_retry); flush_io_cache(file); DBUG_RETURN(0); } st_relay_log_info::st_relay_log_info() - :info_fd(-1), cur_log_fd(-1), master_log_pos(0), save_temporary_tables(0), - cur_log_old_open_count(0), log_space_total(0), ignore_log_space_limit(0), - slave_skip_counter(0), abort_pos_wait(0), slave_run_id(0), - sql_thd(0), last_slave_errno(0), inited(0), abort_slave(0), - slave_running(0), skip_log_purge(0), - inside_transaction(0) /* the default is autocommit=1 */ -{ - relay_log_name[0] = master_log_name[0] = 0; + :info_fd(-1), cur_log_fd(-1), save_temporary_tables(0), + cur_log_old_open_count(0), group_master_log_pos(0), log_space_total(0), + ignore_log_space_limit(0), slave_skip_counter(0), abort_pos_wait(0), + slave_run_id(0), sql_thd(0), last_slave_errno(0), inited(0), abort_slave(0), + slave_running(0) +{ + group_relay_log_name[0]= event_relay_log_name[0]= group_master_log_name[0]= 0; last_slave_error[0]=0; - bzero(&info_file,sizeof(info_file)); bzero(&cache_buf, sizeof(cache_buf)); pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST); @@ -1666,8 +1737,8 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name, set_timespec(abstime,timeout); DBUG_ENTER("wait_for_pos"); - DBUG_PRINT("enter",("master_log_name: '%s' pos: %lu timeout: %ld", - master_log_name, (ulong) master_log_pos, + DBUG_PRINT("enter",("group_master_log_name: '%s' pos: %lu timeout: %ld", + group_master_log_name, (ulong) group_master_log_pos, (long) timeout)); pthread_mutex_lock(&data_lock); @@ -1717,10 +1788,10 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name, { bool pos_reached; int cmp_result= 0; - DBUG_ASSERT(*master_log_name || master_log_pos == 0); - if (*master_log_name) + DBUG_ASSERT(*group_master_log_name || group_master_log_pos == 0); + if (*group_master_log_name) { - char *basename= master_log_name + dirname_length(master_log_name); + char *basename= group_master_log_name + dirname_length(group_master_log_name); /* First compare the parts before the extension. Find the dot in the master's log basename, @@ -1735,13 +1806,13 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name, } // Now compare extensions. char *q_end; - ulong master_log_name_extension= strtoul(q, &q_end, 10); - if (master_log_name_extension < log_name_extension) + ulong group_master_log_name_extension= strtoul(q, &q_end, 10); + if (group_master_log_name_extension < log_name_extension) cmp_result = -1 ; else - cmp_result= (master_log_name_extension > log_name_extension) ? 1 : 0 ; + cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ; } - pos_reached = ((!cmp_result && master_log_pos >= (ulonglong)log_pos) || + pos_reached = ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) || cmp_result > 0); if (pos_reached || thd->killed) break; @@ -1796,6 +1867,10 @@ improper_arguments: %d timed_out: %d", } +/* + init_slave_thread() +*/ + static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type) { DBUG_ENTER("init_slave_thread"); @@ -1934,7 +2009,7 @@ command"); /* - read one event from the master + Read one event from the master SYNOPSIS read_event() @@ -1948,7 +2023,6 @@ command"); RETURN VALUES 'packet_error' Error number Length of packet - */ static ulong read_event(MYSQL* mysql, MASTER_INFO *mi, bool* suppress_warnings) @@ -2024,7 +2098,6 @@ point. If you are sure that your master is ok, run this query manually on the\ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) { - DBUG_ASSERT(rli->sql_thd==thd); Log_event * ev = next_event(rli); DBUG_ASSERT(rli->sql_thd==thd); if (sql_slave_killed(thd,rli)) @@ -2046,7 +2119,7 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) (rli->slave_skip_counter && type_code != ROTATE_EVENT)) { /* TODO: I/O thread should not even log events with the same server id */ - rli->inc_pos(ev->get_event_len(), + rli->inc_group_relay_log_pos(ev->get_event_len(), type_code != STOP_EVENT ? ev->log_pos : LL(0), 1/* skip lock*/); flush_relay_log_info(rli); @@ -2087,7 +2160,8 @@ This may also be a network problem, or just a bug in the master or slave code.\ } -/* slave I/O thread */ +/* Slave I/O Thread entry point */ + extern "C" pthread_handler_decl(handle_slave_io,arg) { THD *thd; // needs to be first for thread_stack @@ -2360,7 +2434,7 @@ err: } -/* slave SQL logic thread */ +/* Slave SQL Thread entry point */ extern "C" pthread_handler_decl(handle_slave_sql,arg) { @@ -2386,7 +2460,8 @@ slave_begin: #endif thd = new THD; // note that contructor of THD uses DBUG_ ! - THD_CHECK_SENTRY(thd); + thd->thread_stack = (char*)&thd; // remember where our stack is + /* Inform waiting threads that slave has started */ rli->slave_run_id++; @@ -2402,9 +2477,9 @@ slave_begin: sql_print_error("Failed during slave thread initialization"); goto err; } + thd->init_for_queries(); rli->sql_thd= thd; thd->temporary_tables = rli->save_temporary_tables; // restore temp tables - thd->thread_stack = (char*)&thd; // remember where our stack is pthread_mutex_lock(&LOCK_thread_count); threads.append(thd); pthread_mutex_unlock(&LOCK_thread_count); @@ -2412,15 +2487,13 @@ slave_begin: rli->abort_slave = 0; pthread_mutex_unlock(&rli->run_lock); pthread_cond_broadcast(&rli->start_cond); - // This should always be set to 0 when the slave thread is started - rli->pending = 0; //tell the I/O thread to take relay_log_space_limit into account from now on rli->ignore_log_space_limit= 0; if (init_relay_log_pos(rli, - rli->relay_log_name, - rli->relay_log_pos, + rli->group_relay_log_name, + rli->group_relay_log_pos, 1 /*need data lock*/, &errmsg)) { sql_print_error("Error initializing relay log position: %s", @@ -2428,18 +2501,18 @@ slave_begin: goto err; } THD_CHECK_SENTRY(thd); - DBUG_ASSERT(rli->relay_log_pos >= BIN_LOG_HEADER_SIZE); - DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos); + DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE); + DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos); DBUG_ASSERT(rli->sql_thd == thd); DBUG_PRINT("master_info",("log_file_name: %s position: %s", - rli->master_log_name, - llstr(rli->master_log_pos,llbuff))); + rli->group_master_log_name, + llstr(rli->group_master_log_pos,llbuff))); if (global_system_variables.log_warnings) sql_print_error("Slave SQL thread initialized, starting replication in \ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, - llstr(rli->master_log_pos,llbuff),rli->relay_log_name, - llstr(rli->relay_log_pos,llbuff1)); + llstr(rli->group_master_log_pos,llbuff),rli->group_relay_log_name, + llstr(rli->group_relay_log_pos,llbuff1)); /* Read queries from the IO/THREAD until this thread is killed */ @@ -2456,7 +2529,7 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, Error running query, slave SQL thread aborted. Fix the problem, and restart \ the slave SQL thread with \"SLAVE START\". We stopped at log \ '%s' position %s", - RPL_LOG_NAME, llstr(rli->master_log_pos, llbuff)); + RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff)); goto err; } } @@ -2464,7 +2537,7 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \ /* Thread stopped. Print the current replication position to the log */ sql_print_error("Slave SQL thread exiting, replication stopped in log \ '%s' at position %s", - RPL_LOG_NAME, llstr(rli->master_log_pos,llbuff)); + RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff)); err: VOID(pthread_mutex_lock(&LOCK_thread_count)); @@ -2497,19 +2570,23 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \ if (abort_slave_event_count && !rli->events_till_abort) goto slave_begin; #endif - my_thread_end(); // clean-up before broadcasting termination + my_thread_end(); pthread_exit(0); DBUG_RETURN(0); // Can't return anything here } +/* + process_io_create_file() +*/ + static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev) { int error = 1; ulong num_bytes; bool cev_not_written; - THD* thd; - NET* net = &mi->mysql->net; + THD *thd = mi->io_thd; + NET *net = &mi->mysql->net; DBUG_ENTER("process_io_create_file"); if (unlikely(!cev->is_valid())) @@ -2523,7 +2600,6 @@ static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev) DBUG_RETURN(0); } DBUG_ASSERT(cev->inited_from_old); - thd = mi->io_thd; thd->file_id = cev->file_id = mi->file_id++; thd->server_id = cev->server_id; cev_not_written = 1; @@ -2553,7 +2629,7 @@ static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev) } if (unlikely(!num_bytes)) /* eof */ { - send_ok(net); /* 3.23 master wants it */ + net_write_command(net, 0, "", 0, "", 0);/* 3.23 master wants it */ Execute_load_log_event xev(thd,0); xev.log_pos = mi->master_log_pos; if (unlikely(mi->rli.relay_log.append(&xev))) @@ -2599,6 +2675,7 @@ err: DBUG_RETURN(error); } + /* Start using a new binary log on the master @@ -2608,7 +2685,7 @@ err: rev The rotate log event read from the binary log DESCRIPTION - Updates the master info and relay data with the place in the next binary + Updates the master info with the place in the next binary log where we should start reading. NOTES @@ -2617,6 +2694,7 @@ err: RETURN VALUES 0 ok 1 Log event is illegal + */ static int process_io_rotate(MASTER_INFO *mi, Rotate_log_event *rev) @@ -2643,7 +2721,10 @@ static int process_io_rotate(MASTER_INFO *mi, Rotate_log_event *rev) DBUG_RETURN(0); } + /* + queue_old_event() + TODO: Test this code before release - it has to be tested on a separate setup with 3.23 master @@ -2736,7 +2817,10 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, DBUG_RETURN(0); } + /* + queue_event() + TODO: verify the issue with stop events, see if we need them at all in the relay log */ @@ -2817,7 +2901,20 @@ void end_relay_log_info(RELAY_LOG_INFO* rli) DBUG_VOID_RETURN; } -/* try to connect until successful or slave killed */ +/* + Try to connect until successful or slave killed + + SYNPOSIS + safe_connect() + thd Thread handler for slave + mysql MySQL connection handle + mi Replication handle + + RETURN + 0 ok + # Error +*/ + static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) { return connect_to_master(thd, mysql, mi, 0, 0); @@ -2825,8 +2922,12 @@ static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) /* - Try to connect until successful or slave killed or we have retried - master_retry_count times + SYNPOSIS + connect_to_master() + + IMPLEMENTATION + Try to connect until successful or slave killed or we have retried + master_retry_count times */ static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi, @@ -2909,8 +3010,11 @@ replication resumed in log '%s' at position %s", mi->user, /* - Try to connect until successful or slave killed or we have retried - master_retry_count times + safe_reconnect() + + IMPLEMENTATION + Try to connect until successful or slave killed or we have retried + master_retry_count times */ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi, @@ -2955,18 +3059,14 @@ bool flush_relay_log_info(RELAY_LOG_INFO* rli) IO_CACHE *file = &rli->info_file; char buff[FN_REFLEN*2+22*2+4], *pos; - /* sql_thd is not set when calling from init_slave() */ - if ((rli->sql_thd && rli->sql_thd->options & OPTION_BEGIN)) - return 0; // Wait for COMMIT - my_b_seek(file, 0L); - pos=strmov(buff, rli->relay_log_name); + pos=strmov(buff, rli->group_relay_log_name); *pos++='\n'; - pos=longlong2str(rli->relay_log_pos, pos, 10); + pos=longlong2str(rli->group_relay_log_pos, pos, 10); *pos++='\n'; - pos=strmov(pos, rli->master_log_name); + pos=strmov(pos, rli->group_master_log_name); *pos++='\n'; - pos=longlong2str(rli->master_log_pos, pos, 10); + pos=longlong2str(rli->group_master_log_pos, pos, 10); *pos='\n'; if (my_b_write(file, (byte*) buff, (ulong) (pos-buff)+1)) error=1; @@ -2979,8 +3079,7 @@ bool flush_relay_log_info(RELAY_LOG_INFO* rli) /* - This function is called when we notice that the current "hot" log - got rotated under our feet. + Called when we notice that the current "hot" log got rotated under our feet. */ static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg) @@ -2990,7 +3089,7 @@ static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg) DBUG_ENTER("reopen_relay_log"); IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf; - if ((rli->cur_log_fd=open_binlog(cur_log,rli->relay_log_name, + if ((rli->cur_log_fd=open_binlog(cur_log,rli->event_relay_log_name, errmsg)) <0) DBUG_RETURN(0); /* @@ -2998,7 +3097,7 @@ static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg) relay_log_pos Current log pos pending Number of bytes already processed from the event */ - my_b_seek(cur_log,rli->relay_log_pos + rli->pending); + my_b_seek(cur_log,rli->event_relay_log_pos); DBUG_RETURN(cur_log); } @@ -3007,7 +3106,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli) { Log_event* ev; IO_CACHE* cur_log = rli->cur_log; - pthread_mutex_t *log_lock = rli->relay_log.get_log_lock(); + pthread_mutex_t *log_lock = rli->relay_log.get_log_lock(); const char* errmsg=0; THD* thd = rli->sql_thd; DBUG_ENTER("next_event"); @@ -3056,7 +3155,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli) } } DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE); - DBUG_ASSERT(my_b_tell(cur_log) == rli->relay_log_pos + rli->pending); + DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos); /* Relay log is always in new format - if the master is 3.23, the I/O thread will convert the format for us @@ -3123,8 +3222,8 @@ Log_event* next_event(RELAY_LOG_INFO* rli) // prevent the I/O thread from blocking next times rli->ignore_log_space_limit= 1; // If the I/O thread is blocked, unblock it - pthread_cond_broadcast(&rli->log_space_cond); pthread_mutex_unlock(&rli->log_space_lock); + pthread_cond_broadcast(&rli->log_space_cond); // Note that wait_for_update unlocks lock_log ! rli->relay_log.wait_for_update(rli->sql_thd); // re-acquire data lock since we released it earlier @@ -3141,16 +3240,25 @@ Log_event* next_event(RELAY_LOG_INFO* rli) my_close(rli->cur_log_fd, MYF(MY_WME)); rli->cur_log_fd = -1; - /* - TODO: make skip_log_purge a start-up option. At this point this - is not critical priority - */ - if (!rli->skip_log_purge) + if (relay_log_purge) { - // purge_first_log will properly set up relay log coordinates in rli - if (rli->relay_log.purge_first_log(rli)) + /* + purge_first_log will properly set up relay log coordinates in rli. + If the group's coordinates are equal to the event's coordinates + (i.e. the relay log was not rotated in the middle of a group), + we can purge this relay log too. + We do ulonglong and string comparisons, this may be slow but + - purging the last relay log is nice (it can save 1GB of disk), so we + like to detect the case where we can do it, and given this, + - I see no better detection method + - purge_first_log is not called that often + */ + if (rli->relay_log.purge_first_log + (rli, + rli->group_relay_log_pos == rli->event_relay_log_pos + && !strcmp(rli->group_relay_log_name,rli->event_relay_log_name))) { - errmsg = "Error purging processed log"; + errmsg = "Error purging processed logs"; goto err; } } @@ -3168,10 +3276,9 @@ Log_event* next_event(RELAY_LOG_INFO* rli) errmsg = "error switching to the next log"; goto err; } - rli->relay_log_pos = BIN_LOG_HEADER_SIZE; - rli->pending=0; - strmake(rli->relay_log_name,rli->linfo.log_file_name, - sizeof(rli->relay_log_name)-1); + rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE; + strmake(rli->event_relay_log_name,rli->linfo.log_file_name, + sizeof(rli->event_relay_log_name)-1); flush_relay_log_info(rli); } @@ -3219,7 +3326,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli) event(errno: %d cur_log->error: %d)", my_errno,cur_log->error); // set read position to the beginning of the event - my_b_seek(cur_log,rli->relay_log_pos+rli->pending); + my_b_seek(cur_log,rli->event_relay_log_pos); /* otherwise, we have had a partial read */ errmsg = "Aborting slave SQL thread because of partial event read"; break; // To end of function @@ -3240,3 +3347,5 @@ err: template class I_List_iterator<i_string>; template class I_List_iterator<i_string_pair>; #endif + +#endif /* HAVE_REPLICATION */ |