diff options
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 171 |
1 files changed, 140 insertions, 31 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index 6d64534faf9..e228033f15a 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -1,5 +1,5 @@ -/* Copyright (c) 2000, 2013, Oracle and/or its affiliates. - Copyright (c) 2008, 2014, SkySQL Ab. +/* Copyright (c) 2000, 2015, Oracle and/or its affiliates. + Copyright (c) 2008, 2015, MariaDB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -149,26 +149,18 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev); static bool wait_for_relay_log_space(Relay_log_info* rli); static bool io_slave_killed(Master_info* mi); static bool sql_slave_killed(rpl_group_info *rgi); -static int init_slave_thread(THD* thd, Master_info *mi, - SLAVE_THD_TYPE thd_type); +static int init_slave_thread(THD*, Master_info *, SLAVE_THD_TYPE); static void print_slave_skip_errors(void); static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi); -static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi, - bool suppress_warnings); -static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi, - bool reconnect, bool suppress_warnings); +static int safe_reconnect(THD*, MYSQL*, Master_info*, bool); +static int connect_to_master(THD*, MYSQL*, Master_info*, bool, bool); static Log_event* next_event(rpl_group_info* rgi, ulonglong *event_size); static int queue_event(Master_info* mi,const char* buf,ulong event_len); -static int terminate_slave_thread(THD *thd, - mysql_mutex_t *term_lock, - mysql_cond_t *term_cond, - volatile uint *slave_running, - bool skip_lock); +static int terminate_slave_thread(THD *, mysql_mutex_t *, mysql_cond_t *, + volatile uint *, bool); static bool check_io_slave_killed(Master_info *mi, const char *info); -static bool send_show_master_info_header(THD *thd, bool full, - size_t gtid_pos_length); -static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, - String *gtid_pos); +static bool send_show_master_info_header(THD *, bool, size_t); +static bool send_show_master_info_data(THD *, Master_info *, bool, String *); /* Function to set the slave's max_allowed_packet based on the value of slave_max_allowed_packet. @@ -372,6 +364,9 @@ int init_slave() if (run_slave_init_thread()) return 1; + if (global_rpl_thread_pool.init(opt_slave_parallel_threads)) + return 1; + /* This is called when mysqld starts. Before client connections are accepted. However bootstrap may conflict with us if it does START SLAVE. @@ -405,9 +400,6 @@ int init_slave() goto err; } - if (global_rpl_thread_pool.init(opt_slave_parallel_threads)) - return 1; - /* If --slave-skip-errors=... was not used, the string value for the system variable has not been set up yet. Do it now. @@ -652,6 +644,10 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS); mysql_mutex_unlock(log_lock); + + if (opt_slave_parallel_threads > 0 && + !master_info_index->any_slave_sql_running()) + rpl_parallel_inactivate_pool(&global_rpl_thread_pool); } if (thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)) { @@ -943,6 +939,8 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start, Master_info::USE_GTID_CURRENT_POS); mi->events_queued_since_last_gtid= 0; mi->gtid_reconnect_event_skip_count= 0; + + mi->rli.restart_gtid_pos.reset(); } if (!error && (thread_mask & SLAVE_IO)) @@ -956,7 +954,10 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start, mi); if (!error && (thread_mask & SLAVE_SQL)) { - error= start_slave_thread( + if (opt_slave_parallel_threads > 0) + error= rpl_parallel_activate_pool(&global_rpl_thread_pool); + if (!error) + error= start_slave_thread( #ifdef HAVE_PSI_INTERFACE key_thread_slave_sql, #endif @@ -2002,11 +2003,21 @@ after_set_capability: !(master_row= mysql_fetch_row(master_res))) { err_code= mysql_errno(mysql); - errmsg= "The slave I/O thread stops because master does not support " - "MariaDB global transaction id. A fatal error is encountered when " - "it tries to SELECT @@GLOBAL.gtid_domain_id."; - sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); - goto err; + if (is_network_error(err_code)) + { + mi->report(ERROR_LEVEL, err_code, NULL, + "Get master @@GLOBAL.gtid_domain_id failed with error: %s", + mysql_error(mysql)); + goto network_err; + } + else + { + errmsg= "The slave I/O thread stops because master does not support " + "MariaDB global transaction id. A fatal error is encountered when " + "it tries to SELECT @@GLOBAL.gtid_domain_id."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } } mysql_free_result(master_res); master_res= NULL; @@ -3318,7 +3329,7 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, else { /* - Make sure we do not errorneously update gtid_slave_pos with a lingering + Make sure we do not erroneously update gtid_slave_pos with a lingering GTID from this failed event group (MDEV-4906). */ rgi->gtid_pending= false; @@ -3458,6 +3469,21 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, Log_event_type typ= ev->get_type_code(); /* + Even if we don't execute this event, we keep the master timestamp, + so that seconds behind master shows correct delta (there are events + that are not replayed, so we keep falling behind). + + If it is an artificial event, or a relay log event (IO thread generated + event) or ev->when is set to 0, we don't update the + last_master_timestamp. + */ + if (!(ev->is_artificial_event() || ev->is_relay_log_event() || (ev->when == 0))) + { + rli->last_master_timestamp= ev->when + (time_t) ev->exec_time; + DBUG_ASSERT(rli->last_master_timestamp >= 0); + } + + /* This tests if the position of the beginning of the current event hits the UNTIL barrier. */ @@ -3568,8 +3594,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, if (slave_trans_retries) { - int temp_err; - LINT_INIT(temp_err); + int UNINIT_VAR(temp_err); if (exec_res && (temp_err= has_temporary_error(thd))) { const char *errmsg; @@ -4388,7 +4413,9 @@ pthread_handler_t handle_slave_sql(void *arg) my_thread_init(); DBUG_ENTER("handle_slave_sql"); +#ifdef WITH_WSREP wsrep_restart_point: +#endif serial_rgi= new rpl_group_info(rli); thd = new THD; // note that contructor of THD uses DBUG_ ! @@ -4479,6 +4506,16 @@ pthread_handler_t handle_slave_sql(void *arg) serial_rgi->gtid_sub_id= 0; serial_rgi->gtid_pending= false; + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + /* + We initialize the relay log state from the know starting position. + It will then be updated as required by GTID and GTID_LIST events found + while applying events read from relay logs. + */ + rli->relay_log_state.load(&rpl_global_gtid_slave_state); + } + rli->gtid_skip_flag = GTID_SKIP_NOT; if (init_relay_log_pos(rli, rli->group_relay_log_name, rli->group_relay_log_pos, @@ -4489,6 +4526,7 @@ pthread_handler_t handle_slave_sql(void *arg) "Error initializing relay log position: %s", errmsg); goto err; } + rli->reset_inuse_relaylog(); if (rli->alloc_inuse_relaylog(rli->group_relay_log_name)) goto err; @@ -4705,7 +4743,49 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, thd->reset_query(); thd->reset_db(NULL, 0); if (rli->mi->using_gtid != Master_info::USE_GTID_NO) + { + ulong domain_count; + flush_relay_log_info(rli); + if (mi->using_parallel()) + { + /* + In parallel replication GTID mode, we may stop with different domains + at different positions in the relay log. + + To handle this when we restart the SQL thread, mark the current + per-domain position in the Relay_log_info. + */ + mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state); + domain_count= rpl_global_gtid_slave_state.count(); + mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state); + if (domain_count > 1) + { + inuse_relaylog *ir; + + /* + Load the starting GTID position, so that we can skip already applied + GTIDs when we restart the SQL thread. And set the start position in + the relay log back to a known safe place to start (prior to any not + yet applied transaction in any domain). + */ + rli->restart_gtid_pos.load(&rpl_global_gtid_slave_state, NULL, 0); + if ((ir= rli->inuse_relaylog_list)) + { + rpl_gtid *gtid= ir->relay_log_state; + uint32 count= ir->relay_log_state_count; + while (count > 0) + { + process_gtid_for_restart_pos(rli, gtid); + ++gtid; + --count; + } + strmake_buf(rli->group_relay_log_name, ir->name); + rli->group_relay_log_pos= BIN_LOG_HEADER_SIZE; + } + } + } + } THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit); thd->add_status_to_global(); mysql_mutex_lock(&rli->run_lock); @@ -4718,6 +4798,7 @@ err_during_init: /* Forget the relay log's format */ delete rli->relay_log.description_event_for_exec; rli->relay_log.description_event_for_exec= 0; + rli->reset_inuse_relaylog(); /* Wake up master_pos_wait() */ mysql_mutex_unlock(&rli->data_lock); DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions")); @@ -4732,7 +4813,7 @@ err_during_init: */ thd->temporary_tables = 0; // remove tempation from destructor to close them THD_CHECK_SENTRY(thd); - serial_rgi->thd= rli->sql_driver_thd= 0; + rli->sql_driver_thd= 0; mysql_mutex_lock(&LOCK_thread_count); THD_CHECK_SENTRY(thd); thd->rgi_fake= thd->rgi_slave= NULL; @@ -5666,6 +5747,18 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) } break; +#ifndef DBUG_OFF + case XID_EVENT: + DBUG_EXECUTE_IF("slave_discard_xid_for_gtid_0_x_1000", + { + /* Inject an event group that is missing its XID commit event. */ + if (mi->last_queued_gtid.domain_id == 0 && + mi->last_queued_gtid.seq_no == 1000) + goto skip_relay_logging; + }); + /* Fall through to default case ... */ +#endif + default: default_action: DBUG_EXECUTE_IF("kill_slave_io_after_2_events", @@ -6040,7 +6133,23 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi, } #endif - mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname); + /* + If server's default charset is not supported (like utf16, utf32) as client + charset, then set client charset to 'latin1' (default client charset). + */ + if (is_supported_parser_charset(default_charset_info)) + mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname); + else + { + sql_print_information("'%s' can not be used as client character set. " + "'%s' will be used as default client character set " + "while connecting to master.", + default_charset_info->csname, + default_client_charset_info->csname); + mysql_options(mysql, MYSQL_SET_CHARSET_NAME, + default_client_charset_info->csname); + } + /* This one is not strictly needed but we have it here for completeness */ mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir); |