summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc171
1 files changed, 140 insertions, 31 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index 6d64534faf9..e228033f15a 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -1,5 +1,5 @@
-/* Copyright (c) 2000, 2013, Oracle and/or its affiliates.
- Copyright (c) 2008, 2014, SkySQL Ab.
+/* Copyright (c) 2000, 2015, Oracle and/or its affiliates.
+ Copyright (c) 2008, 2015, MariaDB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -149,26 +149,18 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev);
static bool wait_for_relay_log_space(Relay_log_info* rli);
static bool io_slave_killed(Master_info* mi);
static bool sql_slave_killed(rpl_group_info *rgi);
-static int init_slave_thread(THD* thd, Master_info *mi,
- SLAVE_THD_TYPE thd_type);
+static int init_slave_thread(THD*, Master_info *, SLAVE_THD_TYPE);
static void print_slave_skip_errors(void);
static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi);
-static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
- bool suppress_warnings);
-static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
- bool reconnect, bool suppress_warnings);
+static int safe_reconnect(THD*, MYSQL*, Master_info*, bool);
+static int connect_to_master(THD*, MYSQL*, Master_info*, bool, bool);
static Log_event* next_event(rpl_group_info* rgi, ulonglong *event_size);
static int queue_event(Master_info* mi,const char* buf,ulong event_len);
-static int terminate_slave_thread(THD *thd,
- mysql_mutex_t *term_lock,
- mysql_cond_t *term_cond,
- volatile uint *slave_running,
- bool skip_lock);
+static int terminate_slave_thread(THD *, mysql_mutex_t *, mysql_cond_t *,
+ volatile uint *, bool);
static bool check_io_slave_killed(Master_info *mi, const char *info);
-static bool send_show_master_info_header(THD *thd, bool full,
- size_t gtid_pos_length);
-static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
- String *gtid_pos);
+static bool send_show_master_info_header(THD *, bool, size_t);
+static bool send_show_master_info_data(THD *, Master_info *, bool, String *);
/*
Function to set the slave's max_allowed_packet based on the value
of slave_max_allowed_packet.
@@ -372,6 +364,9 @@ int init_slave()
if (run_slave_init_thread())
return 1;
+ if (global_rpl_thread_pool.init(opt_slave_parallel_threads))
+ return 1;
+
/*
This is called when mysqld starts. Before client connections are
accepted. However bootstrap may conflict with us if it does START SLAVE.
@@ -405,9 +400,6 @@ int init_slave()
goto err;
}
- if (global_rpl_thread_pool.init(opt_slave_parallel_threads))
- return 1;
-
/*
If --slave-skip-errors=... was not used, the string value for the
system variable has not been set up yet. Do it now.
@@ -652,6 +644,10 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS);
mysql_mutex_unlock(log_lock);
+
+ if (opt_slave_parallel_threads > 0 &&
+ !master_info_index->any_slave_sql_running())
+ rpl_parallel_inactivate_pool(&global_rpl_thread_pool);
}
if (thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL))
{
@@ -943,6 +939,8 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
Master_info::USE_GTID_CURRENT_POS);
mi->events_queued_since_last_gtid= 0;
mi->gtid_reconnect_event_skip_count= 0;
+
+ mi->rli.restart_gtid_pos.reset();
}
if (!error && (thread_mask & SLAVE_IO))
@@ -956,7 +954,10 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
mi);
if (!error && (thread_mask & SLAVE_SQL))
{
- error= start_slave_thread(
+ if (opt_slave_parallel_threads > 0)
+ error= rpl_parallel_activate_pool(&global_rpl_thread_pool);
+ if (!error)
+ error= start_slave_thread(
#ifdef HAVE_PSI_INTERFACE
key_thread_slave_sql,
#endif
@@ -2002,11 +2003,21 @@ after_set_capability:
!(master_row= mysql_fetch_row(master_res)))
{
err_code= mysql_errno(mysql);
- errmsg= "The slave I/O thread stops because master does not support "
- "MariaDB global transaction id. A fatal error is encountered when "
- "it tries to SELECT @@GLOBAL.gtid_domain_id.";
- sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
- goto err;
+ if (is_network_error(err_code))
+ {
+ mi->report(ERROR_LEVEL, err_code, NULL,
+ "Get master @@GLOBAL.gtid_domain_id failed with error: %s",
+ mysql_error(mysql));
+ goto network_err;
+ }
+ else
+ {
+ errmsg= "The slave I/O thread stops because master does not support "
+ "MariaDB global transaction id. A fatal error is encountered when "
+ "it tries to SELECT @@GLOBAL.gtid_domain_id.";
+ sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
+ goto err;
+ }
}
mysql_free_result(master_res);
master_res= NULL;
@@ -3318,7 +3329,7 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
else
{
/*
- Make sure we do not errorneously update gtid_slave_pos with a lingering
+ Make sure we do not erroneously update gtid_slave_pos with a lingering
GTID from this failed event group (MDEV-4906).
*/
rgi->gtid_pending= false;
@@ -3458,6 +3469,21 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
Log_event_type typ= ev->get_type_code();
/*
+ Even if we don't execute this event, we keep the master timestamp,
+ so that seconds behind master shows correct delta (there are events
+ that are not replayed, so we keep falling behind).
+
+ If it is an artificial event, or a relay log event (IO thread generated
+ event) or ev->when is set to 0, we don't update the
+ last_master_timestamp.
+ */
+ if (!(ev->is_artificial_event() || ev->is_relay_log_event() || (ev->when == 0)))
+ {
+ rli->last_master_timestamp= ev->when + (time_t) ev->exec_time;
+ DBUG_ASSERT(rli->last_master_timestamp >= 0);
+ }
+
+ /*
This tests if the position of the beginning of the current event
hits the UNTIL barrier.
*/
@@ -3568,8 +3594,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
if (slave_trans_retries)
{
- int temp_err;
- LINT_INIT(temp_err);
+ int UNINIT_VAR(temp_err);
if (exec_res && (temp_err= has_temporary_error(thd)))
{
const char *errmsg;
@@ -4388,7 +4413,9 @@ pthread_handler_t handle_slave_sql(void *arg)
my_thread_init();
DBUG_ENTER("handle_slave_sql");
+#ifdef WITH_WSREP
wsrep_restart_point:
+#endif
serial_rgi= new rpl_group_info(rli);
thd = new THD; // note that contructor of THD uses DBUG_ !
@@ -4479,6 +4506,16 @@ pthread_handler_t handle_slave_sql(void *arg)
serial_rgi->gtid_sub_id= 0;
serial_rgi->gtid_pending= false;
+ if (mi->using_gtid != Master_info::USE_GTID_NO)
+ {
+ /*
+ We initialize the relay log state from the know starting position.
+ It will then be updated as required by GTID and GTID_LIST events found
+ while applying events read from relay logs.
+ */
+ rli->relay_log_state.load(&rpl_global_gtid_slave_state);
+ }
+ rli->gtid_skip_flag = GTID_SKIP_NOT;
if (init_relay_log_pos(rli,
rli->group_relay_log_name,
rli->group_relay_log_pos,
@@ -4489,6 +4526,7 @@ pthread_handler_t handle_slave_sql(void *arg)
"Error initializing relay log position: %s", errmsg);
goto err;
}
+ rli->reset_inuse_relaylog();
if (rli->alloc_inuse_relaylog(rli->group_relay_log_name))
goto err;
@@ -4705,7 +4743,49 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
thd->reset_query();
thd->reset_db(NULL, 0);
if (rli->mi->using_gtid != Master_info::USE_GTID_NO)
+ {
+ ulong domain_count;
+
flush_relay_log_info(rli);
+ if (mi->using_parallel())
+ {
+ /*
+ In parallel replication GTID mode, we may stop with different domains
+ at different positions in the relay log.
+
+ To handle this when we restart the SQL thread, mark the current
+ per-domain position in the Relay_log_info.
+ */
+ mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ domain_count= rpl_global_gtid_slave_state.count();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ if (domain_count > 1)
+ {
+ inuse_relaylog *ir;
+
+ /*
+ Load the starting GTID position, so that we can skip already applied
+ GTIDs when we restart the SQL thread. And set the start position in
+ the relay log back to a known safe place to start (prior to any not
+ yet applied transaction in any domain).
+ */
+ rli->restart_gtid_pos.load(&rpl_global_gtid_slave_state, NULL, 0);
+ if ((ir= rli->inuse_relaylog_list))
+ {
+ rpl_gtid *gtid= ir->relay_log_state;
+ uint32 count= ir->relay_log_state_count;
+ while (count > 0)
+ {
+ process_gtid_for_restart_pos(rli, gtid);
+ ++gtid;
+ --count;
+ }
+ strmake_buf(rli->group_relay_log_name, ir->name);
+ rli->group_relay_log_pos= BIN_LOG_HEADER_SIZE;
+ }
+ }
+ }
+ }
THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit);
thd->add_status_to_global();
mysql_mutex_lock(&rli->run_lock);
@@ -4718,6 +4798,7 @@ err_during_init:
/* Forget the relay log's format */
delete rli->relay_log.description_event_for_exec;
rli->relay_log.description_event_for_exec= 0;
+ rli->reset_inuse_relaylog();
/* Wake up master_pos_wait() */
mysql_mutex_unlock(&rli->data_lock);
DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions"));
@@ -4732,7 +4813,7 @@ err_during_init:
*/
thd->temporary_tables = 0; // remove tempation from destructor to close them
THD_CHECK_SENTRY(thd);
- serial_rgi->thd= rli->sql_driver_thd= 0;
+ rli->sql_driver_thd= 0;
mysql_mutex_lock(&LOCK_thread_count);
THD_CHECK_SENTRY(thd);
thd->rgi_fake= thd->rgi_slave= NULL;
@@ -5666,6 +5747,18 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
}
break;
+#ifndef DBUG_OFF
+ case XID_EVENT:
+ DBUG_EXECUTE_IF("slave_discard_xid_for_gtid_0_x_1000",
+ {
+ /* Inject an event group that is missing its XID commit event. */
+ if (mi->last_queued_gtid.domain_id == 0 &&
+ mi->last_queued_gtid.seq_no == 1000)
+ goto skip_relay_logging;
+ });
+ /* Fall through to default case ... */
+#endif
+
default:
default_action:
DBUG_EXECUTE_IF("kill_slave_io_after_2_events",
@@ -6040,7 +6133,23 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
}
#endif
- mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
+ /*
+ If server's default charset is not supported (like utf16, utf32) as client
+ charset, then set client charset to 'latin1' (default client charset).
+ */
+ if (is_supported_parser_charset(default_charset_info))
+ mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
+ else
+ {
+ sql_print_information("'%s' can not be used as client character set. "
+ "'%s' will be used as default client character set "
+ "while connecting to master.",
+ default_charset_info->csname,
+ default_client_charset_info->csname);
+ mysql_options(mysql, MYSQL_SET_CHARSET_NAME,
+ default_client_charset_info->csname);
+ }
+
/* This one is not strictly needed but we have it here for completeness */
mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir);