diff options
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r-- | sql/sql_repl.cc | 348 |
1 files changed, 224 insertions, 124 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 2f547707ed5..a0f952955d5 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -31,6 +31,7 @@ #include "debug_sync.h" #include "semisync_master.h" #include "semisync_slave.h" +#include "mysys_err.h" enum enum_gtid_until_state { GTID_UNTIL_NOT_DONE, @@ -517,6 +518,22 @@ static enum enum_binlog_checksum_alg get_binlog_checksum_value_at_connect(THD * DBUG_RETURN(ret); } + +/** + Set current_linfo + + Setting current_linfo needs to be done with LOCK_thd_data to ensure that + adjust_linfo_offsets doesn't use a structure that may be deleted. +*/ + +void THD::set_current_linfo(LOG_INFO *linfo) +{ + mysql_mutex_lock(&LOCK_thd_data); + current_linfo= linfo; + mysql_mutex_unlock(&LOCK_thd_data); +} + + /* Adjust the position pointer in the binary log file for all running slaves @@ -538,61 +555,48 @@ static enum enum_binlog_checksum_alg get_binlog_checksum_value_at_connect(THD * Now they sync is done for next read. */ -void adjust_linfo_offsets(my_off_t purge_offset) +static my_bool adjust_callback(THD *thd, my_off_t *purge_offset) { - THD *tmp; - - mysql_mutex_lock(&LOCK_thread_count); - I_List_iterator<THD> it(threads); - - while ((tmp=it++)) + mysql_mutex_lock(&thd->LOCK_thd_data); + if (auto linfo= thd->current_linfo) { - LOG_INFO* linfo; - if ((linfo = tmp->current_linfo)) - { - mysql_mutex_lock(&linfo->lock); - /* - Index file offset can be less that purge offset only if - we just started reading the index file. In that case - we have nothing to adjust - */ - if (linfo->index_file_offset < purge_offset) - linfo->fatal = (linfo->index_file_offset != 0); - else - linfo->index_file_offset -= purge_offset; - mysql_mutex_unlock(&linfo->lock); - } + /* + Index file offset can be less that purge offset only if + we just started reading the index file. In that case + we have nothing to adjust + */ + if (linfo->index_file_offset < *purge_offset) + linfo->fatal= (linfo->index_file_offset != 0); + else + linfo->index_file_offset-= *purge_offset; } - mysql_mutex_unlock(&LOCK_thread_count); + mysql_mutex_unlock(&thd->LOCK_thd_data); + return 0; } -bool log_in_use(const char* log_name) +void adjust_linfo_offsets(my_off_t purge_offset) { - size_t log_name_len = strlen(log_name) + 1; - THD *tmp; - bool result = 0; - - mysql_mutex_lock(&LOCK_thread_count); - I_List_iterator<THD> it(threads); + server_threads.iterate(adjust_callback, &purge_offset); +} - while ((tmp=it++)) - { - LOG_INFO* linfo; - if ((linfo = tmp->current_linfo)) - { - mysql_mutex_lock(&linfo->lock); - result = !strncmp(log_name, linfo->log_file_name, log_name_len); - mysql_mutex_unlock(&linfo->lock); - if (result) - break; - } - } - mysql_mutex_unlock(&LOCK_thread_count); +static my_bool log_in_use_callback(THD *thd, const char *log_name) +{ + my_bool result= 0; + mysql_mutex_lock(&thd->LOCK_thd_data); + if (auto linfo= thd->current_linfo) + result= !strcmp(log_name, linfo->log_file_name); + mysql_mutex_unlock(&thd->LOCK_thd_data); return result; } + +bool log_in_use(const char* log_name) +{ + return server_threads.iterate(log_in_use_callback, log_name); +} + bool purge_error_message(THD* thd, int res) { uint errcode; @@ -874,44 +878,88 @@ static int send_heartbeat_event(binlog_send_info *info, struct binlog_file_entry { binlog_file_entry *next; - char *name; + LEX_CSTRING name; + my_off_t size; }; +/** + Read all binary logs and return as a list + + @param memroot Use this for mem_root calls + @param reverse If set filenames returned in latest first order (reverse + order than in the index file) + @param already_locked If set, index file is already locked. + + @return 0 error + # pointer to list + + @notes + index_file is always unlocked at return +*/ + static binlog_file_entry * -get_binlog_list(MEM_ROOT *memroot) +get_binlog_list(MEM_ROOT *memroot, bool reverse= true, + bool already_locked= false) { IO_CACHE *index_file; - char fname[FN_REFLEN]; - size_t length; - binlog_file_entry *current_list= NULL, *e; + char *fname, *buff, *end_pos; + binlog_file_entry *current_list= NULL, *current_link= NULL, *e; DBUG_ENTER("get_binlog_list"); if (!mysql_bin_log.is_open()) { + if (already_locked) + mysql_bin_log.unlock_index(); my_error(ER_NO_BINARY_LOGGING, MYF(0)); DBUG_RETURN(NULL); } - - mysql_bin_log.lock_index(); + if (!already_locked) + mysql_bin_log.lock_index(); index_file=mysql_bin_log.get_index_file(); reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0); + if (!(buff= (char*) alloc_root(memroot, + (size_t) (index_file->end_of_file+1)))) + goto err; + if (my_b_read(index_file, (uchar*) buff, (size_t) index_file->end_of_file)) + { + my_error(EE_READ, MYF(ME_ERROR_LOG), my_filename(index_file->file), + my_errno); + goto err; + } + buff[index_file->end_of_file]= 0; // For strchr + mysql_bin_log.unlock_index(); + /* The file ends with EOF or empty line */ - while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1) + for (fname= buff; + (end_pos= strchr(fname, '\n')) && (end_pos - fname) > 1; + fname= end_pos+1) { - --length; /* Remove the newline */ - if (!(e= (binlog_file_entry *)alloc_root(memroot, sizeof(*e))) || - !(e->name= strmake_root(memroot, fname, length))) - { - mysql_bin_log.unlock_index(); + end_pos[0]= '\0'; // remove the newline + if (!(e= (binlog_file_entry *) alloc_root(memroot, sizeof(*e)))) DBUG_RETURN(NULL); + if (reverse) + { + e->next= current_list; + current_list= e; + } + else + { + e->next= NULL; + if (!current_link) + current_list= e; + else + current_link->next= e; + current_link= e; } - e->next= current_list; - current_list= e; + e->name.str= fname; + e->name.length= (size_t) (end_pos - fname); } - mysql_bin_log.unlock_index(); - DBUG_RETURN(current_list); + +err: + mysql_bin_log.unlock_index(); + DBUG_RETURN(0); } @@ -1236,8 +1284,7 @@ gtid_find_binlog_file(slave_connection_state *state, char *out_name, char buf[FN_REFLEN]; init_alloc_root(&memroot, "gtid_find_binlog_file", - 10*(FN_REFLEN+sizeof(binlog_file_entry)), - 0, MYF(MY_THREAD_SPECIFIC)); + 8192, 0, MYF(MY_THREAD_SPECIFIC)); if (!(list= get_binlog_list(&memroot))) { errormsg= "Out of memory while looking for GTID position in binlog"; @@ -1263,7 +1310,7 @@ gtid_find_binlog_file(slave_connection_state *state, char *out_name, Read the Gtid_list_log_event at the start of the binlog file to get the binlog state. */ - if (normalize_binlog_name(buf, list->name, false)) + if (normalize_binlog_name(buf, list->name.str, false)) { errormsg= "Failed to determine binlog file name while looking for " "GTID position in binlog"; @@ -1958,7 +2005,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, pos= my_b_tell(log); if (repl_semisync_master.update_sync_header(info->thd, - (uchar*) packet->c_ptr(), + (uchar*) packet->ptr(), info->log_file_name + info->dirlen, pos, &need_sync)) { @@ -2149,9 +2196,8 @@ static int init_binlog_sender(binlog_send_info *info, // set current pos too linfo->pos= *pos; - // note: publish that we use file, before we open it - thd->current_linfo= linfo; + thd->set_current_linfo(linfo); if (check_start_offset(info, linfo->log_file_name, *pos)) return 1; @@ -2403,14 +2449,15 @@ static int send_format_descriptor_event(binlog_send_info *info, IO_CACHE *log, DBUG_RETURN(0); } -static bool should_stop(binlog_send_info *info) +static bool should_stop(binlog_send_info *info, bool kill_server_check= false) { return - info->net->error || - info->net->vio == NULL || - info->thd->killed || - info->error != 0 || - info->should_stop; + info->net->error || + info->net->vio == NULL || + (info->thd->killed && + (info->thd->killed != KILL_SERVER || kill_server_check)) || + info->error != 0 || + info->should_stop; } /** @@ -2431,7 +2478,7 @@ static int wait_new_events(binlog_send_info *info, /* in */ &stage_master_has_sent_all_binlog_to_slave, &old_stage); - while (!should_stop(info)) + while (!should_stop(info, true)) { *end_pos_ptr= mysql_bin_log.get_binlog_end_pos(binlog_end_pos_filename); if (strcmp(linfo->log_file_name, binlog_end_pos_filename) != 0) @@ -2783,6 +2830,14 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, info->error= ER_UNKNOWN_ERROR; goto err; } + DBUG_EXECUTE_IF("simulate_delay_at_shutdown", + { + const char act[]= + "now " + "WAIT_FOR greetings_from_kill_mysql"; + DBUG_ASSERT(!debug_sync_set_action(thd, + STRING_WITH_LEN(act))); + };); /* heartbeat_period from @master_heartbeat_period user variable @@ -3394,31 +3449,42 @@ err: slave_server_id the slave's server id */ -void kill_zombie_dump_threads(uint32 slave_server_id) +struct kill_callback_arg { - mysql_mutex_lock(&LOCK_thread_count); - I_List_iterator<THD> it(threads); - THD *tmp; + kill_callback_arg(uint32 id): slave_server_id(id), thd(0) {} + uint32 slave_server_id; + THD *thd; +}; - while ((tmp=it++)) +static my_bool kill_callback(THD *thd, kill_callback_arg *arg) +{ + if (thd->get_command() == COM_BINLOG_DUMP && + thd->variables.server_id == arg->slave_server_id) { - if (tmp->get_command() == COM_BINLOG_DUMP && - tmp->variables.server_id == slave_server_id) - { - mysql_mutex_lock(&tmp->LOCK_thd_kill); // Lock from delete - break; - } + arg->thd= thd; + if (WSREP(thd)) mysql_mutex_lock(&thd->LOCK_thd_data); + mysql_mutex_lock(&thd->LOCK_thd_kill); // Lock from delete + return 1; } - mysql_mutex_unlock(&LOCK_thread_count); - if (tmp) + return 0; +} + + +void kill_zombie_dump_threads(uint32 slave_server_id) +{ + kill_callback_arg arg(slave_server_id); + server_threads.iterate(kill_callback, &arg); + + if (arg.thd) { /* Here we do not call kill_one_thread() as it will be slow because it will iterate through the list again. We just to do kill the thread ourselves. */ - tmp->awake_no_mutex(KILL_SLAVE_SAME_ID); - mysql_mutex_unlock(&tmp->LOCK_thd_kill); + arg.thd->awake_no_mutex(KILL_SLAVE_SAME_ID); + mysql_mutex_unlock(&arg.thd->LOCK_thd_kill); + if (WSREP(arg.thd)) mysql_mutex_unlock(&arg.thd->LOCK_thd_data); } } @@ -3866,11 +3932,21 @@ int reset_master(THD* thd, rpl_gtid *init_state, uint32 init_state_len, if (!mysql_bin_log.is_open()) { my_message(ER_FLUSH_MASTER_BINLOG_CLOSED, - ER_THD(thd, ER_FLUSH_MASTER_BINLOG_CLOSED), - MYF(ME_BELL+ME_WAITTANG)); + ER_THD(thd, ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(0)); return 1; } +#ifdef WITH_WSREP + if (WSREP_ON) + { + /* RESET MASTER will initialize GTID sequence, and that would happen locally + in this node, so better reject it + */ + my_message(ER_NOT_ALLOWED_COMMAND, + "RESET MASTER not allowed when node is in cluster", MYF(0)); + return 1; + } +#endif /* WITH_WSREP */ bool ret= 0; /* Temporarily disable master semisync before reseting master. */ repl_semisync_master.before_reset_master(); @@ -3969,7 +4045,7 @@ bool mysql_show_binlog_events(THD* thd) goto err; } - thd->current_linfo= &linfo; + thd->set_current_linfo(&linfo); if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0) goto err; @@ -4186,17 +4262,25 @@ void show_binlogs_get_fields(THD *thd, List<Item> *field_list) @retval FALSE success @retval TRUE failure + + @notes + We only keep the index locked while reading all file names as + if there are 1000+ binary logs, there can be a serious impact + as getting the file sizes can take some notable time (up to 20 seconds + has been reported) and we don't want to block log rotations for that long. */ + +#define BINLOG_INDEX_RETRY_COUNT 5 + bool show_binlogs(THD* thd) { - IO_CACHE *index_file; LOG_INFO cur; - File file; - char fname[FN_REFLEN]; + MEM_ROOT mem_root; + binlog_file_entry *list; List<Item> field_list; - size_t length; - size_t cur_dir_len; Protocol *protocol= thd->protocol; + uint retry_count= 0; + size_t cur_dir_len; DBUG_ENTER("show_binlogs"); if (!mysql_bin_log.is_open()) @@ -4210,55 +4294,71 @@ bool show_binlogs(THD* thd) if (protocol->send_result_set_metadata(&field_list, Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(TRUE); - + + init_alloc_root(&mem_root, "binlog_file_list", 8192, 0, + MYF(MY_THREAD_SPECIFIC)); +retry: + /* + The current mutex handling here is to ensure we get the current log position + and all the log files from the index in sync without any index rotation + in between. + */ mysql_mutex_lock(mysql_bin_log.get_log_lock()); mysql_bin_log.lock_index(); - index_file=mysql_bin_log.get_index_file(); + mysql_bin_log.raw_get_current_log(&cur); + mysql_mutex_unlock(mysql_bin_log.get_log_lock()); - mysql_bin_log.raw_get_current_log(&cur); // dont take mutex - mysql_mutex_unlock(mysql_bin_log.get_log_lock()); // lockdep, OK - - cur_dir_len= dirname_length(cur.log_file_name); + /* The following call unlocks lock_index */ + if ((!(list= get_binlog_list(&mem_root, false, true)))) + goto err; - reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0); + DEBUG_SYNC(thd, "at_after_lock_index"); - /* The file ends with EOF or empty line */ - while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1) + // the 1st loop computes the sizes; If stat() fails, then retry + cur_dir_len= dirname_length(cur.log_file_name); + for (binlog_file_entry *cur_link= list; cur_link; cur_link= cur_link->next) { - size_t dir_len; - ulonglong file_length= 0; // Length if open fails - fname[--length] = '\0'; // remove the newline + const char *fname= cur_link->name.str; + size_t dir_len= dirname_length(fname); + size_t length= cur_link->name.length- dir_len; - protocol->prepare_for_resend(); - dir_len= dirname_length(fname); - length-= dir_len; - protocol->store(fname + dir_len, length, &my_charset_bin); + /* Skip directory name as we shouldn't include this in the result */ + cur_link->name.str+= dir_len; + cur_link->name.length-= dir_len; if (!(strncmp(fname+dir_len, cur.log_file_name+cur_dir_len, length))) - file_length= cur.pos; /* The active log, use the active position */ + cur_link->size= cur.pos; /* The active log, use the active position */ else { - /* this is an old log, open it and find the size */ - if ((file= mysql_file_open(key_file_binlog, - fname, O_RDONLY | O_SHARE | O_BINARY, - MYF(0))) >= 0) + MY_STAT stat_info; + if (mysql_file_stat(key_file_binlog, fname, &stat_info, MYF(0))) + cur_link->size= stat_info.st_size; + else { - file_length= (ulonglong) mysql_file_seek(file, 0L, MY_SEEK_END, MYF(0)); - mysql_file_close(file, MYF(0)); + if (retry_count++ < BINLOG_INDEX_RETRY_COUNT) + { + free_root(&mem_root, MYF(MY_MARK_BLOCKS_FREE)); + goto retry; + } + cur_link->size= 0; } } - protocol->store(file_length); + } + + for (binlog_file_entry *cur_link= list; cur_link; cur_link= cur_link->next) + { + protocol->prepare_for_resend(); + protocol->store(cur_link->name.str, cur_link->name.length, &my_charset_bin); + protocol->store((ulonglong) cur_link->size); if (protocol->write()) goto err; } - if (unlikely(index_file->error == -1)) - goto err; - mysql_bin_log.unlock_index(); + free_root(&mem_root, MYF(0)); my_eof(thd); DBUG_RETURN(FALSE); err: - mysql_bin_log.unlock_index(); + free_root(&mem_root, MYF(0)); DBUG_RETURN(TRUE); } |