summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
authorNirbhay Choubey <nirbhay@mariadb.com>2015-05-09 17:09:21 -0400
committerNirbhay Choubey <nirbhay@mariadb.com>2015-05-09 17:09:21 -0400
commite11cad9e9dd3ae0be61aec1bb50b0ddc867b10be (patch)
treed1266ef4e52851e73467a6d7bf4a3ca991b484fa /sql/slave.cc
parent99f496ae65a56d587e24c88df85aae7e7cfce70e (diff)
parent0880284bf715b4916cc735e19b76d1062c2bfdcf (diff)
downloadmariadb-git-e11cad9e9dd3ae0be61aec1bb50b0ddc867b10be.tar.gz
Merge tag 'mariadb-10.0.19' into 10.0-galera
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc148
1 files changed, 122 insertions, 26 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index d24f72ff891..c9dc55f2bf7 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -1,5 +1,5 @@
-/* Copyright (c) 2000, 2013, Oracle and/or its affiliates.
- Copyright (c) 2008, 2014, SkySQL Ab.
+/* Copyright (c) 2000, 2015, Oracle and/or its affiliates.
+ Copyright (c) 2008, 2015, MariaDB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -151,26 +151,18 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev);
static bool wait_for_relay_log_space(Relay_log_info* rli);
static bool io_slave_killed(Master_info* mi);
static bool sql_slave_killed(rpl_group_info *rgi);
-static int init_slave_thread(THD* thd, Master_info *mi,
- SLAVE_THD_TYPE thd_type);
+static int init_slave_thread(THD*, Master_info *, SLAVE_THD_TYPE);
static void print_slave_skip_errors(void);
static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi);
-static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
- bool suppress_warnings);
-static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
- bool reconnect, bool suppress_warnings);
+static int safe_reconnect(THD*, MYSQL*, Master_info*, bool);
+static int connect_to_master(THD*, MYSQL*, Master_info*, bool, bool);
static Log_event* next_event(rpl_group_info* rgi, ulonglong *event_size);
static int queue_event(Master_info* mi,const char* buf,ulong event_len);
-static int terminate_slave_thread(THD *thd,
- mysql_mutex_t *term_lock,
- mysql_cond_t *term_cond,
- volatile uint *slave_running,
- bool skip_lock);
+static int terminate_slave_thread(THD *, mysql_mutex_t *, mysql_cond_t *,
+ volatile uint *, bool);
static bool check_io_slave_killed(Master_info *mi, const char *info);
-static bool send_show_master_info_header(THD *thd, bool full,
- size_t gtid_pos_length);
-static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
- String *gtid_pos);
+static bool send_show_master_info_header(THD *, bool, size_t);
+static bool send_show_master_info_data(THD *, Master_info *, bool, String *);
/*
Function to set the slave's max_allowed_packet based on the value
of slave_max_allowed_packet.
@@ -655,6 +647,10 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS);
mysql_mutex_unlock(log_lock);
+
+ if (opt_slave_parallel_threads > 0 &&
+ !master_info_index->any_slave_sql_running())
+ rpl_parallel_inactivate_pool(&global_rpl_thread_pool);
}
if (thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL))
{
@@ -946,6 +942,8 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
Master_info::USE_GTID_CURRENT_POS);
mi->events_queued_since_last_gtid= 0;
mi->gtid_reconnect_event_skip_count= 0;
+
+ mi->rli.restart_gtid_pos.reset();
}
if (!error && (thread_mask & SLAVE_IO))
@@ -959,7 +957,10 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
mi);
if (!error && (thread_mask & SLAVE_SQL))
{
- error= start_slave_thread(
+ if (opt_slave_parallel_threads > 0)
+ error= rpl_parallel_activate_pool(&global_rpl_thread_pool);
+ if (!error)
+ error= start_slave_thread(
#ifdef HAVE_PSI_INTERFACE
key_thread_slave_sql,
#endif
@@ -2004,11 +2005,21 @@ after_set_capability:
!(master_row= mysql_fetch_row(master_res)))
{
err_code= mysql_errno(mysql);
- errmsg= "The slave I/O thread stops because master does not support "
- "MariaDB global transaction id. A fatal error is encountered when "
- "it tries to SELECT @@GLOBAL.gtid_domain_id.";
- sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
- goto err;
+ if (is_network_error(err_code))
+ {
+ mi->report(ERROR_LEVEL, err_code, NULL,
+ "Get master @@GLOBAL.gtid_domain_id failed with error: %s",
+ mysql_error(mysql));
+ goto network_err;
+ }
+ else
+ {
+ errmsg= "The slave I/O thread stops because master does not support "
+ "MariaDB global transaction id. A fatal error is encountered when "
+ "it tries to SELECT @@GLOBAL.gtid_domain_id.";
+ sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
+ goto err;
+ }
}
mysql_free_result(master_res);
master_res= NULL;
@@ -3345,7 +3356,7 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
else
{
/*
- Make sure we do not errorneously update gtid_slave_pos with a lingering
+ Make sure we do not erroneously update gtid_slave_pos with a lingering
GTID from this failed event group (MDEV-4906).
*/
rgi->gtid_pending= false;
@@ -3485,6 +3496,21 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
Log_event_type typ= ev->get_type_code();
/*
+ Even if we don't execute this event, we keep the master timestamp,
+ so that seconds behind master shows correct delta (there are events
+ that are not replayed, so we keep falling behind).
+
+ If it is an artificial event, or a relay log event (IO thread generated
+ event) or ev->when is set to 0, we don't update the
+ last_master_timestamp.
+ */
+ if (!(ev->is_artificial_event() || ev->is_relay_log_event() || (ev->when == 0)))
+ {
+ rli->last_master_timestamp= ev->when + (time_t) ev->exec_time;
+ DBUG_ASSERT(rli->last_master_timestamp >= 0);
+ }
+
+ /*
This tests if the position of the beginning of the current event
hits the UNTIL barrier.
*/
@@ -4512,6 +4538,16 @@ pthread_handler_t handle_slave_sql(void *arg)
serial_rgi->gtid_sub_id= 0;
serial_rgi->gtid_pending= false;
+ if (mi->using_gtid != Master_info::USE_GTID_NO)
+ {
+ /*
+ We initialize the relay log state from the know starting position.
+ It will then be updated as required by GTID and GTID_LIST events found
+ while applying events read from relay logs.
+ */
+ rli->relay_log_state.load(&rpl_global_gtid_slave_state);
+ }
+ rli->gtid_skip_flag = GTID_SKIP_NOT;
if (init_relay_log_pos(rli,
rli->group_relay_log_name,
rli->group_relay_log_pos,
@@ -4522,6 +4558,7 @@ pthread_handler_t handle_slave_sql(void *arg)
"Error initializing relay log position: %s", errmsg);
goto err;
}
+ rli->reset_inuse_relaylog();
if (rli->alloc_inuse_relaylog(rli->group_relay_log_name))
goto err;
@@ -4740,7 +4777,49 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
thd->reset_query();
thd->reset_db(NULL, 0);
if (rli->mi->using_gtid != Master_info::USE_GTID_NO)
+ {
+ ulong domain_count;
+
flush_relay_log_info(rli);
+ if (opt_slave_parallel_threads > 0)
+ {
+ /*
+ In parallel replication GTID mode, we may stop with different domains
+ at different positions in the relay log.
+
+ To handle this when we restart the SQL thread, mark the current
+ per-domain position in the Relay_log_info.
+ */
+ mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ domain_count= rpl_global_gtid_slave_state.count();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ if (domain_count > 1)
+ {
+ inuse_relaylog *ir;
+
+ /*
+ Load the starting GTID position, so that we can skip already applied
+ GTIDs when we restart the SQL thread. And set the start position in
+ the relay log back to a known safe place to start (prior to any not
+ yet applied transaction in any domain).
+ */
+ rli->restart_gtid_pos.load(&rpl_global_gtid_slave_state, NULL, 0);
+ if ((ir= rli->inuse_relaylog_list))
+ {
+ rpl_gtid *gtid= ir->relay_log_state;
+ uint32 count= ir->relay_log_state_count;
+ while (count > 0)
+ {
+ process_gtid_for_restart_pos(rli, gtid);
+ ++gtid;
+ --count;
+ }
+ strmake_buf(rli->group_relay_log_name, ir->name);
+ rli->group_relay_log_pos= BIN_LOG_HEADER_SIZE;
+ }
+ }
+ }
+ }
THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit);
thd->add_status_to_global();
mysql_mutex_lock(&rli->run_lock);
@@ -4753,6 +4832,7 @@ err_during_init:
/* Forget the relay log's format */
delete rli->relay_log.description_event_for_exec;
rli->relay_log.description_event_for_exec= 0;
+ rli->reset_inuse_relaylog();
/* Wake up master_pos_wait() */
mysql_mutex_unlock(&rli->data_lock);
DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions"));
@@ -4767,7 +4847,7 @@ err_during_init:
*/
thd->temporary_tables = 0; // remove tempation from destructor to close them
THD_CHECK_SENTRY(thd);
- serial_rgi->thd= rli->sql_driver_thd= 0;
+ rli->sql_driver_thd= 0;
mysql_mutex_lock(&LOCK_thread_count);
THD_CHECK_SENTRY(thd);
thd->rgi_fake= thd->rgi_slave= NULL;
@@ -6024,7 +6104,23 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
}
#endif
- mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
+ /*
+ If server's default charset is not supported (like utf16, utf32) as client
+ charset, then set client charset to 'latin1' (default client charset).
+ */
+ if (is_supported_parser_charset(default_charset_info))
+ mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
+ else
+ {
+ sql_print_information("'%s' can not be used as client character set. "
+ "'%s' will be used as default client character set "
+ "while connecting to master.",
+ default_charset_info->csname,
+ default_client_charset_info->csname);
+ mysql_options(mysql, MYSQL_SET_CHARSET_NAME,
+ default_client_charset_info->csname);
+ }
+
/* This one is not strictly needed but we have it here for completeness */
mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir);