summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc474
1 files changed, 316 insertions, 158 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index 6fe6de6872a..6b234697f09 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -40,7 +40,8 @@
#include <my_dir.h>
#include <sql_common.h>
#include <errmsg.h>
-#include <mysqld_error.h>
+#include <ssl_compat.h>
+#include "unireg.h"
#include <mysys_err.h>
#include "rpl_handler.h"
#include <signal.h>
@@ -60,7 +61,6 @@
#include "debug_sync.h"
#include "rpl_parallel.h"
-
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
#define MAX_SLAVE_RETRY_PAUSE 5
@@ -77,6 +77,7 @@ Master_info *active_mi= 0;
Master_info_index *master_info_index;
my_bool replicate_same_server_id;
ulonglong relay_log_space_limit = 0;
+ulonglong opt_read_binlog_speed_limit = 0;
const char *relay_log_index= 0;
const char *relay_log_basename= 0;
@@ -217,7 +218,7 @@ static void set_slave_max_allowed_packet(THD *thd, MYSQL *mysql)
void init_thread_mask(int* mask,Master_info* mi,bool inverse)
{
bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
- register int tmp_mask=0;
+ int tmp_mask=0;
DBUG_ENTER("init_thread_mask");
if (set_io)
@@ -296,11 +297,8 @@ handle_slave_background(void *arg __attribute__((unused)))
bool stop;
my_thread_init();
- thd= new THD;
+ thd= new THD(next_thread_id());
thd->thread_stack= (char*) &thd; /* Set approximate stack start */
- mysql_mutex_lock(&LOCK_thread_count);
- thd->thread_id= thread_id++;
- mysql_mutex_unlock(&LOCK_thread_count);
thd->system_thread = SYSTEM_THREAD_SLAVE_BACKGROUND;
thread_safe_increment32(&service_thread_count);
thd->store_globals();
@@ -365,8 +363,8 @@ handle_slave_background(void *arg __attribute__((unused)))
delete thd;
thread_safe_decrement32(&service_thread_count);
signal_thd_deleted();
- my_thread_end();
+ my_thread_end();
return 0;
}
@@ -521,7 +519,7 @@ int init_slave()
if (active_mi->host[0] && !opt_skip_slave_start)
{
int error;
- THD *thd= new THD;
+ THD *thd= new THD(next_thread_id());
thd->thread_stack= (char*) &thd;
thd->store_globals();
@@ -741,8 +739,7 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
DBUG_PRINT("info",("Flushing relay-log info file."));
if (current_thd)
THD_STAGE_INFO(current_thd, stage_flushing_relay_log_info_file);
- if (flush_relay_log_info(&mi->rli) ||
- my_sync(mi->rli.info_fd, MYF(MY_WME)))
+ if (mi->rli.flush() || my_sync(mi->rli.info_fd, MYF(MY_WME)))
retval= ER_ERROR_DURING_FLUSH_LOGS;
mysql_mutex_unlock(log_lock);
@@ -1080,6 +1077,7 @@ void slave_prepare_for_shutdown()
mysql_mutex_lock(&LOCK_active_mi);
master_info_index->free_connections();
mysql_mutex_unlock(&LOCK_active_mi);
+ stop_slave_background_thread();
}
/*
@@ -1642,8 +1640,10 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
(master_res= mysql_store_result(mysql)) &&
(master_row= mysql_fetch_row(master_res)))
{
+ mysql_mutex_lock(&mi->data_lock);
mi->clock_diff_with_master=
(long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
+ mysql_mutex_unlock(&mi->data_lock);
}
else if (check_io_slave_killed(mi, NULL))
goto slave_killed_err;
@@ -1655,7 +1655,9 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
}
else
{
+ mysql_mutex_lock(&mi->data_lock);
mi->clock_diff_with_master= 0; /* The "most sensible" value */
+ mysql_mutex_unlock(&mi->data_lock);
sql_print_warning("\"SELECT UNIX_TIMESTAMP()\" failed on master, "
"do not trust column Seconds_Behind_Master of SHOW "
"SLAVE STATUS. Error: %s (%d)",
@@ -2806,6 +2808,15 @@ void show_master_info_get_fields(THD *thd, List<Item> *field_list,
Item_empty_string(thd, "Parallel_Mode",
sizeof("conservative")-1),
mem_root);
+ field_list->push_back(new (mem_root)
+ Item_return_int(thd, "SQL_Delay", 10,
+ MYSQL_TYPE_LONG));
+ field_list->push_back(new (mem_root)
+ Item_return_int(thd, "SQL_Remaining_Delay", 8,
+ MYSQL_TYPE_LONG));
+ field_list->push_back(new (mem_root)
+ Item_empty_string(thd, "Slave_SQL_Running_State",
+ 20));
if (full)
{
field_list->push_back(new (mem_root)
@@ -2997,6 +3008,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
prot_store_ids(thd, &mi->ignore_server_ids);
// Master_Server_id
protocol->store((uint32) mi->master_id);
+ // SQL_Delay
// Master_Ssl_Crl
protocol->store(mi->ssl_ca, &my_charset_bin);
// Master_Ssl_Crlpath
@@ -3019,6 +3031,22 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
protocol->store(mode_name, strlen(mode_name), &my_charset_bin);
}
+ protocol->store((uint32) mi->rli.get_sql_delay());
+ // SQL_Remaining_Delay
+ // THD::proc_info is not protected by any lock, so we read it once
+ // to ensure that we use the same value throughout this function.
+ const char *slave_sql_running_state=
+ mi->rli.sql_driver_thd ? mi->rli.sql_driver_thd->proc_info : "";
+ if (slave_sql_running_state == Relay_log_info::state_delaying_string)
+ {
+ time_t t= my_time(0), sql_delay_end= mi->rli.get_sql_delay_end();
+ protocol->store((uint32)(t < sql_delay_end ? sql_delay_end - t : 0));
+ }
+ else
+ protocol->store_null();
+ // Slave_SQL_Running_State
+ protocol->store(slave_sql_running_state, &my_charset_bin);
+
if (full)
{
protocol->store((uint32) mi->rli.retried_trans);
@@ -3143,13 +3171,10 @@ void set_slave_thread_default_charset(THD* thd, rpl_group_info *rgi)
{
DBUG_ENTER("set_slave_thread_default_charset");
- thd->variables.character_set_client=
- global_system_variables.character_set_client;
- thd->variables.collation_connection=
- global_system_variables.collation_connection;
thd->variables.collation_server=
global_system_variables.collation_server;
- thd->update_charset();
+ thd->update_charset(global_system_variables.character_set_client,
+ global_system_variables.collation_connection);
thd->system_thread_info.rpl_sql_info->cached_charset_invalidate();
DBUG_VOID_RETURN;
@@ -3188,9 +3213,6 @@ static int init_slave_thread(THD* thd, Master_info *mi,
thd->variables.sql_log_slow= opt_log_slow_slave_statements;
thd->variables.log_slow_filter= global_system_variables.log_slow_filter;
set_slave_thread_options(thd);
- mysql_mutex_lock(&LOCK_thread_count);
- thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
- mysql_mutex_unlock(&LOCK_thread_count);
if (thd_type == SLAVE_THD_SQL)
THD_STAGE_INFO(thd, stage_waiting_for_the_next_event_in_relay_log);
@@ -3296,13 +3318,15 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
try a reconnect. We do not want to print anything to
the error log in this case because this a anormal
event in an idle server.
+ network_read_len get the real network read length in VIO, especially using compressed protocol
RETURN VALUES
'packet_error' Error
number Length of packet
*/
-static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings)
+static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings,
+ ulong* network_read_len)
{
ulong len;
DBUG_ENTER("read_event");
@@ -3317,7 +3341,7 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings)
DBUG_RETURN(packet_error);
#endif
- len = cli_safe_read(mysql);
+ len = cli_safe_read_reallen(mysql, network_read_len);
if (len == packet_error || (long) len < 1)
{
if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
@@ -3392,6 +3416,73 @@ has_temporary_error(THD *thd)
}
+/**
+ If this is a lagging slave (specified with CHANGE MASTER TO MASTER_DELAY = X), delays accordingly. Also unlocks rli->data_lock.
+
+ Design note: this is the place to unlock rli->data_lock. The lock
+ must be held when reading delay info from rli, but it should not be
+ held while sleeping.
+
+ @param ev Event that is about to be executed.
+
+ @param thd The sql thread's THD object.
+
+ @param rli The sql thread's Relay_log_info structure.
+
+ @retval 0 If the delay timed out and the event shall be executed.
+
+ @retval nonzero If the delay was interrupted and the event shall be skipped.
+*/
+int
+sql_delay_event(Log_event *ev, THD *thd, rpl_group_info *rgi)
+{
+ Relay_log_info* rli= rgi->rli;
+ long sql_delay= rli->get_sql_delay();
+
+ DBUG_ENTER("sql_delay_event");
+ mysql_mutex_assert_owner(&rli->data_lock);
+ DBUG_ASSERT(!rli->belongs_to_client());
+
+ int type= ev->get_type_code();
+ if (sql_delay && type != ROTATE_EVENT &&
+ type != FORMAT_DESCRIPTION_EVENT && type != START_EVENT_V3)
+ {
+ // The time when we should execute the event.
+ time_t sql_delay_end=
+ ev->when + rli->mi->clock_diff_with_master + sql_delay;
+ // The current time.
+ time_t now= my_time(0);
+ // The time we will have to sleep before executing the event.
+ unsigned long nap_time= 0;
+ if (sql_delay_end > now)
+ nap_time= (ulong)(sql_delay_end - now);
+
+ DBUG_PRINT("info", ("sql_delay= %lu "
+ "ev->when= %lu "
+ "rli->mi->clock_diff_with_master= %lu "
+ "now= %ld "
+ "sql_delay_end= %llu "
+ "nap_time= %ld",
+ sql_delay, (long)ev->when,
+ rli->mi->clock_diff_with_master,
+ (long)now, (ulonglong)sql_delay_end, (long)nap_time));
+
+ if (sql_delay_end > now)
+ {
+ DBUG_PRINT("info", ("delaying replication event %lu secs",
+ nap_time));
+ rli->start_sql_delay(sql_delay_end);
+ mysql_mutex_unlock(&rli->data_lock);
+ DBUG_RETURN(slave_sleep(thd, nap_time, sql_slave_killed, rgi));
+ }
+ }
+
+ mysql_mutex_unlock(&rli->data_lock);
+
+ DBUG_RETURN(0);
+}
+
+
/*
First half of apply_event_and_update_pos(), see below.
Setup some THD variables for applying the event.
@@ -3441,12 +3532,6 @@ apply_event_and_update_pos_setup(Log_event* ev, THD* thd, rpl_group_info *rgi)
thd->variables.server_id = ev->server_id;
thd->set_time(); // time the query
thd->lex->current_select= 0;
- if (!ev->when)
- {
- my_hrtime_t hrtime= my_hrtime();
- ev->when= hrtime_to_my_time(hrtime);
- ev->when_sec_part= hrtime_sec_part(hrtime);
- }
thd->variables.option_bits=
(thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) |
(ev->flags & LOG_EVENT_SKIP_REPLICATION_F ? OPTION_SKIP_REPLICATION : 0);
@@ -3515,16 +3600,16 @@ apply_event_and_update_pos_apply(Log_event* ev, THD* thd, rpl_group_info *rgi,
if (exec_res == 0)
{
int error= ev->update_pos(rgi);
-#ifdef HAVE_valgrind
- if (!rli->is_fake)
-#endif
+ #ifndef DBUG_OFF
+ DBUG_PRINT("info", ("update_pos error = %d", error));
+ if (!rli->belongs_to_client())
{
- DBUG_PRINT("info", ("update_pos error = %d", error));
DBUG_PRINT("info", ("group %llu %s", rli->group_relay_log_pos,
rli->group_relay_log_name));
DBUG_PRINT("info", ("event %llu %s", rli->event_relay_log_pos,
rli->event_relay_log_name));
}
+#endif
/*
The update should not fail, so print an error message and
return an error code.
@@ -3559,21 +3644,39 @@ apply_event_and_update_pos_apply(Log_event* ev, THD* thd, rpl_group_info *rgi,
/**
Applies the given event and advances the relay log position.
- In essence, this function does:
+ This is needed by the sql thread to execute events from the binlog,
+ and by clients executing BINLOG statements. Conceptually, this
+ function does:
@code
ev->apply_event(rli);
ev->update_pos(rli);
@endcode
- But it also does some maintainance, such as skipping events if
- needed and reporting errors.
+ It also does the following maintainance:
- If the @c skip flag is set, then it is tested whether the event
- should be skipped, by looking at the slave_skip_counter and the
- server id. The skip flag should be set when calling this from a
- replication thread but not set when executing an explicit BINLOG
- statement.
+ - Initializes the thread's server_id and time; and the event's
+ thread.
+
+ - If !rli->belongs_to_client() (i.e., if it belongs to the slave
+ sql thread instead of being used for executing BINLOG
+ statements), it does the following things: (1) skips events if it
+ is needed according to the server id or slave_skip_counter; (2)
+ unlocks rli->data_lock; (3) sleeps if required by 'CHANGE MASTER
+ TO MASTER_DELAY=X'; (4) maintains the running state of the sql
+ thread (rli->thread_state).
+
+ - Reports errors as needed.
+
+ @param ev The event to apply.
+
+ @param thd The client thread that executes the event (i.e., the
+ slave sql thread if called from a replication slave, or the client
+ thread if called to execute a BINLOG statement).
+
+ @param rli The relay log info (i.e., the slave's rli if called from
+ a replication slave, or the client's thd->rli_fake if called to
+ execute a BINLOG statement).
@retval 0 OK.
@@ -3596,7 +3699,16 @@ apply_event_and_update_pos(Log_event* ev, THD* thd, rpl_group_info *rgi)
DBUG_ASSERT(rli->slave_skip_counter > 0);
rli->slave_skip_counter--;
}
- mysql_mutex_unlock(&rli->data_lock);
+
+ if (reason == Log_event::EVENT_SKIP_NOT)
+ {
+ // Sleeps if needed, and unlocks rli->data_lock.
+ if (sql_delay_event(ev, thd, rgi))
+ return 0;
+ }
+ else
+ mysql_mutex_unlock(&rli->data_lock);
+
return apply_event_and_update_pos_apply(ev, thd, rgi, reason);
}
@@ -3620,6 +3732,10 @@ apply_event_and_update_pos_for_parallel(Log_event* ev, THD* thd,
driver thread, so 23 should never see EVENT_SKIP_COUNT here.
*/
DBUG_ASSERT(reason != Log_event::EVENT_SKIP_COUNT);
+ /*
+ Calling sql_delay_event() was handled in the SQL driver thread when
+ doing parallel replication.
+ */
return apply_event_and_update_pos_apply(ev, thd, rgi, reason);
}
@@ -3667,7 +3783,7 @@ inline void update_state_of_relay_log(Relay_log_info *rli, Log_event *ev)
}
/* Check for an event that starts or stops a transaction */
- if (typ == QUERY_EVENT)
+ if (LOG_EVENT_IS_QUERY(typ))
{
Query_log_event *qev= (Query_log_event*) ev;
/*
@@ -3699,7 +3815,8 @@ inline void update_state_of_relay_log(Relay_log_info *rli, Log_event *ev)
/**
- Top-level function for executing the next event from the relay log.
+ Top-level function for executing the next event in the relay log.
+ This is called from the SQL thread.
This function reads the event from the relay log, executes it, and
advances the relay log position. It also handles errors, etc.
@@ -3806,7 +3923,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
*/
DBUG_EXECUTE_IF("incomplete_group_in_relay_log",
if ((typ == XID_EVENT) ||
- ((typ == QUERY_EVENT) &&
+ (LOG_EVENT_IS_QUERY(typ) &&
strcmp("COMMIT", ((Query_log_event *) ev)->query) == 0))
{
DBUG_ASSERT(thd->transaction.all.modified_non_trans_table);
@@ -3837,6 +3954,19 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
This is the case for pre-10.0 events without GTID, and for handling
slave_skip_counter.
*/
+ if (!(ev->is_artificial_event() || ev->is_relay_log_event() || (ev->when == 0)))
+ {
+ /*
+ Ignore FD's timestamp as it does not reflect the slave execution
+ state but likely to reflect a deep past. Consequently when the first
+ data modification event execution last long all this time
+ Seconds_Behind_Master is zero.
+ */
+ if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
+ rli->last_master_timestamp= ev->when + (time_t) ev->exec_time;
+
+ DBUG_ASSERT(rli->last_master_timestamp >= 0);
+ }
}
if (typ == GTID_EVENT)
@@ -4128,7 +4258,7 @@ pthread_handler_t handle_slave_io(void *arg)
mysql= NULL ;
retry_count= 0;
- thd= new THD; // note that contructor of THD uses DBUG_ !
+ thd= new THD(next_thread_id()); // note that contructor of THD uses DBUG_ !
mysql_mutex_lock(&mi->run_lock);
/* Inform waiting threads that slave has started */
@@ -4151,9 +4281,7 @@ pthread_handler_t handle_slave_io(void *arg)
goto err_during_init;
}
thd->system_thread_info.rpl_io_info= &io_info;
- mysql_mutex_lock(&LOCK_thread_count);
- threads.append(thd);
- mysql_mutex_unlock(&LOCK_thread_count);
+ add_to_active_threads(thd);
mi->slave_running = MYSQL_SLAVE_RUN_NOT_CONNECT;
mi->abort_slave = 0;
mysql_mutex_unlock(&mi->run_lock);
@@ -4254,8 +4382,10 @@ connected:
};);
#endif
- // TODO: the assignment below should be under mutex (5.0)
+ mysql_mutex_lock(&mi->run_lock);
mi->slave_running= MYSQL_SLAVE_RUN_CONNECT;
+ mysql_mutex_unlock(&mi->run_lock);
+
thd->slave_net = &mysql->net;
THD_STAGE_INFO(thd, stage_checking_master_version);
ret= get_master_version_and_clock(mysql, mi);
@@ -4302,6 +4432,7 @@ connected:
}
DBUG_PRINT("info",("Starting reading binary log from master"));
+ thd->set_command(COM_SLAVE_IO);
while (!io_slave_killed(mi))
{
THD_STAGE_INFO(thd, stage_requesting_binlog_dump);
@@ -4319,9 +4450,11 @@ connected:
mi->slave_running= MYSQL_SLAVE_RUN_READING;
DBUG_ASSERT(mi->last_error().number == 0);
+ ulonglong lastchecktime = my_hrtime().val;
+ ulonglong tokenamount = opt_read_binlog_speed_limit*1024;
while (!io_slave_killed(mi))
{
- ulong event_len;
+ ulong event_len, network_read_len = 0;
/*
We say "waiting" because read_event() will wait if there's nothing to
read. But if there's something to read, it will not wait. The
@@ -4329,7 +4462,7 @@ connected:
we're in fact receiving nothing.
*/
THD_STAGE_INFO(thd, stage_waiting_for_master_to_send_event);
- event_len= read_event(mysql, mi, &suppress_warnings);
+ event_len= read_event(mysql, mi, &suppress_warnings, &network_read_len);
if (check_io_slave_killed(mi, NullS))
goto err;
@@ -4377,6 +4510,47 @@ Stopping slave I/O thread due to out-of-memory error from master");
goto err;
}
+ /* Control the binlog read speed of master
+ when read_binlog_speed_limit is non-zero
+ */
+ ulonglong speed_limit_in_bytes = opt_read_binlog_speed_limit * 1024;
+ if (speed_limit_in_bytes)
+ {
+ /* Prevent the tokenamount become a large value,
+ for example, the IO thread doesn't work for a long time
+ */
+ if (tokenamount > speed_limit_in_bytes * 2)
+ {
+ lastchecktime = my_hrtime().val;
+ tokenamount = speed_limit_in_bytes * 2;
+ }
+
+ do
+ {
+ ulonglong currenttime = my_hrtime().val;
+ tokenamount += (currenttime - lastchecktime) * speed_limit_in_bytes / (1000*1000);
+ lastchecktime = currenttime;
+ if(tokenamount < network_read_len)
+ {
+ ulonglong duration =1000ULL*1000 * (network_read_len - tokenamount) / speed_limit_in_bytes;
+ time_t second_time = (time_t)(duration / (1000 * 1000));
+ uint micro_time = duration % (1000 * 1000);
+
+ // at least sleep 1000 micro second
+ my_sleep(MY_MAX(micro_time,1000));
+
+ /*
+ If it sleep more than one second,
+ it should use slave_sleep() to avoid the STOP SLAVE hang.
+ */
+ if (second_time)
+ slave_sleep(thd, second_time, io_slave_killed, mi);
+
+ }
+ }while(tokenamount < network_read_len);
+ tokenamount -= network_read_len;
+ }
+
/* XXX: 'synced' should be updated by queue_event to indicate
whether event has been synced to disk */
bool synced= 0;
@@ -4476,6 +4650,7 @@ err:
flush_master_info(mi, TRUE, TRUE);
THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit);
thd->add_status_to_global();
+ unlink_not_visible_thd(thd);
mysql_mutex_lock(&mi->run_lock);
err_during_init:
@@ -4506,9 +4681,7 @@ err_during_init:
DBUG_LEAVE; // Must match DBUG_ENTER()
my_thread_end();
-#ifdef HAVE_OPENSSL
ERR_remove_state(0);
-#endif
pthread_exit(0);
return 0; // Avoid compiler warnings
}
@@ -4535,7 +4708,8 @@ int check_temp_dir(char* tmp_file)
size_t tmp_dir_size;
DBUG_ENTER("check_temp_dir");
- mysql_mutex_lock(&LOCK_thread_count);
+ /* This look is safe to use as this function is only called once */
+ mysql_mutex_lock(&LOCK_start_thread);
if (check_temp_dir_run)
{
if ((result= check_temp_dir_result))
@@ -4574,7 +4748,7 @@ int check_temp_dir(char* tmp_file)
mysql_file_delete(key_file_misc, tmp_file, MYF(0));
end:
- mysql_mutex_unlock(&LOCK_thread_count);
+ mysql_mutex_unlock(&LOCK_start_thread);
DBUG_RETURN(result);
}
@@ -4700,7 +4874,7 @@ pthread_handler_t handle_slave_sql(void *arg)
#endif
serial_rgi= new rpl_group_info(rli);
- thd = new THD; // note that contructor of THD uses DBUG_ !
+ thd = new THD(next_thread_id()); // note that contructor of THD uses DBUG_ !
thd->thread_stack = (char*)&thd; // remember where our stack is
thd->system_thread_info.rpl_sql_info= &sql_info;
@@ -4762,9 +4936,7 @@ pthread_handler_t handle_slave_sql(void *arg)
applied. In all other cases it must be FALSE.
*/
thd->variables.binlog_annotate_row_events= 0;
- mysql_mutex_lock(&LOCK_thread_count);
- threads.append(thd);
- mysql_mutex_unlock(&LOCK_thread_count);
+ add_to_active_threads(thd);
/*
We are going to set slave_running to 1. Assuming slave I/O thread is
alive and connected, this is going to make Seconds_Behind_Master be 0
@@ -4948,6 +5120,7 @@ pthread_handler_t handle_slave_sql(void *arg)
/* Read queries from the IO/THREAD until this thread is killed */
+ thd->set_command(COM_SLAVE_SQL);
while (!sql_slave_killed(serial_rgi))
{
THD_STAGE_INFO(thd, stage_reading_event_from_the_relay_log);
@@ -5042,11 +5215,11 @@ pthread_handler_t handle_slave_sql(void *arg)
my_bool save_log_all_errors= thd->log_all_errors;
/*
- We don't need to check return value for flush_relay_log_info()
+ We don't need to check return value for rli->flush()
as any errors should be logged to stderr
*/
thd->log_all_errors= 1;
- flush_relay_log_info(rli);
+ rli->flush();
thd->log_all_errors= save_log_all_errors;
if (mi->using_parallel())
{
@@ -5090,7 +5263,9 @@ pthread_handler_t handle_slave_sql(void *arg)
}
THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit);
thd->add_status_to_global();
+ unlink_not_visible_thd(thd);
mysql_mutex_lock(&rli->run_lock);
+
err_during_init:
/* We need data_lock, at least to wake up any waiting master_pos_wait() */
mysql_mutex_lock(&rli->data_lock);
@@ -5112,14 +5287,13 @@ err_during_init:
/*
TODO: see if we can do this conditionally in next_event() instead
to avoid unneeded position re-init
+
+ We only reset THD::temporary_tables to 0 here and not free it, as this
+ could be used by slave through Relay_log_info::save_temporary_tables.
*/
- thd->temporary_tables = 0; // remove tempation from destructor to close them
- THD_CHECK_SENTRY(thd);
+ thd->temporary_tables= 0;
rli->sql_driver_thd= 0;
- mysql_mutex_lock(&LOCK_thread_count);
thd->rgi_fake= thd->rgi_slave= NULL;
- delete serial_rgi;
- mysql_mutex_unlock(&LOCK_thread_count);
#ifdef WITH_WSREP
/*
@@ -5148,28 +5322,29 @@ err_during_init:
#endif /* WITH_WSREP */
/*
- Note: the order of the broadcast and unlock calls below (first broadcast, then unlock)
- is important. Otherwise a killer_thread can execute between the calls and
- delete the mi structure leading to a crash! (see BUG#25306 for details)
- */
+ Note: the order of the broadcast and unlock calls below (first
+ broadcast, then unlock) is important. Otherwise a killer_thread can
+ execute between the calls and delete the mi structure leading to a
+ crash! (see BUG#25306 for details)
+ */
mysql_cond_broadcast(&rli->stop_cond);
DBUG_EXECUTE_IF("simulate_slave_delay_at_terminate_bug38694", sleep(5););
mysql_mutex_unlock(&rli->run_lock); // tell the world we are done
rpl_parallel_resize_pool_if_no_slaves();
+ /* TODO: Check if this lock is needed */
mysql_mutex_lock(&LOCK_thread_count);
- thd->unlink();
+ delete serial_rgi;
mysql_mutex_unlock(&LOCK_thread_count);
+
delete thd;
thread_safe_decrement32(&service_thread_count);
signal_thd_deleted();
DBUG_LEAVE; // Must match DBUG_ENTER()
my_thread_end();
-#ifdef HAVE_OPENSSL
ERR_remove_state(0);
-#endif
pthread_exit(0);
return 0; // Avoid compiler warnings
}
@@ -5584,9 +5759,12 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
bool gtid_skip_enqueue= false;
bool got_gtid_event= false;
rpl_gtid event_gtid;
-#ifndef DBUG_OFF
- static uint dbug_rows_event_count= 0;
-#endif
+ static uint dbug_rows_event_count __attribute__((unused))= 0;
+ bool is_compress_event = false;
+ char* new_buf = NULL;
+ char new_buf_arr[4096];
+ bool is_malloc = false;
+
/*
FD_q must have been prepared for the first R_a event
inside get_master_version_and_clock()
@@ -5632,7 +5810,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
// Emulate the network corruption
DBUG_EXECUTE_IF("corrupt_queue_event",
- if (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT)
+ if ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT)
{
char *debug_event_buf_c = (char*) buf;
int debug_cor_pos = rand() % (event_len - BINLOG_CHECKSUM_LEN);
@@ -5935,9 +6113,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
TODO: handling `when' for SHOW SLAVE STATUS' snds behind
*/
- if (memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len())
- || mi->master_log_pos > hb.log_pos)
- {
+ if (memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len()) ||
+ mi->master_log_pos > hb.log_pos) {
/* missed events of heartbeat from the past */
error= ER_SLAVE_HEARTBEAT_FAILURE;
error_msg.append(STRING_WITH_LEN("heartbeat is not compatible with local info;"));
@@ -6085,6 +6262,51 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
inc_pos= event_len;
}
break;
+ /*
+ Binlog compressed event should uncompress in IO thread
+ */
+ case QUERY_COMPRESSED_EVENT:
+ inc_pos= event_len;
+ if (query_event_uncompress(rli->relay_log.description_event_for_queue,
+ checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
+ buf, event_len, new_buf_arr, sizeof(new_buf_arr),
+ &is_malloc, (char **)&new_buf, &event_len))
+ {
+ char llbuf[22];
+ error = ER_BINLOG_UNCOMPRESS_ERROR;
+ error_msg.append(STRING_WITH_LEN("binlog uncompress error, master log_pos: "));
+ llstr(mi->master_log_pos, llbuf);
+ error_msg.append(llbuf, strlen(llbuf));
+ goto err;
+ }
+ buf = new_buf;
+ is_compress_event = true;
+ goto default_action;
+
+ case WRITE_ROWS_COMPRESSED_EVENT:
+ case UPDATE_ROWS_COMPRESSED_EVENT:
+ case DELETE_ROWS_COMPRESSED_EVENT:
+ case WRITE_ROWS_COMPRESSED_EVENT_V1:
+ case UPDATE_ROWS_COMPRESSED_EVENT_V1:
+ case DELETE_ROWS_COMPRESSED_EVENT_V1:
+ inc_pos = event_len;
+ {
+ if (row_log_event_uncompress(rli->relay_log.description_event_for_queue,
+ checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
+ buf, event_len, new_buf_arr, sizeof(new_buf_arr),
+ &is_malloc, (char **)&new_buf, &event_len))
+ {
+ char llbuf[22];
+ error = ER_BINLOG_UNCOMPRESS_ERROR;
+ error_msg.append(STRING_WITH_LEN("binlog uncompress error, master log_pos: "));
+ llstr(mi->master_log_pos, llbuf);
+ error_msg.append(llbuf, strlen(llbuf));
+ goto err;
+ }
+ }
+ buf = new_buf;
+ is_compress_event = true;
+ goto default_action;
#ifndef DBUG_OFF
case XID_EVENT:
@@ -6102,7 +6324,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
DBUG_EXECUTE_IF("kill_slave_io_after_2_events",
{
if (mi->dbug_do_disconnect &&
- (((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT) ||
+ (LOG_EVENT_IS_QUERY((Log_event_type)(uchar)buf[EVENT_TYPE_OFFSET]) ||
((uchar)buf[EVENT_TYPE_OFFSET] == TABLE_MAP_EVENT))
&& (--mi->dbug_event_counter == 0))
{
@@ -6115,7 +6337,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
DBUG_EXECUTE_IF("kill_slave_io_before_commit",
{
if ((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT ||
- ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT &&
+ ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */
Query_log_event::peek_is_commit_rollback(buf, event_len,
checksum_alg)))
{
@@ -6135,7 +6357,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
++mi->events_queued_since_last_gtid;
}
- inc_pos= event_len;
+ if (!is_compress_event)
+ inc_pos= event_len;
+
break;
}
@@ -6226,8 +6450,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
/* everything is filtered out from non-master */
(s_id != mi->master_id ||
/* for the master meta information is necessary */
- (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
- buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT))) ||
+ ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
+ (uchar)buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT))) ||
/*
Check whether it needs to be filtered based on domain_id
@@ -6256,9 +6480,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
*/
if (!(s_id == global_system_variables.server_id &&
!mi->rli.replicate_same_server_id) ||
- (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
- buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT &&
- buf[EVENT_TYPE_OFFSET] != STOP_EVENT))
+ ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
+ (uchar)buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT &&
+ (uchar)buf[EVENT_TYPE_OFFSET] != STOP_EVENT))
{
mi->master_log_pos+= inc_pos;
memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
@@ -6299,7 +6523,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
buf[EVENT_TYPE_OFFSET])) ||
(!mi->last_queued_gtid_standalone &&
((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT ||
- ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT &&
+ ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */
Query_log_event::peek_is_commit_rollback(buf, event_len,
checksum_alg))))))
{
@@ -6329,6 +6553,9 @@ err:
mi->report(ERROR_LEVEL, error, NULL, ER_DEFAULT(error),
error_msg.ptr());
+ if(is_malloc)
+ my_free((void *)new_buf);
+
DBUG_RETURN(error);
}
@@ -6646,75 +6873,6 @@ MYSQL *rpl_connect_master(MYSQL *mysql)
}
#endif
-/*
- Store the file and position where the execute-slave thread are in the
- relay log.
-
- SYNOPSIS
- flush_relay_log_info()
- rli Relay log information
-
- NOTES
- - As this is only called by the slave thread or on STOP SLAVE, with the
- log_lock grabbed and the slave thread stopped, we don't need to have
- a lock here.
- - If there is an active transaction, then we don't update the position
- in the relay log. This is to ensure that we re-execute statements
- if we die in the middle of an transaction that was rolled back.
- - As a transaction never spans binary logs, we don't have to handle the
- case where we do a relay-log-rotation in the middle of the transaction.
- If this would not be the case, we would have to ensure that we
- don't delete the relay log file where the transaction started when
- we switch to a new relay log file.
-
- TODO
- - Change the log file information to a binary format to avoid calling
- longlong2str.
-
- RETURN VALUES
- 0 ok
- 1 write error
-*/
-
-bool flush_relay_log_info(Relay_log_info* rli)
-{
- bool error=0;
- DBUG_ENTER("flush_relay_log_info");
-
- if (unlikely(rli->no_storage))
- DBUG_RETURN(0);
-
- IO_CACHE *file = &rli->info_file;
- char buff[FN_REFLEN*2+22*2+4], *pos;
-
- my_b_seek(file, 0L);
- pos=strmov(buff, rli->group_relay_log_name);
- *pos++='\n';
- pos= longlong10_to_str(rli->group_relay_log_pos, pos, 10);
- *pos++='\n';
- pos=strmov(pos, rli->group_master_log_name);
- *pos++='\n';
- pos=longlong10_to_str(rli->group_master_log_pos, pos, 10);
- *pos='\n';
- if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff)+1))
- error=1;
- if (flush_io_cache(file))
- error=1;
- if (sync_relayloginfo_period &&
- !error &&
- ++(rli->sync_counter) >= sync_relayloginfo_period)
- {
- if (my_sync(rli->info_fd, MYF(MY_WME)))
- error=1;
- rli->sync_counter= 0;
- }
- /*
- Flushing the relay log is done by the slave I/O thread
- or by the user on STOP SLAVE.
- */
- DBUG_RETURN(error);
-}
-
/*
Called when we notice that the current "hot" log got rotated under our feet.
@@ -7074,7 +7232,7 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
}
rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
strmake_buf(rli->event_relay_log_name,rli->linfo.log_file_name);
- if (flush_relay_log_info(rli))
+ if (rli->flush())
{
errmsg= "error flushing relay log";
goto err;