summaryrefslogtreecommitdiff
path: root/sql/sql_repl.cc
diff options
context:
space:
mode:
authorunknown <cmiller@zippy.cornsilk.net>2007-10-17 14:05:43 -0400
committerunknown <cmiller@zippy.cornsilk.net>2007-10-17 14:05:43 -0400
commitf48feae696f2ce6cf2261c50789911da245b9aa6 (patch)
tree892096ddfa11f5fea89ad26528d5c64fbc05ffce /sql/sql_repl.cc
parent1fc9612c670264500e726a10e85332da2feed9a6 (diff)
parent03bef972d3b508013cfcad2de294569ecdf16158 (diff)
downloadmariadb-git-f48feae696f2ce6cf2261c50789911da245b9aa6.tar.gz
Merge zippy.cornsilk.net:/home/cmiller/work/mysql/mysql-5.1-comeng-unification
into zippy.cornsilk.net:/home/cmiller/work/mysql/mysql-5.1-recentcommmerge BitKeeper/deleted/.del-ha_berkeley.cc: Auto merged BitKeeper/deleted/.del-mysqld.vcproj~6aa7b3f9c3e28fcb: Auto merged BitKeeper/triggers/post-commit: Auto merged client/mysqlcheck.c: Auto merged include/config-win.h: Auto merged include/my_dbug.h: Auto merged libmysqld/Makefile.am: Auto merged mysql-test/r/func_in.result: Auto merged mysql-test/r/information_schema.result: Auto merged mysql-test/r/information_schema_db.result: Auto merged mysql-test/t/func_in.test: Auto merged mysql-test/t/information_schema.test: Auto merged sql/Makefile.am: Auto merged sql/ha_ndbcluster.cc: Auto merged sql/item_cmpfunc.cc: Auto merged sql/item_func.cc: Auto merged sql/lock.cc: Auto merged sql/log_event.cc: Auto merged sql/repl_failsafe.cc: Auto merged sql/set_var.h: Auto merged sql/sp_head.cc: Auto merged sql/sql_base.cc: Auto merged sql/sql_class.h: Auto merged sql/sql_delete.cc: Auto merged sql/sql_insert.cc: Auto merged sql/sql_lex.cc: Auto merged sql/sql_prepare.cc: Auto merged sql/sql_repl.cc: Auto merged sql/sql_view.cc: Auto merged sql/structs.h: Auto merged sql/table.h: Auto merged storage/archive/ha_archive.cc: Auto merged storage/myisam/ha_myisam.cc: Auto merged storage/myisam/mi_open.c: Auto merged storage/myisammrg/ha_myisammrg.cc: Auto merged storage/ndb/src/common/util/File.cpp: Auto merged configure.in: Manual merge. sql/CMakeLists.txt: Manual merge. sql/mysql_priv.h: Manual merge. sql/mysqld.cc: Manual merge. sql/set_var.cc: Manual merge. sql/slave.cc: Manual merge. sql/sql_cache.cc: Manual merge. sql/sql_class.cc: Manual merge. sql/sql_lex.h: Manual merge. sql/sql_parse.cc: Manual merge. sql/sql_select.cc: Manual merge. sql/sql_show.cc: Manual merge. sql/sql_table.cc: Manual merge. sql/sql_update.cc: Manual merge. sql/sql_yacc.yy: Manual merge.
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r--sql/sql_repl.cc193
1 files changed, 168 insertions, 25 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 99e969f9427..5eb9c61de64 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -72,7 +72,7 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
int8store(buf+R_POS_OFFSET,position);
packet->append(buf, ROTATE_HEADER_LEN);
packet->append(p,ident_len);
- if (my_net_write(net, (char*)packet->ptr(), packet->length()))
+ if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
{
*errmsg = "failed on my_net_write()";
DBUG_RETURN(-1);
@@ -83,12 +83,13 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
static int send_file(THD *thd)
{
NET* net = &thd->net;
- int fd = -1,bytes, error = 1;
+ int fd = -1, error = 1;
+ size_t bytes;
char fname[FN_REFLEN+1];
const char *errmsg = 0;
int old_timeout;
unsigned long packet_len;
- char buf[IO_SIZE]; // It's safe to alloc this
+ uchar buf[IO_SIZE]; // It's safe to alloc this
DBUG_ENTER("send_file");
/*
@@ -96,7 +97,7 @@ static int send_file(THD *thd)
the job
*/
old_timeout= net->read_timeout;
- net_set_read_timeout(net, thd->variables.net_wait_timeout);
+ my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
/*
We need net_flush here because the client will not know it needs to send
@@ -121,7 +122,7 @@ static int send_file(THD *thd)
goto err;
}
- while ((bytes = (int) my_read(fd, (byte*) buf, IO_SIZE, MYF(0))) > 0)
+ while ((long) (bytes= my_read(fd, buf, IO_SIZE, MYF(0))) > 0)
{
if (my_net_write(net, buf, bytes))
{
@@ -131,7 +132,7 @@ static int send_file(THD *thd)
}
end:
- if (my_net_write(net, "", 0) || net_flush(net) ||
+ if (my_net_write(net, (uchar*) "", 0) || net_flush(net) ||
(my_net_read(net) == packet_error))
{
errmsg = "while negotiating file transfer close";
@@ -140,7 +141,7 @@ static int send_file(THD *thd)
error = 0;
err:
- net_set_read_timeout(net, old_timeout);
+ my_net_set_read_timeout(net, old_timeout);
if (fd >= 0)
(void) my_close(fd, MYF(0));
if (errmsg)
@@ -217,7 +218,8 @@ bool log_in_use(const char* log_name)
if ((linfo = tmp->current_linfo))
{
pthread_mutex_lock(&linfo->lock);
- result = !bcmp(log_name, linfo->log_file_name, log_name_len);
+ result = !bcmp((uchar*) log_name, (uchar*) linfo->log_file_name,
+ log_name_len);
pthread_mutex_unlock(&linfo->lock);
if (result)
break;
@@ -480,7 +482,7 @@ impossible position";
int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
ST_CREATED_OFFSET+1, (ulong) 0);
/* send it */
- if (my_net_write(net, (char*)packet->ptr(), packet->length()))
+ if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
{
errmsg = "Failed on my_net_write()";
my_errno= ER_UNKNOWN_ERROR;
@@ -538,7 +540,7 @@ impossible position";
else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
binlog_can_be_corrupted= FALSE;
- if (my_net_write(net, (char*)packet->ptr(), packet->length()))
+ if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
{
errmsg = "Failed on my_net_write()";
my_errno= ER_UNKNOWN_ERROR;
@@ -651,7 +653,7 @@ impossible position";
if (read_packet)
{
thd_proc_info(thd, "Sending binlog event to slave");
- if (my_net_write(net, (char*)packet->ptr(), packet->length()) )
+ if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
{
errmsg = "Failed on my_net_write()";
my_errno= ER_UNKNOWN_ERROR;
@@ -760,7 +762,7 @@ err:
DBUG_VOID_RETURN;
}
-int start_slave(THD* thd , MASTER_INFO* mi, bool net_report)
+int start_slave(THD* thd , Master_info* mi, bool net_report)
{
int slave_errno= 0;
int thread_mask;
@@ -797,7 +799,7 @@ int start_slave(THD* thd , MASTER_INFO* mi, bool net_report)
if (thd->lex->mi.pos)
{
- mi->rli.until_condition= RELAY_LOG_INFO::UNTIL_MASTER_POS;
+ mi->rli.until_condition= Relay_log_info::UNTIL_MASTER_POS;
mi->rli.until_log_pos= thd->lex->mi.pos;
/*
We don't check thd->lex->mi.log_file_name for NULL here
@@ -808,7 +810,7 @@ int start_slave(THD* thd , MASTER_INFO* mi, bool net_report)
}
else if (thd->lex->mi.relay_log_pos)
{
- mi->rli.until_condition= RELAY_LOG_INFO::UNTIL_RELAY_POS;
+ mi->rli.until_condition= Relay_log_info::UNTIL_RELAY_POS;
mi->rli.until_log_pos= thd->lex->mi.relay_log_pos;
strmake(mi->rli.until_log_name, thd->lex->mi.relay_log_name,
sizeof(mi->rli.until_log_name)-1);
@@ -816,7 +818,7 @@ int start_slave(THD* thd , MASTER_INFO* mi, bool net_report)
else
mi->rli.clear_until_condition();
- if (mi->rli.until_condition != RELAY_LOG_INFO::UNTIL_NONE)
+ if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE)
{
/* Preparing members for effective until condition checking */
const char *p= fn_ext(mi->rli.until_log_name);
@@ -838,7 +840,7 @@ int start_slave(THD* thd , MASTER_INFO* mi, bool net_report)
/* mark the cached result of the UNTIL comparison as "undefined" */
mi->rli.until_log_names_cmp_result=
- RELAY_LOG_INFO::UNTIL_LOG_NAMES_CMP_UNKNOWN;
+ Relay_log_info::UNTIL_LOG_NAMES_CMP_UNKNOWN;
/* Issuing warning then started without --skip-slave-start */
if (!opt_skip_slave_start)
@@ -885,7 +887,7 @@ int start_slave(THD* thd , MASTER_INFO* mi, bool net_report)
}
-int stop_slave(THD* thd, MASTER_INFO* mi, bool net_report )
+int stop_slave(THD* thd, Master_info* mi, bool net_report )
{
DBUG_ENTER("stop_slave");
@@ -951,7 +953,7 @@ int stop_slave(THD* thd, MASTER_INFO* mi, bool net_report )
*/
-int reset_slave(THD *thd, MASTER_INFO* mi)
+int reset_slave(THD *thd, Master_info* mi)
{
MY_STAT stat_area;
char fname[FN_REFLEN];
@@ -989,7 +991,7 @@ int reset_slave(THD *thd, MASTER_INFO* mi)
Reset errors (the idea is that we forget about the
old master).
*/
- mi->rli.clear_slave_error();
+ mi->rli.clear_error();
mi->rli.clear_until_condition();
// close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
@@ -1065,7 +1067,7 @@ void kill_zombie_dump_threads(uint32 slave_server_id)
}
-bool change_master(THD* thd, MASTER_INFO* mi)
+bool change_master(THD* thd, Master_info* mi)
{
int thread_mask;
const char* errmsg= 0;
@@ -1261,7 +1263,7 @@ bool change_master(THD* thd, MASTER_INFO* mi)
pthread_mutex_lock(&mi->rli.data_lock);
mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
/* Clear the errors, for a clean start */
- mi->rli.clear_slave_error();
+ mi->rli.clear_error();
mi->rli.clear_until_condition();
/*
If we don't write new coordinates to disk now, then old will remain in
@@ -1440,13 +1442,11 @@ err:
}
if (errmsg)
- {
my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
"SHOW BINLOG EVENTS", errmsg);
- DBUG_RETURN(TRUE);
- }
+ else
+ send_eof(thd);
- send_eof(thd);
pthread_mutex_lock(&LOCK_thread_count);
thd->current_linfo = 0;
pthread_mutex_unlock(&LOCK_thread_count);
@@ -1607,6 +1607,149 @@ int log_loaded_block(IO_CACHE* file)
return 0;
}
+/*
+ Replication System Variables
+*/
+
+class sys_var_slave_skip_counter :public sys_var
+{
+public:
+ sys_var_slave_skip_counter(sys_var_chain *chain, const char *name_arg)
+ :sys_var(name_arg)
+ { chain_sys_var(chain); }
+ bool check(THD *thd, set_var *var);
+ bool update(THD *thd, set_var *var);
+ bool check_type(enum_var_type type) { return type != OPT_GLOBAL; }
+ /*
+ We can't retrieve the value of this, so we don't have to define
+ type() or value_ptr()
+ */
+};
+
+class sys_var_sync_binlog_period :public sys_var_long_ptr
+{
+public:
+ sys_var_sync_binlog_period(sys_var_chain *chain, const char *name_arg,
+ ulong *value_ptr)
+ :sys_var_long_ptr(chain, name_arg,value_ptr) {}
+ bool update(THD *thd, set_var *var);
+};
+
+static sys_var_chain vars = { NULL, NULL };
+
+static sys_var_bool_ptr sys_relay_log_purge(&vars, "relay_log_purge",
+ &relay_log_purge);
+static sys_var_long_ptr sys_slave_net_timeout(&vars, "slave_net_timeout",
+ &slave_net_timeout);
+static sys_var_long_ptr sys_slave_trans_retries(&vars, "slave_transaction_retries",
+ &slave_trans_retries);
+static sys_var_sync_binlog_period sys_sync_binlog_period(&vars, "sync_binlog", &sync_binlog_period);
+static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter");
+
+static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff);
+
+
+static SHOW_VAR fixed_vars[]= {
+ {"log_slave_updates", (char*) &opt_log_slave_updates, SHOW_MY_BOOL},
+ {"relay_log_space_limit", (char*) &relay_log_space_limit, SHOW_LONGLONG},
+ {"slave_load_tmpdir", (char*) &slave_load_tmpdir, SHOW_CHAR_PTR},
+ {"slave_skip_errors", (char*) &show_slave_skip_errors, SHOW_FUNC},
+};
+
+static int show_slave_skip_errors(THD *thd, SHOW_VAR *var, char *buff)
+{
+ var->type=SHOW_CHAR;
+ var->value= buff;
+ if (!use_slave_mask || bitmap_is_clear_all(&slave_error_mask))
+ {
+ var->value= const_cast<char *>("OFF");
+ }
+ else if (bitmap_is_set_all(&slave_error_mask))
+ {
+ var->value= const_cast<char *>("ALL");
+ }
+ else
+ {
+ /* 10 is enough assuming errors are max 4 digits */
+ int i;
+ var->value= buff;
+ for (i= 1;
+ i < MAX_SLAVE_ERROR &&
+ (buff - var->value) < SHOW_VAR_FUNC_BUFF_SIZE;
+ i++)
+ {
+ if (bitmap_is_set(&slave_error_mask, i))
+ {
+ buff= int10_to_str(i, buff, 10);
+ *buff++= ',';
+ }
+ }
+ if (var->value != buff)
+ buff--; // Remove last ','
+ if (i < MAX_SLAVE_ERROR)
+ buff= strmov(buff, "..."); // Couldn't show all errors
+ *buff=0;
+ }
+ return 0;
+}
+
+bool sys_var_slave_skip_counter::check(THD *thd, set_var *var)
+{
+ int result= 0;
+ pthread_mutex_lock(&LOCK_active_mi);
+ pthread_mutex_lock(&active_mi->rli.run_lock);
+ if (active_mi->rli.slave_running)
+ {
+ my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
+ result=1;
+ }
+ pthread_mutex_unlock(&active_mi->rli.run_lock);
+ pthread_mutex_unlock(&LOCK_active_mi);
+ var->save_result.ulong_value= (ulong) var->value->val_int();
+ return result;
+}
+
+
+bool sys_var_slave_skip_counter::update(THD *thd, set_var *var)
+{
+ pthread_mutex_lock(&LOCK_active_mi);
+ pthread_mutex_lock(&active_mi->rli.run_lock);
+ /*
+ The following test should normally never be true as we test this
+ in the check function; To be safe against multiple
+ SQL_SLAVE_SKIP_COUNTER request, we do the check anyway
+ */
+ if (!active_mi->rli.slave_running)
+ {
+ pthread_mutex_lock(&active_mi->rli.data_lock);
+ active_mi->rli.slave_skip_counter= var->save_result.ulong_value;
+ pthread_mutex_unlock(&active_mi->rli.data_lock);
+ }
+ pthread_mutex_unlock(&active_mi->rli.run_lock);
+ pthread_mutex_unlock(&LOCK_active_mi);
+ return 0;
+}
+
+
+bool sys_var_sync_binlog_period::update(THD *thd, set_var *var)
+{
+ sync_binlog_period= (ulong) var->save_result.ulonglong_value;
+ return 0;
+}
+
+int init_replication_sys_vars()
+{
+ mysql_append_static_vars(fixed_vars, sizeof(fixed_vars) / sizeof(SHOW_VAR));
+
+ if (mysql_add_sys_var_chain(vars.first, my_long_options))
+ {
+ /* should not happen */
+ fprintf(stderr, "failed to initialize replication system variables");
+ unireg_abort(1);
+ }
+ return 0;
+}
+
#endif /* HAVE_REPLICATION */