summaryrefslogtreecommitdiff
path: root/sql/sql_repl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r--sql/sql_repl.cc348
1 files changed, 224 insertions, 124 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 2f547707ed5..a0f952955d5 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -31,6 +31,7 @@
#include "debug_sync.h"
#include "semisync_master.h"
#include "semisync_slave.h"
+#include "mysys_err.h"
enum enum_gtid_until_state {
GTID_UNTIL_NOT_DONE,
@@ -517,6 +518,22 @@ static enum enum_binlog_checksum_alg get_binlog_checksum_value_at_connect(THD *
DBUG_RETURN(ret);
}
+
+/**
+ Set current_linfo
+
+ Setting current_linfo needs to be done with LOCK_thd_data to ensure that
+ adjust_linfo_offsets doesn't use a structure that may be deleted.
+*/
+
+void THD::set_current_linfo(LOG_INFO *linfo)
+{
+ mysql_mutex_lock(&LOCK_thd_data);
+ current_linfo= linfo;
+ mysql_mutex_unlock(&LOCK_thd_data);
+}
+
+
/*
Adjust the position pointer in the binary log file for all running slaves
@@ -538,61 +555,48 @@ static enum enum_binlog_checksum_alg get_binlog_checksum_value_at_connect(THD *
Now they sync is done for next read.
*/
-void adjust_linfo_offsets(my_off_t purge_offset)
+static my_bool adjust_callback(THD *thd, my_off_t *purge_offset)
{
- THD *tmp;
-
- mysql_mutex_lock(&LOCK_thread_count);
- I_List_iterator<THD> it(threads);
-
- while ((tmp=it++))
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ if (auto linfo= thd->current_linfo)
{
- LOG_INFO* linfo;
- if ((linfo = tmp->current_linfo))
- {
- mysql_mutex_lock(&linfo->lock);
- /*
- Index file offset can be less that purge offset only if
- we just started reading the index file. In that case
- we have nothing to adjust
- */
- if (linfo->index_file_offset < purge_offset)
- linfo->fatal = (linfo->index_file_offset != 0);
- else
- linfo->index_file_offset -= purge_offset;
- mysql_mutex_unlock(&linfo->lock);
- }
+ /*
+ Index file offset can be less that purge offset only if
+ we just started reading the index file. In that case
+ we have nothing to adjust
+ */
+ if (linfo->index_file_offset < *purge_offset)
+ linfo->fatal= (linfo->index_file_offset != 0);
+ else
+ linfo->index_file_offset-= *purge_offset;
}
- mysql_mutex_unlock(&LOCK_thread_count);
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ return 0;
}
-bool log_in_use(const char* log_name)
+void adjust_linfo_offsets(my_off_t purge_offset)
{
- size_t log_name_len = strlen(log_name) + 1;
- THD *tmp;
- bool result = 0;
-
- mysql_mutex_lock(&LOCK_thread_count);
- I_List_iterator<THD> it(threads);
+ server_threads.iterate(adjust_callback, &purge_offset);
+}
- while ((tmp=it++))
- {
- LOG_INFO* linfo;
- if ((linfo = tmp->current_linfo))
- {
- mysql_mutex_lock(&linfo->lock);
- result = !strncmp(log_name, linfo->log_file_name, log_name_len);
- mysql_mutex_unlock(&linfo->lock);
- if (result)
- break;
- }
- }
- mysql_mutex_unlock(&LOCK_thread_count);
+static my_bool log_in_use_callback(THD *thd, const char *log_name)
+{
+ my_bool result= 0;
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ if (auto linfo= thd->current_linfo)
+ result= !strcmp(log_name, linfo->log_file_name);
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
return result;
}
+
+bool log_in_use(const char* log_name)
+{
+ return server_threads.iterate(log_in_use_callback, log_name);
+}
+
bool purge_error_message(THD* thd, int res)
{
uint errcode;
@@ -874,44 +878,88 @@ static int send_heartbeat_event(binlog_send_info *info,
struct binlog_file_entry
{
binlog_file_entry *next;
- char *name;
+ LEX_CSTRING name;
+ my_off_t size;
};
+/**
+ Read all binary logs and return as a list
+
+ @param memroot Use this for mem_root calls
+ @param reverse If set filenames returned in latest first order (reverse
+ order than in the index file)
+ @param already_locked If set, index file is already locked.
+
+ @return 0 error
+ # pointer to list
+
+ @notes
+ index_file is always unlocked at return
+*/
+
static binlog_file_entry *
-get_binlog_list(MEM_ROOT *memroot)
+get_binlog_list(MEM_ROOT *memroot, bool reverse= true,
+ bool already_locked= false)
{
IO_CACHE *index_file;
- char fname[FN_REFLEN];
- size_t length;
- binlog_file_entry *current_list= NULL, *e;
+ char *fname, *buff, *end_pos;
+ binlog_file_entry *current_list= NULL, *current_link= NULL, *e;
DBUG_ENTER("get_binlog_list");
if (!mysql_bin_log.is_open())
{
+ if (already_locked)
+ mysql_bin_log.unlock_index();
my_error(ER_NO_BINARY_LOGGING, MYF(0));
DBUG_RETURN(NULL);
}
-
- mysql_bin_log.lock_index();
+ if (!already_locked)
+ mysql_bin_log.lock_index();
index_file=mysql_bin_log.get_index_file();
reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
+ if (!(buff= (char*) alloc_root(memroot,
+ (size_t) (index_file->end_of_file+1))))
+ goto err;
+ if (my_b_read(index_file, (uchar*) buff, (size_t) index_file->end_of_file))
+ {
+ my_error(EE_READ, MYF(ME_ERROR_LOG), my_filename(index_file->file),
+ my_errno);
+ goto err;
+ }
+ buff[index_file->end_of_file]= 0; // For strchr
+ mysql_bin_log.unlock_index();
+
/* The file ends with EOF or empty line */
- while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
+ for (fname= buff;
+ (end_pos= strchr(fname, '\n')) && (end_pos - fname) > 1;
+ fname= end_pos+1)
{
- --length; /* Remove the newline */
- if (!(e= (binlog_file_entry *)alloc_root(memroot, sizeof(*e))) ||
- !(e->name= strmake_root(memroot, fname, length)))
- {
- mysql_bin_log.unlock_index();
+ end_pos[0]= '\0'; // remove the newline
+ if (!(e= (binlog_file_entry *) alloc_root(memroot, sizeof(*e))))
DBUG_RETURN(NULL);
+ if (reverse)
+ {
+ e->next= current_list;
+ current_list= e;
+ }
+ else
+ {
+ e->next= NULL;
+ if (!current_link)
+ current_list= e;
+ else
+ current_link->next= e;
+ current_link= e;
}
- e->next= current_list;
- current_list= e;
+ e->name.str= fname;
+ e->name.length= (size_t) (end_pos - fname);
}
- mysql_bin_log.unlock_index();
-
DBUG_RETURN(current_list);
+
+err:
+ mysql_bin_log.unlock_index();
+ DBUG_RETURN(0);
}
@@ -1236,8 +1284,7 @@ gtid_find_binlog_file(slave_connection_state *state, char *out_name,
char buf[FN_REFLEN];
init_alloc_root(&memroot, "gtid_find_binlog_file",
- 10*(FN_REFLEN+sizeof(binlog_file_entry)),
- 0, MYF(MY_THREAD_SPECIFIC));
+ 8192, 0, MYF(MY_THREAD_SPECIFIC));
if (!(list= get_binlog_list(&memroot)))
{
errormsg= "Out of memory while looking for GTID position in binlog";
@@ -1263,7 +1310,7 @@ gtid_find_binlog_file(slave_connection_state *state, char *out_name,
Read the Gtid_list_log_event at the start of the binlog file to
get the binlog state.
*/
- if (normalize_binlog_name(buf, list->name, false))
+ if (normalize_binlog_name(buf, list->name.str, false))
{
errormsg= "Failed to determine binlog file name while looking for "
"GTID position in binlog";
@@ -1958,7 +2005,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
pos= my_b_tell(log);
if (repl_semisync_master.update_sync_header(info->thd,
- (uchar*) packet->c_ptr(),
+ (uchar*) packet->ptr(),
info->log_file_name + info->dirlen,
pos, &need_sync))
{
@@ -2149,9 +2196,8 @@ static int init_binlog_sender(binlog_send_info *info,
// set current pos too
linfo->pos= *pos;
-
// note: publish that we use file, before we open it
- thd->current_linfo= linfo;
+ thd->set_current_linfo(linfo);
if (check_start_offset(info, linfo->log_file_name, *pos))
return 1;
@@ -2403,14 +2449,15 @@ static int send_format_descriptor_event(binlog_send_info *info, IO_CACHE *log,
DBUG_RETURN(0);
}
-static bool should_stop(binlog_send_info *info)
+static bool should_stop(binlog_send_info *info, bool kill_server_check= false)
{
return
- info->net->error ||
- info->net->vio == NULL ||
- info->thd->killed ||
- info->error != 0 ||
- info->should_stop;
+ info->net->error ||
+ info->net->vio == NULL ||
+ (info->thd->killed &&
+ (info->thd->killed != KILL_SERVER || kill_server_check)) ||
+ info->error != 0 ||
+ info->should_stop;
}
/**
@@ -2431,7 +2478,7 @@ static int wait_new_events(binlog_send_info *info, /* in */
&stage_master_has_sent_all_binlog_to_slave,
&old_stage);
- while (!should_stop(info))
+ while (!should_stop(info, true))
{
*end_pos_ptr= mysql_bin_log.get_binlog_end_pos(binlog_end_pos_filename);
if (strcmp(linfo->log_file_name, binlog_end_pos_filename) != 0)
@@ -2783,6 +2830,14 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
info->error= ER_UNKNOWN_ERROR;
goto err;
}
+ DBUG_EXECUTE_IF("simulate_delay_at_shutdown",
+ {
+ const char act[]=
+ "now "
+ "WAIT_FOR greetings_from_kill_mysql";
+ DBUG_ASSERT(!debug_sync_set_action(thd,
+ STRING_WITH_LEN(act)));
+ };);
/*
heartbeat_period from @master_heartbeat_period user variable
@@ -3394,31 +3449,42 @@ err:
slave_server_id the slave's server id
*/
-void kill_zombie_dump_threads(uint32 slave_server_id)
+struct kill_callback_arg
{
- mysql_mutex_lock(&LOCK_thread_count);
- I_List_iterator<THD> it(threads);
- THD *tmp;
+ kill_callback_arg(uint32 id): slave_server_id(id), thd(0) {}
+ uint32 slave_server_id;
+ THD *thd;
+};
- while ((tmp=it++))
+static my_bool kill_callback(THD *thd, kill_callback_arg *arg)
+{
+ if (thd->get_command() == COM_BINLOG_DUMP &&
+ thd->variables.server_id == arg->slave_server_id)
{
- if (tmp->get_command() == COM_BINLOG_DUMP &&
- tmp->variables.server_id == slave_server_id)
- {
- mysql_mutex_lock(&tmp->LOCK_thd_kill); // Lock from delete
- break;
- }
+ arg->thd= thd;
+ if (WSREP(thd)) mysql_mutex_lock(&thd->LOCK_thd_data);
+ mysql_mutex_lock(&thd->LOCK_thd_kill); // Lock from delete
+ return 1;
}
- mysql_mutex_unlock(&LOCK_thread_count);
- if (tmp)
+ return 0;
+}
+
+
+void kill_zombie_dump_threads(uint32 slave_server_id)
+{
+ kill_callback_arg arg(slave_server_id);
+ server_threads.iterate(kill_callback, &arg);
+
+ if (arg.thd)
{
/*
Here we do not call kill_one_thread() as
it will be slow because it will iterate through the list
again. We just to do kill the thread ourselves.
*/
- tmp->awake_no_mutex(KILL_SLAVE_SAME_ID);
- mysql_mutex_unlock(&tmp->LOCK_thd_kill);
+ arg.thd->awake_no_mutex(KILL_SLAVE_SAME_ID);
+ mysql_mutex_unlock(&arg.thd->LOCK_thd_kill);
+ if (WSREP(arg.thd)) mysql_mutex_unlock(&arg.thd->LOCK_thd_data);
}
}
@@ -3866,11 +3932,21 @@ int reset_master(THD* thd, rpl_gtid *init_state, uint32 init_state_len,
if (!mysql_bin_log.is_open())
{
my_message(ER_FLUSH_MASTER_BINLOG_CLOSED,
- ER_THD(thd, ER_FLUSH_MASTER_BINLOG_CLOSED),
- MYF(ME_BELL+ME_WAITTANG));
+ ER_THD(thd, ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(0));
return 1;
}
+#ifdef WITH_WSREP
+ if (WSREP_ON)
+ {
+ /* RESET MASTER will initialize GTID sequence, and that would happen locally
+ in this node, so better reject it
+ */
+ my_message(ER_NOT_ALLOWED_COMMAND,
+ "RESET MASTER not allowed when node is in cluster", MYF(0));
+ return 1;
+ }
+#endif /* WITH_WSREP */
bool ret= 0;
/* Temporarily disable master semisync before reseting master. */
repl_semisync_master.before_reset_master();
@@ -3969,7 +4045,7 @@ bool mysql_show_binlog_events(THD* thd)
goto err;
}
- thd->current_linfo= &linfo;
+ thd->set_current_linfo(&linfo);
if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
goto err;
@@ -4186,17 +4262,25 @@ void show_binlogs_get_fields(THD *thd, List<Item> *field_list)
@retval FALSE success
@retval TRUE failure
+
+ @notes
+ We only keep the index locked while reading all file names as
+ if there are 1000+ binary logs, there can be a serious impact
+ as getting the file sizes can take some notable time (up to 20 seconds
+ has been reported) and we don't want to block log rotations for that long.
*/
+
+#define BINLOG_INDEX_RETRY_COUNT 5
+
bool show_binlogs(THD* thd)
{
- IO_CACHE *index_file;
LOG_INFO cur;
- File file;
- char fname[FN_REFLEN];
+ MEM_ROOT mem_root;
+ binlog_file_entry *list;
List<Item> field_list;
- size_t length;
- size_t cur_dir_len;
Protocol *protocol= thd->protocol;
+ uint retry_count= 0;
+ size_t cur_dir_len;
DBUG_ENTER("show_binlogs");
if (!mysql_bin_log.is_open())
@@ -4210,55 +4294,71 @@ bool show_binlogs(THD* thd)
if (protocol->send_result_set_metadata(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(TRUE);
-
+
+ init_alloc_root(&mem_root, "binlog_file_list", 8192, 0,
+ MYF(MY_THREAD_SPECIFIC));
+retry:
+ /*
+ The current mutex handling here is to ensure we get the current log position
+ and all the log files from the index in sync without any index rotation
+ in between.
+ */
mysql_mutex_lock(mysql_bin_log.get_log_lock());
mysql_bin_log.lock_index();
- index_file=mysql_bin_log.get_index_file();
+ mysql_bin_log.raw_get_current_log(&cur);
+ mysql_mutex_unlock(mysql_bin_log.get_log_lock());
- mysql_bin_log.raw_get_current_log(&cur); // dont take mutex
- mysql_mutex_unlock(mysql_bin_log.get_log_lock()); // lockdep, OK
-
- cur_dir_len= dirname_length(cur.log_file_name);
+ /* The following call unlocks lock_index */
+ if ((!(list= get_binlog_list(&mem_root, false, true))))
+ goto err;
- reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
+ DEBUG_SYNC(thd, "at_after_lock_index");
- /* The file ends with EOF or empty line */
- while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
+ // the 1st loop computes the sizes; If stat() fails, then retry
+ cur_dir_len= dirname_length(cur.log_file_name);
+ for (binlog_file_entry *cur_link= list; cur_link; cur_link= cur_link->next)
{
- size_t dir_len;
- ulonglong file_length= 0; // Length if open fails
- fname[--length] = '\0'; // remove the newline
+ const char *fname= cur_link->name.str;
+ size_t dir_len= dirname_length(fname);
+ size_t length= cur_link->name.length- dir_len;
- protocol->prepare_for_resend();
- dir_len= dirname_length(fname);
- length-= dir_len;
- protocol->store(fname + dir_len, length, &my_charset_bin);
+ /* Skip directory name as we shouldn't include this in the result */
+ cur_link->name.str+= dir_len;
+ cur_link->name.length-= dir_len;
if (!(strncmp(fname+dir_len, cur.log_file_name+cur_dir_len, length)))
- file_length= cur.pos; /* The active log, use the active position */
+ cur_link->size= cur.pos; /* The active log, use the active position */
else
{
- /* this is an old log, open it and find the size */
- if ((file= mysql_file_open(key_file_binlog,
- fname, O_RDONLY | O_SHARE | O_BINARY,
- MYF(0))) >= 0)
+ MY_STAT stat_info;
+ if (mysql_file_stat(key_file_binlog, fname, &stat_info, MYF(0)))
+ cur_link->size= stat_info.st_size;
+ else
{
- file_length= (ulonglong) mysql_file_seek(file, 0L, MY_SEEK_END, MYF(0));
- mysql_file_close(file, MYF(0));
+ if (retry_count++ < BINLOG_INDEX_RETRY_COUNT)
+ {
+ free_root(&mem_root, MYF(MY_MARK_BLOCKS_FREE));
+ goto retry;
+ }
+ cur_link->size= 0;
}
}
- protocol->store(file_length);
+ }
+
+ for (binlog_file_entry *cur_link= list; cur_link; cur_link= cur_link->next)
+ {
+ protocol->prepare_for_resend();
+ protocol->store(cur_link->name.str, cur_link->name.length, &my_charset_bin);
+ protocol->store((ulonglong) cur_link->size);
if (protocol->write())
goto err;
}
- if (unlikely(index_file->error == -1))
- goto err;
- mysql_bin_log.unlock_index();
+ free_root(&mem_root, MYF(0));
my_eof(thd);
DBUG_RETURN(FALSE);
err:
- mysql_bin_log.unlock_index();
+ free_root(&mem_root, MYF(0));
DBUG_RETURN(TRUE);
}