diff options
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r-- | sql/sql_repl.cc | 217 |
1 files changed, 192 insertions, 25 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 9cc0ba7c29a..5d9d30b6020 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -16,8 +16,10 @@ #include "mysql_priv.h" #ifdef HAVE_REPLICATION +#include "rpl_mi.h" #include "sql_repl.h" #include "log_event.h" +#include "rpl_filter.h" #include <my_dir.h> int max_binlog_dump_events = 0; // unlimited @@ -70,7 +72,7 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name, int8store(buf+R_POS_OFFSET,position); packet->append(buf, ROTATE_HEADER_LEN); packet->append(p,ident_len); - if (my_net_write(net, (char*)packet->ptr(), packet->length())) + if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) { *errmsg = "failed on my_net_write()"; DBUG_RETURN(-1); @@ -81,12 +83,13 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name, static int send_file(THD *thd) { NET* net = &thd->net; - int fd = -1,bytes, error = 1; + int fd = -1, error = 1; + size_t bytes; char fname[FN_REFLEN+1]; const char *errmsg = 0; int old_timeout; unsigned long packet_len; - char buf[IO_SIZE]; // It's safe to alloc this + uchar buf[IO_SIZE]; // It's safe to alloc this DBUG_ENTER("send_file"); /* @@ -119,7 +122,7 @@ static int send_file(THD *thd) goto err; } - while ((bytes = (int) my_read(fd, (byte*) buf, IO_SIZE, MYF(0))) > 0) + while ((long) (bytes= my_read(fd, buf, IO_SIZE, MYF(0))) > 0) { if (my_net_write(net, buf, bytes)) { @@ -129,7 +132,7 @@ static int send_file(THD *thd) } end: - if (my_net_write(net, "", 0) || net_flush(net) || + if (my_net_write(net, (uchar*) "", 0) || net_flush(net) || (my_net_read(net) == packet_error)) { errmsg = "while negotiating file transfer close"; @@ -215,7 +218,8 @@ bool log_in_use(const char* log_name) if ((linfo = tmp->current_linfo)) { pthread_mutex_lock(&linfo->lock); - result = !bcmp(log_name, linfo->log_file_name, log_name_len); + result = !bcmp((uchar*) log_name, (uchar*) linfo->log_file_name, + log_name_len); pthread_mutex_unlock(&linfo->lock); if (result) break; @@ -239,6 +243,7 @@ bool purge_error_message(THD* thd, int res) case LOG_INFO_MEM: errmsg= ER_OUT_OF_RESOURCES; break; case LOG_INFO_FATAL: errmsg= ER_BINLOG_PURGE_FATAL_ERR; break; case LOG_INFO_IN_USE: errmsg= ER_LOG_IN_USE; break; + case LOG_INFO_EMFILE: errmsg= ER_BINLOG_PURGE_EMFILE; break; default: errmsg= ER_LOG_PURGE_UNKNOWN_ERR; break; } @@ -477,7 +482,7 @@ impossible position"; int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+ ST_CREATED_OFFSET+1, (ulong) 0); /* send it */ - if (my_net_write(net, (char*)packet->ptr(), packet->length())) + if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) { errmsg = "Failed on my_net_write()"; my_errno= ER_UNKNOWN_ERROR; @@ -535,7 +540,7 @@ impossible position"; else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT) binlog_can_be_corrupted= FALSE; - if (my_net_write(net, (char*)packet->ptr(), packet->length())) + if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) { errmsg = "Failed on my_net_write()"; my_errno= ER_UNKNOWN_ERROR; @@ -648,7 +653,7 @@ impossible position"; if (read_packet) { thd->proc_info = "Sending binlog event to slave"; - if (my_net_write(net, (char*)packet->ptr(), packet->length()) ) + if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ) { errmsg = "Failed on my_net_write()"; my_errno= ER_UNKNOWN_ERROR; @@ -811,7 +816,7 @@ int start_slave(THD* thd , MASTER_INFO* mi, bool net_report) sizeof(mi->rli.until_log_name)-1); } else - clear_until_condition(&mi->rli); + mi->rli.clear_until_condition(); if (mi->rli.until_condition != RELAY_LOG_INFO::UNTIL_NONE) { @@ -965,6 +970,9 @@ int reset_slave(THD *thd, MASTER_INFO* mi) error=1; goto err; } + + ha_reset_slave(thd); + // delete relay logs, clear relay log coordinates if ((error= purge_relay_logs(&mi->rli, thd, 1 /* just reset */, @@ -983,8 +991,8 @@ int reset_slave(THD *thd, MASTER_INFO* mi) Reset errors (the idea is that we forget about the old master). */ - clear_slave_error(&mi->rli); - clear_until_condition(&mi->rli); + mi->rli.clear_error(); + mi->rli.clear_until_condition(); // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0 end_master_info(mi); @@ -1125,6 +1133,11 @@ bool change_master(THD* thd, MASTER_INFO* mi) if (lex_mi->ssl != LEX_MASTER_INFO::SSL_UNCHANGED) mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::SSL_ENABLE); + + if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::SSL_UNCHANGED) + mi->ssl_verify_server_cert= + (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::SSL_ENABLE); + if (lex_mi->ssl_ca) strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1); if (lex_mi->ssl_capath) @@ -1137,7 +1150,8 @@ bool change_master(THD* thd, MASTER_INFO* mi) strmake(mi->ssl_key, lex_mi->ssl_key, sizeof(mi->ssl_key)-1); #ifndef HAVE_OPENSSL if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath || - lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key ) + lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key || + lex_mi->ssl_verify_server_cert ) push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_IGNORED_SSL_PARAMS, ER(ER_SLAVE_IGNORED_SSL_PARAMS)); #endif @@ -1249,8 +1263,8 @@ bool change_master(THD* thd, MASTER_INFO* mi) pthread_mutex_lock(&mi->rli.data_lock); mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */ /* Clear the errors, for a clean start */ - clear_slave_error(&mi->rli); - clear_until_condition(&mi->rli); + mi->rli.clear_error(); + mi->rli.clear_until_condition(); /* If we don't write new coordinates to disk now, then old will remain in relay-log.info until START SLAVE is issued; but if mysqld is shutdown @@ -1300,12 +1314,12 @@ int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1, bool mysql_show_binlog_events(THD* thd) { Protocol *protocol= thd->protocol; - DBUG_ENTER("mysql_show_binlog_events"); List<Item> field_list; const char *errmsg = 0; bool ret = TRUE; IO_CACHE log; File file = -1; + DBUG_ENTER("mysql_show_binlog_events"); Log_event::init_show_field_list(&field_list); if (protocol->send_fields(&field_list, @@ -1315,6 +1329,13 @@ bool mysql_show_binlog_events(THD* thd) Format_description_log_event *description_event= new Format_description_log_event(3); /* MySQL 4.0 by default */ + /* + Wait for handlers to insert any pending information + into the binlog. For e.g. ndb which updates the binlog asynchronously + this is needed so that the uses sees all its own commands in the binlog + */ + ha_binlog_wait(thd); + if (mysql_bin_log.is_open()) { LEX_MASTER_INFO *lex_mi= &thd->lex->mi; @@ -1352,12 +1373,12 @@ bool mysql_show_binlog_events(THD* thd) pthread_mutex_lock(log_lock); /* - open_binlog() sought to position 4. - Read the first event in case it's a Format_description_log_event, to - know the format. If there's no such event, we are 3.23 or 4.x. This - code, like before, can't read 3.23 binlogs. - This code will fail on a mixed relay log (one which has Format_desc then - Rotate then Format_desc). + open_binlog() sought to position 4. + Read the first event in case it's a Format_description_log_event, to + know the format. If there's no such event, we are 3.23 or 4.x. This + code, like before, can't read 3.23 binlogs. + This code will fail on a mixed relay log (one which has Format_desc then + Rotate then Format_desc). */ ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event); @@ -1381,7 +1402,8 @@ bool mysql_show_binlog_events(THD* thd) } for (event_count = 0; - (ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event)); ) + (ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0, + description_event)); ) { if (event_count >= limit_start && ev->net_send(protocol, linfo.log_file_name, pos)) @@ -1455,8 +1477,8 @@ bool show_binlog_info(THD* thd) int dir_len = dirname_length(li.log_file_name); protocol->store(li.log_file_name + dir_len, &my_charset_bin); protocol->store((ulonglong) li.pos); - protocol->store(&binlog_do_db); - protocol->store(&binlog_ignore_db); + protocol->store(binlog_filter->get_do_db()); + protocol->store(binlog_filter->get_ignore_db()); if (protocol->write()) DBUG_RETURN(TRUE); } @@ -1561,6 +1583,8 @@ int log_loaded_block(IO_CACHE* file) if (!(block_len = (char*) file->read_end - (char*) buffer)) return 0; lf_info = (LOAD_FILE_INFO*) file->arg; + if (lf_info->thd->current_stmt_binlog_row_based) + return 0; if (lf_info->last_pos_in_file != HA_POS_ERROR && lf_info->last_pos_in_file >= file->pos_in_file) return 0; @@ -1583,6 +1607,149 @@ int log_loaded_block(IO_CACHE* file) return 0; } +/* + Replication System Variables +*/ + +class sys_var_slave_skip_counter :public sys_var +{ +public: + sys_var_slave_skip_counter(sys_var_chain *chain, const char *name_arg) + :sys_var(name_arg) + { chain_sys_var(chain); } + bool check(THD *thd, set_var *var); + bool update(THD *thd, set_var *var); + bool check_type(enum_var_type type) { return type != OPT_GLOBAL; } + /* + We can't retrieve the value of this, so we don't have to define + type() or value_ptr() + */ +}; + +class sys_var_sync_binlog_period :public sys_var_long_ptr +{ +public: + sys_var_sync_binlog_period(sys_var_chain *chain, const char *name_arg, + ulong *value_ptr) + :sys_var_long_ptr(chain, name_arg,value_ptr) {} + bool update(THD *thd, set_var *var); +}; + +static sys_var_chain vars = { NULL, NULL }; + +static sys_var_bool_ptr sys_relay_log_purge(&vars, "relay_log_purge", + &relay_log_purge); +static sys_var_long_ptr sys_slave_net_timeout(&vars, "slave_net_timeout", + &slave_net_timeout); +static sys_var_long_ptr sys_slave_trans_retries(&vars, "slave_transaction_retries", + &slave_trans_retries); +static sys_var_sync_binlog_period sys_sync_binlog_period(&vars, "sync_binlog", &sync_binlog_period); +static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter"); + +static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff); + + +static SHOW_VAR fixed_vars[]= { + {"log_slave_updates", (char*) &opt_log_slave_updates, SHOW_MY_BOOL}, + {"relay_log_space_limit", (char*) &relay_log_space_limit, SHOW_LONGLONG}, + {"slave_load_tmpdir", (char*) &slave_load_tmpdir, SHOW_CHAR_PTR}, + {"slave_skip_errors", (char*) &show_slave_skip_errors, SHOW_FUNC}, +}; + +static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff) +{ + var->type=SHOW_CHAR; + var->value= buff; + if (!use_slave_mask || bitmap_is_clear_all(&slave_error_mask)) + { + var->value= const_cast<char *>("OFF"); + } + else if (bitmap_is_set_all(&slave_error_mask)) + { + var->value= const_cast<char *>("ALL"); + } + else + { + /* 10 is enough assuming errors are max 4 digits */ + int i; + var->value= buff; + for (i= 1; + i < MAX_SLAVE_ERROR && + (buff - var->value) < SHOW_VAR_FUNC_BUFF_SIZE; + i++) + { + if (bitmap_is_set(&slave_error_mask, i)) + { + buff= int10_to_str(i, buff, 10); + *buff++= ','; + } + } + if (var->value != buff) + buff--; // Remove last ',' + if (i < MAX_SLAVE_ERROR) + buff= strmov(buff, "..."); // Couldn't show all errors + *buff=0; + } + return 0; +} + +bool sys_var_slave_skip_counter::check(THD *thd, set_var *var) +{ + int result= 0; + pthread_mutex_lock(&LOCK_active_mi); + pthread_mutex_lock(&active_mi->rli.run_lock); + if (active_mi->rli.slave_running) + { + my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0)); + result=1; + } + pthread_mutex_unlock(&active_mi->rli.run_lock); + pthread_mutex_unlock(&LOCK_active_mi); + var->save_result.ulong_value= (ulong) var->value->val_int(); + return result; +} + + +bool sys_var_slave_skip_counter::update(THD *thd, set_var *var) +{ + pthread_mutex_lock(&LOCK_active_mi); + pthread_mutex_lock(&active_mi->rli.run_lock); + /* + The following test should normally never be true as we test this + in the check function; To be safe against multiple + SQL_SLAVE_SKIP_COUNTER request, we do the check anyway + */ + if (!active_mi->rli.slave_running) + { + pthread_mutex_lock(&active_mi->rli.data_lock); + active_mi->rli.slave_skip_counter= var->save_result.ulong_value; + pthread_mutex_unlock(&active_mi->rli.data_lock); + } + pthread_mutex_unlock(&active_mi->rli.run_lock); + pthread_mutex_unlock(&LOCK_active_mi); + return 0; +} + + +bool sys_var_sync_binlog_period::update(THD *thd, set_var *var) +{ + sync_binlog_period= (ulong) var->save_result.ulonglong_value; + return 0; +} + +int init_replication_sys_vars() +{ + mysql_append_static_vars(fixed_vars, sizeof(fixed_vars) / sizeof(SHOW_VAR)); + + if (mysql_add_sys_var_chain(vars.first, my_long_options)) + { + /* should not happen */ + fprintf(stderr, "failed to initialize replication system variables"); + unireg_abort(1); + } + return 0; +} + #endif /* HAVE_REPLICATION */ |