diff options
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 272 |
1 files changed, 248 insertions, 24 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index 41eb7247b8c..d0723c331df 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -162,8 +162,10 @@ static int terminate_slave_thread(THD *thd, volatile uint *slave_running, bool skip_lock); static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info); -static bool send_show_master_info_header(THD *thd, bool full); -static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full); +static bool send_show_master_info_header(THD *thd, bool full, + size_t gtid_pos_length); +static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, + String *gtid_pos); /* Find out which replications threads are running @@ -395,6 +397,7 @@ int init_recovery(Master_info* mi, const char** errmsg) DBUG_RETURN(0); } + /** Convert slave skip errors bitmap into a printable string. @@ -718,7 +721,7 @@ int start_slave_thread( if (start_lock) mysql_mutex_lock(start_lock); - if (!server_id) + if (!global_system_variables.server_id) { if (start_cond) mysql_cond_broadcast(start_cond); @@ -796,6 +799,7 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start, mysql_mutex_t *lock_io=0, *lock_sql=0, *lock_cond_io=0, *lock_cond_sql=0; mysql_cond_t* cond_io=0, *cond_sql=0; int error=0; + const char *errmsg; DBUG_ENTER("start_slave_threads"); if (need_slave_mutex) @@ -811,6 +815,22 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start, lock_cond_sql = &mi->rli.run_lock; } + /* + If we are using GTID and both SQL and IO threads are stopped, then get + rid of all relay logs. + + Relay logs are not very useful when using GTID, except as a buffer + between the fetch in the IO thread and the apply in SQL thread. However + while one of the threads is running, they are in use and cannot be + removed. + */ + if (mi->using_gtid && !mi->slave_running && !mi->rli.slave_running) + { + purge_relay_logs(&mi->rli, NULL, 0, &errmsg); + mi->master_log_name[0]= 0; + mi->master_log_pos= 0; + } + if (thread_mask & SLAVE_IO) error= start_slave_thread( #ifdef HAVE_PSI_INTERFACE @@ -1421,7 +1441,8 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi) (master_res= mysql_store_result(mysql)) && (master_row= mysql_fetch_row(master_res))) { - if ((::server_id == (mi->master_id= strtoul(master_row[1], 0, 10))) && + if ((global_system_variables.server_id == + (mi->master_id= strtoul(master_row[1], 0, 10))) && !mi->rli.replicate_same_server_id) { errmsg= "The slave I/O thread stops because master and slave have equal \ @@ -1801,6 +1822,133 @@ past_checksum: after_set_capability: #endif + /* + Request dump start from slave replication GTID state. + + Only request GTID position the first time we connect after CHANGE MASTER + or after starting both IO or SQL thread. + + Otherwise, if the IO thread was ahead of the SQL thread before the + restart or reconnect, we might end up re-fetching and hence re-applying + the same event(s) again. + */ + if (mi->using_gtid && !mi->master_log_name[0]) + { + int rc; + char str_buf[256]; + String connect_state(str_buf, sizeof(str_buf), system_charset_info); + connect_state.length(0); + + /* + Read the master @@GLOBAL.gtid_domain_id variable. + This is mostly to check that master is GTID aware, but we could later + perhaps use it to check that different multi-source masters are correctly + configured with distinct domain_id. + */ + if (mysql_real_query(mysql, + STRING_WITH_LEN("SELECT @@GLOBAL.gtid_domain_id")) || + !(master_res= mysql_store_result(mysql)) || + !(master_row= mysql_fetch_row(master_res))) + { + err_code= mysql_errno(mysql); + errmsg= "The slave I/O thread stops because master does not support " + "MariaDB global transaction id. A fatal error is encountered when " + "it tries to SELECT @@GLOBAL.gtid_domain_id."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + mysql_free_result(master_res); + master_res= NULL; + + connect_state.append(STRING_WITH_LEN("SET @slave_connect_state='"), + system_charset_info); + if (rpl_append_gtid_state(&connect_state, true)) + { + err_code= ER_OUTOFMEMORY; + errmsg= "The slave I/O thread stops because a fatal out-of-memory " + "error is encountered when it tries to compute @slave_connect_state."; + sprintf(err_buff, "%s Error: Out of memory", errmsg); + goto err; + } + connect_state.append(STRING_WITH_LEN("'"), system_charset_info); + + rc= mysql_real_query(mysql, connect_state.ptr(), connect_state.length()); + if (rc) + { + err_code= mysql_errno(mysql); + if (is_network_error(err_code)) + { + mi->report(ERROR_LEVEL, err_code, + "Setting @slave_connect_state failed with error: %s", + mysql_error(mysql)); + goto network_err; + } + else + { + /* Fatal error */ + errmsg= "The slave I/O thread stops because a fatal error is " + "encountered when it tries to set @slave_connect_state."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + } + } + if (!mi->using_gtid) + { + /* + If we are not using GTID to connect this time, then instead request + the corresponding GTID position from the master, so that the user + can reconnect the next time using MASTER_GTID_POS=AUTO. + */ + char quote_buf[2*sizeof(mi->master_log_name)+1]; + char str_buf[28+2*sizeof(mi->master_log_name)+10]; + String query(str_buf, sizeof(str_buf), system_charset_info); + query.length(0); + + query.append("SELECT binlog_gtid_pos('"); + escape_quotes_for_mysql(&my_charset_bin, quote_buf, sizeof(quote_buf), + mi->master_log_name, strlen(mi->master_log_name)); + query.append(quote_buf); + query.append("',"); + query.append_ulonglong(mi->master_log_pos); + query.append(")"); + + if (!mysql_real_query(mysql, query.c_ptr_safe(), query.length()) && + (master_res= mysql_store_result(mysql)) && + (master_row= mysql_fetch_row(master_res)) && + (master_row[0] != NULL)) + { + rpl_global_gtid_slave_state.load(mi->io_thd, master_row[0], + strlen(master_row[0]), false); + } + else if (check_io_slave_killed(mi->io_thd, mi, NULL)) + goto slave_killed_err; + else if (is_network_error(mysql_errno(mysql))) + { + mi->report(WARNING_LEVEL, mysql_errno(mysql), + "Get master GTID position failed with error: %s", mysql_error(mysql)); + goto network_err; + } + else + { + /* + ToDo: If the master does not have the binlog_gtid_pos() function, it + just means that it is an old master with no GTID support, so we should + do nothing. + + However, if binlog_gtid_pos() exists, but fails or returns NULL, then + it means that the requested position is not valid. We could use this + to catch attempts to replicate from within the middle of an event, + avoiding strange failures or possible corruption. + */ + } + if (master_res) + { + mysql_free_result(master_res); + master_res= NULL; + } + } + err: if (errmsg) { @@ -1994,7 +2142,7 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi, DBUG_RETURN(0); } - int4store(pos, server_id); pos+= 4; + int4store(pos, global_system_variables.server_id); pos+= 4; pos= net_store_data(pos, (uchar*) report_host, report_host_len); pos= net_store_data(pos, (uchar*) report_user, report_user_len); pos= net_store_data(pos, (uchar*) report_password, report_password_len); @@ -2043,16 +2191,20 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi, bool show_master_info(THD *thd, Master_info *mi, bool full) { DBUG_ENTER("show_master_info"); + String gtid_pos; - if (send_show_master_info_header(thd, full)) + if (full && rpl_global_gtid_slave_state.tostring(>id_pos, NULL, 0)) + DBUG_RETURN(TRUE); + if (send_show_master_info_header(thd, full, gtid_pos.length())) DBUG_RETURN(TRUE); - if (send_show_master_info_data(thd, mi, full)) + if (send_show_master_info_data(thd, mi, full, >id_pos)) DBUG_RETURN(TRUE); my_eof(thd); DBUG_RETURN(FALSE); } -static bool send_show_master_info_header(THD *thd, bool full) +static bool send_show_master_info_header(THD *thd, bool full, + size_t gtid_pos_length) { List<Item> field_list; Protocol *protocol= thd->protocol; @@ -2135,6 +2287,8 @@ static bool send_show_master_info_header(THD *thd, bool full) sizeof(mi->ssl_crl))); field_list.push_back(new Item_empty_string("Master_SSL_Crlpath", sizeof(mi->ssl_crlpath))); + field_list.push_back(new Item_return_int("Using_Gtid", sizeof(ulong), + MYSQL_TYPE_LONG)); if (full) { field_list.push_back(new Item_return_int("Retried_transactions", @@ -2147,6 +2301,7 @@ static bool send_show_master_info_header(THD *thd, bool full) 10, MYSQL_TYPE_LONG)); field_list.push_back(new Item_float("Slave_heartbeat_period", 0.0, 3, 10)); + field_list.push_back(new Item_empty_string("Gtid_Pos", gtid_pos_length)); } if (protocol->send_result_set_metadata(&field_list, @@ -2156,7 +2311,8 @@ static bool send_show_master_info_header(THD *thd, bool full) } -static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full) +static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, + String *gtid_pos) { DBUG_ENTER("send_show_master_info_data"); @@ -2315,6 +2471,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full) protocol->store(mi->ssl_ca, &my_charset_bin); // Master_Ssl_Crlpath protocol->store(mi->ssl_capath, &my_charset_bin); + protocol->store((uint32) (mi->using_gtid != 0)); if (full) { protocol->store((uint32) mi->rli.retried_trans); @@ -2322,6 +2479,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full) protocol->store((uint32) mi->rli.executed_entries); protocol->store((uint32) mi->received_heartbeats); protocol->store((double) mi->heartbeat_period, 3, &tmp); + protocol->store(gtid_pos->ptr(), gtid_pos->length(), &my_charset_bin); } mysql_mutex_unlock(&mi->rli.err_lock); @@ -2364,11 +2522,19 @@ static int cmp_mi_by_name(const Master_info **arg1, bool show_all_master_info(THD* thd) { uint i, elements; + String gtid_pos; Master_info **tmp; DBUG_ENTER("show_master_info"); mysql_mutex_assert_owner(&LOCK_active_mi); - if (send_show_master_info_header(thd, 1)) + gtid_pos.length(0); + if (rpl_append_gtid_state(>id_pos, true)) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + DBUG_RETURN(TRUE); + } + + if (send_show_master_info_header(thd, 1, gtid_pos.length())) DBUG_RETURN(TRUE); if (!(elements= master_info_index->master_info_hash.records)) @@ -2390,7 +2556,7 @@ bool show_all_master_info(THD* thd) for (i= 0; i < elements; i++) { - if (send_show_master_info_data(thd, tmp[i], 1)) + if (send_show_master_info_data(thd, tmp[i], 1, >id_pos)) DBUG_RETURN(TRUE); } @@ -2555,7 +2721,7 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi, // TODO if big log files: Change next to int8store() int4store(buf, (ulong) mi->master_log_pos); int2store(buf + 4, binlog_flags); - int4store(buf + 6, server_id); + int4store(buf + 6, global_system_variables.server_id); len = (uint) strlen(logname); memcpy(buf + 10, logname,len); if (simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1)) @@ -2764,7 +2930,8 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli) has a Rotate etc). */ - thd->server_id = ev->server_id; // use the original server id for logging + /* Use the original server id for logging. */ + thd->variables.server_id = ev->server_id; thd->set_time(); // time the query thd->lex->current_select= 0; if (!ev->when) @@ -3548,17 +3715,34 @@ err_during_init: /* Check the temporary directory used by commands like LOAD DATA INFILE. + + As the directory never changes during a mysqld run, we only + test this once and cache the result. This also resolve a race condition + when this can be run by multiple threads at the same time. */ + +static bool check_temp_dir_run= 0; +static int check_temp_dir_result= 0; + static int check_temp_dir(char* tmp_file) { - int fd; + File fd; + int result= 1; // Assume failure MY_DIR *dirp; char tmp_dir[FN_REFLEN]; size_t tmp_dir_size; DBUG_ENTER("check_temp_dir"); + mysql_mutex_lock(&LOCK_thread_count); + if (check_temp_dir_run) + { + result= check_temp_dir_result; + goto end; + } + check_temp_dir_run= 1; + /* Get the directory from the temporary file. */ @@ -3568,27 +3752,33 @@ int check_temp_dir(char* tmp_file) Check if the directory exists. */ if (!(dirp=my_dir(tmp_dir,MYF(MY_WME)))) - DBUG_RETURN(1); + goto end; my_dirend(dirp); /* - Check permissions to create a file. + Check permissions to create a file. We use O_TRUNC to ensure that + things works even if we happen to have and old file laying around. */ if ((fd= mysql_file_create(key_file_misc, tmp_file, CREATE_MODE, - O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW, + O_WRONLY | O_BINARY | O_TRUNC | O_NOFOLLOW, MYF(MY_WME))) < 0) - DBUG_RETURN(1); + goto end; + result= 0; // Directory name ok /* Clean up. */ mysql_file_close(fd, MYF(0)); mysql_file_delete(key_file_misc, tmp_file, MYF(0)); - DBUG_RETURN(0); +end: + check_temp_dir_result= result; + mysql_mutex_unlock(&LOCK_thread_count); + DBUG_RETURN(result); } + /** Slave SQL thread entry point. @@ -3750,6 +3940,15 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, goto err; } + /* Load the set of seen GTIDs, if we did not already. */ + if (rpl_load_gtid_slave_state(thd)) + { + rli->report(ERROR_LEVEL, thd->stmt_da->sql_errno(), + "Unable to load replication GTID slave state from mysql.%s: %s", + rpl_gtid_slave_state_table_name.str, thd->stmt_da->message()); + goto err; + } + /* execute init_slave variable */ if (opt_init_slave.length) { @@ -3975,7 +4174,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev) } DBUG_ASSERT(cev->inited_from_old); thd->file_id = cev->file_id = mi->file_id++; - thd->server_id = cev->server_id; + thd->variables.server_id = cev->server_id; cev_not_written = 1; if (unlikely(net_request_file(net,cev->fname))) @@ -4587,16 +4786,18 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) Heartbeat is sent only after an event corresponding to the corrdinates the heartbeat carries. - Slave can not have a difference in coordinates except in the only + Slave can not have a higher coordinate except in the only special case when mi->master_log_name, master_log_pos have never been updated by Rotate event i.e when slave does not have any history with the master (and thereafter mi->master_log_pos is NULL). + Slave can have lower coordinates, if some event from master was omitted. + TODO: handling `when' for SHOW SLAVE STATUS' snds behind */ if ((memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len()) && mi->master_log_name != NULL) - || mi->master_log_pos != hb.log_pos) + || mi->master_log_pos > hb.log_pos) { /* missed events of heartbeat from the past */ error= ER_SLAVE_HEARTBEAT_FAILURE; @@ -4648,7 +4849,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) mysql_mutex_lock(log_lock); s_id= uint4korr(buf + SERVER_ID_OFFSET); - if ((s_id == ::server_id && !mi->rli.replicate_same_server_id) || + if ((s_id == global_system_variables.server_id && + !mi->rli.replicate_same_server_id) || /* the following conjunction deals with IGNORE_SERVER_IDS, if set If the master is on the ignore list, execution of @@ -4679,7 +4881,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) IGNORE_SERVER_IDS it increments mi->master_log_pos as well as rli->group_relay_log_pos. */ - if (!(s_id == ::server_id && !mi->rli.replicate_same_server_id) || + if (!(s_id == global_system_variables.server_id && + !mi->rli.replicate_same_server_id) || (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT && buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT && buf[EVENT_TYPE_OFFSET] != STOP_EVENT)) @@ -5217,6 +5420,27 @@ static Log_event* next_event(Relay_log_info* rli) inc_event_relay_log_pos() */ rli->future_event_relay_log_pos= my_b_tell(cur_log); + /* + For GTID, allocate a new sub_id for the given domain_id. + The sub_id must be allocated in increasing order of binlog order. + */ + if (ev->get_type_code() == GTID_EVENT) + { + Gtid_log_event *gev= static_cast<Gtid_log_event *>(ev); + uint64 sub_id= rpl_global_gtid_slave_state.next_subid(gev->domain_id); + if (!sub_id) + { + errmsg = "slave SQL thread aborted because of out-of-memory error"; + if (hot_log) + mysql_mutex_unlock(log_lock); + goto err; + } + rli->gtid_sub_id= sub_id; + rli->current_gtid.server_id= gev->server_id; + rli->current_gtid.domain_id= gev->domain_id; + rli->current_gtid.seq_no= gev->seq_no; + } + if (hot_log) mysql_mutex_unlock(log_lock); DBUG_RETURN(ev); |