summaryrefslogtreecommitdiff
path: root/sql/sql_repl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r--sql/sql_repl.cc217
1 files changed, 192 insertions, 25 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 9cc0ba7c29a..5d9d30b6020 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -16,8 +16,10 @@
#include "mysql_priv.h"
#ifdef HAVE_REPLICATION
+#include "rpl_mi.h"
#include "sql_repl.h"
#include "log_event.h"
+#include "rpl_filter.h"
#include <my_dir.h>
int max_binlog_dump_events = 0; // unlimited
@@ -70,7 +72,7 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
int8store(buf+R_POS_OFFSET,position);
packet->append(buf, ROTATE_HEADER_LEN);
packet->append(p,ident_len);
- if (my_net_write(net, (char*)packet->ptr(), packet->length()))
+ if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
{
*errmsg = "failed on my_net_write()";
DBUG_RETURN(-1);
@@ -81,12 +83,13 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
static int send_file(THD *thd)
{
NET* net = &thd->net;
- int fd = -1,bytes, error = 1;
+ int fd = -1, error = 1;
+ size_t bytes;
char fname[FN_REFLEN+1];
const char *errmsg = 0;
int old_timeout;
unsigned long packet_len;
- char buf[IO_SIZE]; // It's safe to alloc this
+ uchar buf[IO_SIZE]; // It's safe to alloc this
DBUG_ENTER("send_file");
/*
@@ -119,7 +122,7 @@ static int send_file(THD *thd)
goto err;
}
- while ((bytes = (int) my_read(fd, (byte*) buf, IO_SIZE, MYF(0))) > 0)
+ while ((long) (bytes= my_read(fd, buf, IO_SIZE, MYF(0))) > 0)
{
if (my_net_write(net, buf, bytes))
{
@@ -129,7 +132,7 @@ static int send_file(THD *thd)
}
end:
- if (my_net_write(net, "", 0) || net_flush(net) ||
+ if (my_net_write(net, (uchar*) "", 0) || net_flush(net) ||
(my_net_read(net) == packet_error))
{
errmsg = "while negotiating file transfer close";
@@ -215,7 +218,8 @@ bool log_in_use(const char* log_name)
if ((linfo = tmp->current_linfo))
{
pthread_mutex_lock(&linfo->lock);
- result = !bcmp(log_name, linfo->log_file_name, log_name_len);
+ result = !bcmp((uchar*) log_name, (uchar*) linfo->log_file_name,
+ log_name_len);
pthread_mutex_unlock(&linfo->lock);
if (result)
break;
@@ -239,6 +243,7 @@ bool purge_error_message(THD* thd, int res)
case LOG_INFO_MEM: errmsg= ER_OUT_OF_RESOURCES; break;
case LOG_INFO_FATAL: errmsg= ER_BINLOG_PURGE_FATAL_ERR; break;
case LOG_INFO_IN_USE: errmsg= ER_LOG_IN_USE; break;
+ case LOG_INFO_EMFILE: errmsg= ER_BINLOG_PURGE_EMFILE; break;
default: errmsg= ER_LOG_PURGE_UNKNOWN_ERR; break;
}
@@ -477,7 +482,7 @@ impossible position";
int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
ST_CREATED_OFFSET+1, (ulong) 0);
/* send it */
- if (my_net_write(net, (char*)packet->ptr(), packet->length()))
+ if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
{
errmsg = "Failed on my_net_write()";
my_errno= ER_UNKNOWN_ERROR;
@@ -535,7 +540,7 @@ impossible position";
else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
binlog_can_be_corrupted= FALSE;
- if (my_net_write(net, (char*)packet->ptr(), packet->length()))
+ if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
{
errmsg = "Failed on my_net_write()";
my_errno= ER_UNKNOWN_ERROR;
@@ -648,7 +653,7 @@ impossible position";
if (read_packet)
{
thd->proc_info = "Sending binlog event to slave";
- if (my_net_write(net, (char*)packet->ptr(), packet->length()) )
+ if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
{
errmsg = "Failed on my_net_write()";
my_errno= ER_UNKNOWN_ERROR;
@@ -811,7 +816,7 @@ int start_slave(THD* thd , MASTER_INFO* mi, bool net_report)
sizeof(mi->rli.until_log_name)-1);
}
else
- clear_until_condition(&mi->rli);
+ mi->rli.clear_until_condition();
if (mi->rli.until_condition != RELAY_LOG_INFO::UNTIL_NONE)
{
@@ -965,6 +970,9 @@ int reset_slave(THD *thd, MASTER_INFO* mi)
error=1;
goto err;
}
+
+ ha_reset_slave(thd);
+
// delete relay logs, clear relay log coordinates
if ((error= purge_relay_logs(&mi->rli, thd,
1 /* just reset */,
@@ -983,8 +991,8 @@ int reset_slave(THD *thd, MASTER_INFO* mi)
Reset errors (the idea is that we forget about the
old master).
*/
- clear_slave_error(&mi->rli);
- clear_until_condition(&mi->rli);
+ mi->rli.clear_error();
+ mi->rli.clear_until_condition();
// close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
end_master_info(mi);
@@ -1125,6 +1133,11 @@ bool change_master(THD* thd, MASTER_INFO* mi)
if (lex_mi->ssl != LEX_MASTER_INFO::SSL_UNCHANGED)
mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::SSL_ENABLE);
+
+ if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::SSL_UNCHANGED)
+ mi->ssl_verify_server_cert=
+ (lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::SSL_ENABLE);
+
if (lex_mi->ssl_ca)
strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
if (lex_mi->ssl_capath)
@@ -1137,7 +1150,8 @@ bool change_master(THD* thd, MASTER_INFO* mi)
strmake(mi->ssl_key, lex_mi->ssl_key, sizeof(mi->ssl_key)-1);
#ifndef HAVE_OPENSSL
if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath ||
- lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key )
+ lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key ||
+ lex_mi->ssl_verify_server_cert )
push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
ER_SLAVE_IGNORED_SSL_PARAMS, ER(ER_SLAVE_IGNORED_SSL_PARAMS));
#endif
@@ -1249,8 +1263,8 @@ bool change_master(THD* thd, MASTER_INFO* mi)
pthread_mutex_lock(&mi->rli.data_lock);
mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
/* Clear the errors, for a clean start */
- clear_slave_error(&mi->rli);
- clear_until_condition(&mi->rli);
+ mi->rli.clear_error();
+ mi->rli.clear_until_condition();
/*
If we don't write new coordinates to disk now, then old will remain in
relay-log.info until START SLAVE is issued; but if mysqld is shutdown
@@ -1300,12 +1314,12 @@ int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
bool mysql_show_binlog_events(THD* thd)
{
Protocol *protocol= thd->protocol;
- DBUG_ENTER("mysql_show_binlog_events");
List<Item> field_list;
const char *errmsg = 0;
bool ret = TRUE;
IO_CACHE log;
File file = -1;
+ DBUG_ENTER("mysql_show_binlog_events");
Log_event::init_show_field_list(&field_list);
if (protocol->send_fields(&field_list,
@@ -1315,6 +1329,13 @@ bool mysql_show_binlog_events(THD* thd)
Format_description_log_event *description_event= new
Format_description_log_event(3); /* MySQL 4.0 by default */
+ /*
+ Wait for handlers to insert any pending information
+ into the binlog. For e.g. ndb which updates the binlog asynchronously
+ this is needed so that the uses sees all its own commands in the binlog
+ */
+ ha_binlog_wait(thd);
+
if (mysql_bin_log.is_open())
{
LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
@@ -1352,12 +1373,12 @@ bool mysql_show_binlog_events(THD* thd)
pthread_mutex_lock(log_lock);
/*
- open_binlog() sought to position 4.
- Read the first event in case it's a Format_description_log_event, to
- know the format. If there's no such event, we are 3.23 or 4.x. This
- code, like before, can't read 3.23 binlogs.
- This code will fail on a mixed relay log (one which has Format_desc then
- Rotate then Format_desc).
+ open_binlog() sought to position 4.
+ Read the first event in case it's a Format_description_log_event, to
+ know the format. If there's no such event, we are 3.23 or 4.x. This
+ code, like before, can't read 3.23 binlogs.
+ This code will fail on a mixed relay log (one which has Format_desc then
+ Rotate then Format_desc).
*/
ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event);
@@ -1381,7 +1402,8 @@ bool mysql_show_binlog_events(THD* thd)
}
for (event_count = 0;
- (ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event)); )
+ (ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0,
+ description_event)); )
{
if (event_count >= limit_start &&
ev->net_send(protocol, linfo.log_file_name, pos))
@@ -1455,8 +1477,8 @@ bool show_binlog_info(THD* thd)
int dir_len = dirname_length(li.log_file_name);
protocol->store(li.log_file_name + dir_len, &my_charset_bin);
protocol->store((ulonglong) li.pos);
- protocol->store(&binlog_do_db);
- protocol->store(&binlog_ignore_db);
+ protocol->store(binlog_filter->get_do_db());
+ protocol->store(binlog_filter->get_ignore_db());
if (protocol->write())
DBUG_RETURN(TRUE);
}
@@ -1561,6 +1583,8 @@ int log_loaded_block(IO_CACHE* file)
if (!(block_len = (char*) file->read_end - (char*) buffer))
return 0;
lf_info = (LOAD_FILE_INFO*) file->arg;
+ if (lf_info->thd->current_stmt_binlog_row_based)
+ return 0;
if (lf_info->last_pos_in_file != HA_POS_ERROR &&
lf_info->last_pos_in_file >= file->pos_in_file)
return 0;
@@ -1583,6 +1607,149 @@ int log_loaded_block(IO_CACHE* file)
return 0;
}
+/*
+ Replication System Variables
+*/
+
+class sys_var_slave_skip_counter :public sys_var
+{
+public:
+ sys_var_slave_skip_counter(sys_var_chain *chain, const char *name_arg)
+ :sys_var(name_arg)
+ { chain_sys_var(chain); }
+ bool check(THD *thd, set_var *var);
+ bool update(THD *thd, set_var *var);
+ bool check_type(enum_var_type type) { return type != OPT_GLOBAL; }
+ /*
+ We can't retrieve the value of this, so we don't have to define
+ type() or value_ptr()
+ */
+};
+
+class sys_var_sync_binlog_period :public sys_var_long_ptr
+{
+public:
+ sys_var_sync_binlog_period(sys_var_chain *chain, const char *name_arg,
+ ulong *value_ptr)
+ :sys_var_long_ptr(chain, name_arg,value_ptr) {}
+ bool update(THD *thd, set_var *var);
+};
+
+static sys_var_chain vars = { NULL, NULL };
+
+static sys_var_bool_ptr sys_relay_log_purge(&vars, "relay_log_purge",
+ &relay_log_purge);
+static sys_var_long_ptr sys_slave_net_timeout(&vars, "slave_net_timeout",
+ &slave_net_timeout);
+static sys_var_long_ptr sys_slave_trans_retries(&vars, "slave_transaction_retries",
+ &slave_trans_retries);
+static sys_var_sync_binlog_period sys_sync_binlog_period(&vars, "sync_binlog", &sync_binlog_period);
+static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter");
+
+static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff);
+
+
+static SHOW_VAR fixed_vars[]= {
+ {"log_slave_updates", (char*) &opt_log_slave_updates, SHOW_MY_BOOL},
+ {"relay_log_space_limit", (char*) &relay_log_space_limit, SHOW_LONGLONG},
+ {"slave_load_tmpdir", (char*) &slave_load_tmpdir, SHOW_CHAR_PTR},
+ {"slave_skip_errors", (char*) &show_slave_skip_errors, SHOW_FUNC},
+};
+
+static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff)
+{
+ var->type=SHOW_CHAR;
+ var->value= buff;
+ if (!use_slave_mask || bitmap_is_clear_all(&slave_error_mask))
+ {
+ var->value= const_cast<char *>("OFF");
+ }
+ else if (bitmap_is_set_all(&slave_error_mask))
+ {
+ var->value= const_cast<char *>("ALL");
+ }
+ else
+ {
+ /* 10 is enough assuming errors are max 4 digits */
+ int i;
+ var->value= buff;
+ for (i= 1;
+ i < MAX_SLAVE_ERROR &&
+ (buff - var->value) < SHOW_VAR_FUNC_BUFF_SIZE;
+ i++)
+ {
+ if (bitmap_is_set(&slave_error_mask, i))
+ {
+ buff= int10_to_str(i, buff, 10);
+ *buff++= ',';
+ }
+ }
+ if (var->value != buff)
+ buff--; // Remove last ','
+ if (i < MAX_SLAVE_ERROR)
+ buff= strmov(buff, "..."); // Couldn't show all errors
+ *buff=0;
+ }
+ return 0;
+}
+
+bool sys_var_slave_skip_counter::check(THD *thd, set_var *var)
+{
+ int result= 0;
+ pthread_mutex_lock(&LOCK_active_mi);
+ pthread_mutex_lock(&active_mi->rli.run_lock);
+ if (active_mi->rli.slave_running)
+ {
+ my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
+ result=1;
+ }
+ pthread_mutex_unlock(&active_mi->rli.run_lock);
+ pthread_mutex_unlock(&LOCK_active_mi);
+ var->save_result.ulong_value= (ulong) var->value->val_int();
+ return result;
+}
+
+
+bool sys_var_slave_skip_counter::update(THD *thd, set_var *var)
+{
+ pthread_mutex_lock(&LOCK_active_mi);
+ pthread_mutex_lock(&active_mi->rli.run_lock);
+ /*
+ The following test should normally never be true as we test this
+ in the check function; To be safe against multiple
+ SQL_SLAVE_SKIP_COUNTER request, we do the check anyway
+ */
+ if (!active_mi->rli.slave_running)
+ {
+ pthread_mutex_lock(&active_mi->rli.data_lock);
+ active_mi->rli.slave_skip_counter= var->save_result.ulong_value;
+ pthread_mutex_unlock(&active_mi->rli.data_lock);
+ }
+ pthread_mutex_unlock(&active_mi->rli.run_lock);
+ pthread_mutex_unlock(&LOCK_active_mi);
+ return 0;
+}
+
+
+bool sys_var_sync_binlog_period::update(THD *thd, set_var *var)
+{
+ sync_binlog_period= (ulong) var->save_result.ulonglong_value;
+ return 0;
+}
+
+int init_replication_sys_vars()
+{
+ mysql_append_static_vars(fixed_vars, sizeof(fixed_vars) / sizeof(SHOW_VAR));
+
+ if (mysql_add_sys_var_chain(vars.first, my_long_options))
+ {
+ /* should not happen */
+ fprintf(stderr, "failed to initialize replication system variables");
+ unireg_abort(1);
+ }
+ return 0;
+}
+
#endif /* HAVE_REPLICATION */