summaryrefslogtreecommitdiff
path: root/sql/sql_repl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r--sql/sql_repl.cc236
1 files changed, 133 insertions, 103 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 3e652a4d5a6..9cc596f9bb5 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -180,6 +180,27 @@ err:
}
+/*
+ Adjust the position pointer in the binary log file for all running slaves
+
+ SYNOPSIS
+ adjust_linfo_offsets()
+ purge_offset Number of bytes removed from start of log index file
+
+ NOTES
+ - This is called when doing a PURGE when we delete lines from the
+ index log file
+
+ REQUIREMENTS
+ - Before calling this function, we have to ensure that no threads are
+ using any binary log file before purge_offset.a
+
+ TODO
+ - Inform the slave threads that they should sync the position
+ in the binary log file with flush_relay_log_info.
+ Now they sync is done for next read.
+*/
+
void adjust_linfo_offsets(my_off_t purge_offset)
{
THD *tmp;
@@ -193,9 +214,10 @@ void adjust_linfo_offsets(my_off_t purge_offset)
if ((linfo = tmp->current_linfo))
{
pthread_mutex_lock(&linfo->lock);
- /* index file offset can be less that purge offset
- only if we just started reading the index file. In that case
- we have nothing to adjust
+ /*
+ Index file offset can be less that purge offset only if
+ we just started reading the index file. In that case
+ we have nothing to adjust
*/
if (linfo->index_file_offset < purge_offset)
linfo->fatal = (linfo->index_file_offset != 0);
@@ -274,7 +296,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
{
LOG_INFO linfo;
char *log_file_name = linfo.log_file_name;
- char search_file_name[FN_REFLEN];
+ char search_file_name[FN_REFLEN], *name;
IO_CACHE log;
File file = -1;
String* packet = &thd->packet;
@@ -295,7 +317,6 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
}
#endif
-
if (!mysql_bin_log.is_open())
{
errmsg = "Binary log is not open";
@@ -307,17 +328,18 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
goto err;
}
+ name=search_file_name;
if (log_ident[0])
mysql_bin_log.make_log_name(search_file_name, log_ident);
else
- search_file_name[0] = 0;
+ name=0; // Find first log
linfo.index_file_offset = 0;
thd->current_linfo = &linfo;
- if (mysql_bin_log.find_first_log(&linfo, search_file_name))
+ if (mysql_bin_log.find_log_pos(&linfo, name))
{
- errmsg = "Could not find first log";
+ errmsg = "Could not find first log file name in binary log index file";
goto err;
}
@@ -332,19 +354,19 @@ impossible position";
}
my_b_seek(&log, pos); // Seek will done on next read
- packet->length(0);
- // we need to start a packet with something other than 255
- // to distiquish it from error
- packet->append("\0", 1);
+ /*
+ We need to start a packet with something other than 255
+ to distiquish it from error
+ */
+ packet->set("\0", 1);
// if we are at the start of the log
- if (pos == 4)
+ if (pos == BIN_LOG_HEADER_SIZE)
{
// tell the client log name with a fake rotate_event
if (fake_rotate_event(net, packet, log_file_name, &errmsg))
goto err;
- packet->length(0);
- packet->append("\0", 1);
+ packet->set("\0", 1);
}
while (!net->error && net->vio != 0 && !thd->killed)
@@ -376,20 +398,21 @@ impossible position";
goto err;
}
}
- packet->length(0);
- packet->append("\0",1);
+ packet->set("\0", 1);
}
- // TODO: now that we are logging the offset, check to make sure
- // the recorded offset and the actual match
+ /*
+ TODO: now that we are logging the offset, check to make sure
+ the recorded offset and the actual match
+ */
if (error != LOG_READ_EOF)
{
- switch(error) {
+ switch (error) {
case LOG_READ_BOGUS:
errmsg = "bogus data in log event";
break;
case LOG_READ_TOO_LARGE:
- errmsg = "log event entry exceeded max_allowed_packet -\
- increase max_allowed_packet on master";
+ errmsg = "log event entry exceeded max_allowed_packet; \
+Increase max_allowed_packet on master";
break;
case LOG_READ_IO:
errmsg = "I/O error reading log event";
@@ -410,18 +433,21 @@ impossible position";
if (!(flags & BINLOG_DUMP_NON_BLOCK) &&
mysql_bin_log.is_active(log_file_name))
{
- // block until there is more data in the log
- // unless non-blocking mode requested
+ /*
+ Block until there is more data in the log
+ */
if (net_flush(net))
{
errmsg = "failed on net_flush()";
goto err;
}
- // we may have missed the update broadcast from the log
- // that has just happened, let's try to catch it if it did
- // if we did not miss anything, we just wait for other threads
- // to signal us
+ /*
+ We may have missed the update broadcast from the log
+ that has just happened, let's try to catch it if it did.
+ If we did not miss anything, we just wait for other threads
+ to signal us.
+ */
{
log.error=0;
bool read_packet = 0, fatal_error = 0;
@@ -435,32 +461,32 @@ impossible position";
}
#endif
- // no one will update the log while we are reading
- // now, but we'll be quick and just read one record
+ /*
+ No one will update the log while we are reading
+ now, but we'll be quick and just read one record
+
+ To be able to handle EOF properly, we have to have the
+ pthread_mutex_unlock() statements in the case statements.
+ */
pthread_mutex_lock(log_lock);
- switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0))
- {
+ switch (Log_event::read_log_event(&log, packet, (pthread_mutex_t*)0)) {
case 0:
- pthread_mutex_unlock(log_lock);
+ /* we read successfully, so we'll need to send it to the slave */
read_packet = 1;
- // we read successfully, so we'll need to send it to the
- // slave
break;
+
case LOG_READ_EOF:
DBUG_PRINT("wait",("waiting for data in binary log"));
- // wait_for_update unlocks the log lock - needed to avoid race
if (!thd->killed)
mysql_bin_log.wait_for_update(thd);
- else
- pthread_mutex_unlock(log_lock);
DBUG_PRINT("wait",("binary log received update"));
break;
default:
- pthread_mutex_unlock(log_lock);
fatal_error = 1;
break;
}
+ pthread_mutex_unlock(log_lock);
if (read_packet)
{
@@ -479,10 +505,11 @@ impossible position";
goto err;
}
}
- packet->length(0);
- packet->append("\0",1);
- // no need to net_flush because we will get to flush later when
- // we hit EOF pretty quick
+ packet->set("\0", 1);
+ /*
+ No need to net_flush because we will get to flush later when
+ we hit EOF pretty quick
+ */
}
if (fatal_error)
@@ -539,7 +566,6 @@ impossible position";
err:
thd->proc_info = "waiting to finalize termination";
end_io_cache(&log);
- pthread_mutex_lock(&LOCK_thread_count);
/*
Exclude iteration through thread list
this is needed for purge_logs() - it will iterate through
@@ -547,6 +573,7 @@ impossible position";
this mutex will make sure that it never tried to update our linfo
after we return from this stack frame
*/
+ pthread_mutex_lock(&LOCK_thread_count);
thd->current_linfo = 0;
pthread_mutex_unlock(&LOCK_thread_count);
if (file >= 0)
@@ -561,16 +588,19 @@ int start_slave(THD* thd , MASTER_INFO* mi, bool net_report)
if (!thd) thd = current_thd;
NET* net = &thd->net;
int thread_mask;
+ DBUG_ENTER("start_slave");
if (check_access(thd, SUPER_ACL, any_db))
- return 1;
+ DBUG_RETURN(1);
lock_slave_threads(mi); // this allows us to cleanly read slave_running
init_thread_mask(&thread_mask,mi,1 /* inverse */);
if (thd->lex.slave_thd_opt)
thread_mask &= thd->lex.slave_thd_opt;
if (thread_mask)
{
- if (server_id_supplied && (!mi->inited || (mi->inited && *mi->host)))
+ if (init_master_info(mi,master_info_file,relay_log_info_file, 0))
+ slave_errno=ER_MASTER_INFO;
+ else if (server_id_supplied && *mi->host)
slave_errno = start_slave_threads(0 /*no mutex */,
1 /* wait for start */,
mi,
@@ -588,12 +618,12 @@ int start_slave(THD* thd , MASTER_INFO* mi, bool net_report)
{
if (net_report)
send_error(net, slave_errno);
- return 1;
+ DBUG_RETURN(1);
}
else if (net_report)
send_ok(net);
- return 0;
+ DBUG_RETURN(0);
}
int stop_slave(THD* thd, MASTER_INFO* mi, bool net_report )
@@ -628,7 +658,7 @@ int stop_slave(THD* thd, MASTER_INFO* mi, bool net_report )
return 0;
}
-int reset_slave(MASTER_INFO* mi)
+int reset_slave(THD *thd, MASTER_INFO* mi)
{
MY_STAT stat_area;
char fname[FN_REFLEN];
@@ -639,7 +669,9 @@ int reset_slave(MASTER_INFO* mi)
lock_slave_threads(mi);
init_thread_mask(&restart_thread_mask,mi,0 /* not inverse */);
if ((error=terminate_slave_threads(mi,restart_thread_mask,1 /*skip lock*/))
- || (error=purge_relay_logs(&mi->rli,1 /*just reset*/,&errmsg)))
+ || (error=purge_relay_logs(&mi->rli, thd,
+ 1 /* just reset */,
+ &errmsg)))
goto err;
end_master_info(mi);
@@ -713,16 +745,17 @@ int change_master(THD* thd, MASTER_INFO* mi)
thd->proc_info = "changing master";
LEX_MASTER_INFO* lex_mi = &thd->lex.mi;
// TODO: see if needs re-write
- if (init_master_info(mi,master_info_file,relay_log_info_file))
+ if (init_master_info(mi, master_info_file, relay_log_info_file, 0))
{
send_error(&thd->net, 0, "Could not initialize master info");
unlock_slave_threads(mi);
DBUG_RETURN(1);
}
- /* data lock not needed since we have already stopped the running threads,
- and we have the hold on the run locks which will keep all threads that
- could possibly modify the data structures from running
+ /*
+ Data lock not needed since we have already stopped the running threads,
+ and we have the hold on the run locks which will keep all threads that
+ could possibly modify the data structures from running
*/
if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
{
@@ -772,7 +805,8 @@ int change_master(THD* thd, MASTER_INFO* mi)
{
mi->rli.skip_log_purge=0;
thd->proc_info="purging old relay logs";
- if (purge_relay_logs(&mi->rli,0 /* not only reset, but also reinit*/,
+ if (purge_relay_logs(&mi->rli, thd,
+ 0 /* not only reset, but also reinit */,
&errmsg))
{
net_printf(&thd->net, 0, "Failed purging old relay logs: %s",errmsg);
@@ -782,12 +816,13 @@ int change_master(THD* thd, MASTER_INFO* mi)
else
{
const char* msg;
- if (init_relay_log_pos(&mi->rli,0/*log already inited*/,
- 0 /*pos already inited*/,
+ /* Relay log is already initialized */
+ if (init_relay_log_pos(&mi->rli,
+ mi->rli.relay_log_name,
+ mi->rli.relay_log_pos,
0 /*no data lock*/,
&msg))
{
- //Sasha: note that I had to change net_printf() to make this work
net_printf(&thd->net,0,"Failed initializing relay log position: %s",msg);
unlock_slave_threads(mi);
DBUG_RETURN(1);
@@ -857,26 +892,27 @@ int show_binlog_events(THD* thd)
if (mysql_bin_log.is_open())
{
- LOG_INFO linfo;
- char search_file_name[FN_REFLEN];
- LEX_MASTER_INFO* lex_mi = &thd->lex.mi;
+ LEX_MASTER_INFO *lex_mi = &thd->lex.mi;
uint event_count, limit_start, limit_end;
- const char* log_file_name = lex_mi->log_file_name;
- Log_event* ev;
my_off_t pos = lex_mi->pos;
+ char search_file_name[FN_REFLEN], *name;
+ const char *log_file_name = lex_mi->log_file_name;
+ LOG_INFO linfo;
+ Log_event* ev;
limit_start = thd->lex.select->offset_limit;
limit_end = thd->lex.select->select_limit + limit_start;
+ name= search_file_name;
if (log_file_name)
mysql_bin_log.make_log_name(search_file_name, log_file_name);
else
- search_file_name[0] = 0;
+ name=0; // Find first log
linfo.index_file_offset = 0;
thd->current_linfo = &linfo;
- if (mysql_bin_log.find_first_log(&linfo, search_file_name))
+ if (mysql_bin_log.find_log_pos(&linfo, name))
{
errmsg = "Could not find target log";
goto err;
@@ -981,71 +1017,65 @@ int show_binlog_info(THD* thd)
}
+/*
+ Send a lost of all binary logs to client
+
+ SYNOPSIS
+ show_binlogs()
+ thd Thread specific variable
+
+ RETURN VALUES
+ 0 ok
+ 1 error (Error message sent to client)
+*/
+
int show_binlogs(THD* thd)
{
- const char* errmsg = 0;
- File index_file;
+ const char *errmsg;
+ IO_CACHE *index_file;
char fname[FN_REFLEN];
NET* net = &thd->net;
List<Item> field_list;
- String* packet = &thd->packet;
- IO_CACHE io_cache;
+ String *packet = &thd->packet;
uint length;
if (!mysql_bin_log.is_open())
{
- errmsg = "binlog is not open";
- goto err;
+ //TODO: Replace with ER() error message
+ errmsg= "You are not using binary logging";
+ goto err_with_msg;
}
- field_list.push_back(new Item_empty_string("Log_name", 128));
+ field_list.push_back(new Item_empty_string("Log_name", 255));
if (send_fields(thd, field_list, 1))
- {
- sql_print_error("Failed in send_fields");
return 1;
- }
-
mysql_bin_log.lock_index();
- index_file = mysql_bin_log.get_index_file();
- if (index_file < 0)
- {
- errmsg = "Uninitialized index file pointer";
- goto err2;
- }
- if (init_io_cache(&io_cache, index_file, IO_SIZE, READ_CACHE, 0, 0,
- MYF(MY_WME)))
- {
- errmsg = "Failed on init_io_cache()";
- goto err2;
- }
- while ((length=my_b_gets(&io_cache, fname, sizeof(fname))))
+ index_file=mysql_bin_log.get_index_file();
+
+ reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
+
+ /* The file ends with EOF or empty line */
+ while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
{
- fname[--length]=0;
int dir_len = dirname_length(fname);
packet->length(0);
- net_store_data(packet, fname + dir_len, length-dir_len);
+ /* The -1 is for removing newline from fname */
+ net_store_data(packet, fname + dir_len, length-1-dir_len);
if (my_net_write(net, (char*) packet->ptr(), packet->length()))
- {
- sql_print_error("Failed in my_net_write");
- end_io_cache(&io_cache);
- mysql_bin_log.unlock_index();
- return 1;
- }
+ goto err;
}
-
mysql_bin_log.unlock_index();
- end_io_cache(&io_cache);
send_eof(net);
return 0;
-err2:
- mysql_bin_log.unlock_index();
- end_io_cache(&io_cache);
-err:
+err_with_msg:
send_error(net, 0, errmsg);
+err:
+ mysql_bin_log.unlock_index();
return 1;
}
+
int log_loaded_block(IO_CACHE* file)
{
LOAD_FILE_INFO* lf_info;