summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <monty@hundin.mysql.fi>2002-06-05 23:04:38 +0300
committerunknown <monty@hundin.mysql.fi>2002-06-05 23:04:38 +0300
commit03728196ee76cfb4bce8923ec25687f8bbd495cb (patch)
tree9260753468997c0d4b3b8c78aea517507e2791eb
parentef06010563093f231d62896c6af9e128142fbd56 (diff)
downloadmariadb-git-03728196ee76cfb4bce8923ec25687f8bbd495cb.tar.gz
removed init_count from IO_CACHE.
Added missing mutex_unlock to slave replication code. include/my_sys.h: removed init_count from IO_CACHE. General cleanup. innobase/srv/srv0srv.c: Initailize slots to avoid purify warnings. Removed some compiler warnings. mysql-test/mysql-test-run.sh: Automatic start of slave under gdb mysys/mf_iocache.c: removed init_count sql/field.cc: Cleanup sql/log.cc: Cleanup added open_count variable. sql/log_event.cc: cleanup use is_prefix instead of memcmp() sql/repl_failsafe.cc: cleanup sql/slave.cc: cleanup use MYSQL_LOG->open_count instead of IO_CACHE->init_count Added missing mutex_unlock() sql/slave.h: cleanup sql/sql_class.h: cleanup Added open_count to MYSQL_LOGL sql/sql_parse.cc: removed compiler warning sql/sql_repl.cc: added DBUG_xxx sql/unireg.h: Added BIN_LOG_HEADER_SIZE
-rw-r--r--include/my_sys.h133
-rw-r--r--innobase/srv/srv0srv.c14
-rw-r--r--mysql-test/mysql-test-run.sh20
-rw-r--r--mysys/mf_iocache.c2
-rw-r--r--sql/field.cc3
-rw-r--r--sql/log.cc77
-rw-r--r--sql/log_event.cc247
-rw-r--r--sql/repl_failsafe.cc30
-rw-r--r--sql/slave.cc673
-rw-r--r--sql/slave.h135
-rw-r--r--sql/sql_class.h67
-rw-r--r--sql/sql_parse.cc1
-rw-r--r--sql/sql_repl.cc18
-rw-r--r--sql/unireg.h4
14 files changed, 762 insertions, 662 deletions
diff --git a/include/my_sys.h b/include/my_sys.h
index ebc86c27d1f..7ad638bcca8 100644
--- a/include/my_sys.h
+++ b/include/my_sys.h
@@ -302,33 +302,41 @@ typedef int (*IO_CACHE_CALLBACK)(struct st_io_cache*);
typedef struct st_io_cache /* Used when cacheing files */
{
- /* pos_in_file is offset in file corresponding to the first byte of
- byte* buffer. end_of_file is the offset of end of file for READ_CACHE
- and WRITE_CACHE. For SEQ_READ_APPEND it the maximum of the actual
- end of file and the position represented by read_end.
+ /* Offset in file corresponding to the first byte of byte* buffer. */
+ my_off_t pos_in_file;
+ /*
+ The offset of end of file for READ_CACHE and WRITE_CACHE.
+ For SEQ_READ_APPEND it the maximum of the actual end of file and
+ the position represented by read_end.
*/
- my_off_t pos_in_file,end_of_file;
- /* read_pos points to current read position in the buffer
- read_end is the non-inclusive boundary in the buffer for the currently
- valid read area
- buffer is the read buffer
- not sure about request_pos except that it is used in async_io
+ my_off_t end_of_file;
+ /* Points to current read position in the buffer */
+ byte *read_pos;
+ /* the non-inclusive boundary in the buffer for the currently valid read */
+ byte *read_end;
+ byte *buffer; /* The read buffer */
+ /* Used in ASYNC_IO */
+ byte *request_pos;
+
+ /* Only used in WRITE caches and in SEQ_READ_APPEND to buffer writes */
+ byte *write_buffer;
+ /*
+ Only used in SEQ_READ_APPEND, and points to the current read position
+ in the write buffer. Note that reads in SEQ_READ_APPEND caches can
+ happen from both read buffer (byte* buffer) and write buffer
+ (byte* write_buffer).
*/
- byte *read_pos,*read_end,*buffer,*request_pos;
- /* write_buffer is used only in WRITE caches and in SEQ_READ_APPEND to
- buffer writes
- append_read_pos is only used in SEQ_READ_APPEND, and points to the
- current read position in the write buffer. Note that reads in
- SEQ_READ_APPEND caches can happen from both read buffer (byte* buffer),
- and write buffer (byte* write_buffer).
- write_pos points to current write position in the write buffer and
- write_end is the non-inclusive boundary of the valid write area
- */
- byte *write_buffer, *append_read_pos, *write_pos, *write_end;
- /* current_pos and current_end are convenience variables used by
- my_b_tell() and other routines that need to know the current offset
- current_pos points to &write_pos, and current_end to &write_end in a
- WRITE_CACHE, and &read_pos and &read_end respectively otherwise
+ byte *append_read_pos;
+ /* Points to current write position in the write buffer */
+ byte *write_pos;
+ /* The non-inclusive boundary of the valid write area */
+ byte *write_end;
+
+ /*
+ Current_pos and current_end are convenience variables used by
+ my_b_tell() and other routines that need to know the current offset
+ current_pos points to &write_pos, and current_end to &write_end in a
+ WRITE_CACHE, and &read_pos and &read_end respectively otherwise
*/
byte **current_pos, **current_end;
/* The lock is for append buffer used in SEQ_READ_APPEND cache */
@@ -336,70 +344,64 @@ typedef struct st_io_cache /* Used when cacheing files */
pthread_mutex_t append_buffer_lock;
/* need mutex copying from append buffer to read buffer */
#endif
- /* a caller will use my_b_read() macro to read from the cache
- if the data is already in cache, it will be simply copied with
- memcpy() and internal variables will be accordinging updated with
- no functions invoked. However, if the data is not fully in the cache,
- my_b_read() will call read_function to fetch the data. read_function
- must never be invoked directly
+ /*
+ A caller will use my_b_read() macro to read from the cache
+ if the data is already in cache, it will be simply copied with
+ memcpy() and internal variables will be accordinging updated with
+ no functions invoked. However, if the data is not fully in the cache,
+ my_b_read() will call read_function to fetch the data. read_function
+ must never be invoked directly.
*/
int (*read_function)(struct st_io_cache *,byte *,uint);
- /* same idea as in the case of read_function, except my_b_write() needs to
- be replaced with my_b_append() for a SEQ_READ_APPEND cache
+ /*
+ Same idea as in the case of read_function, except my_b_write() needs to
+ be replaced with my_b_append() for a SEQ_READ_APPEND cache
*/
int (*write_function)(struct st_io_cache *,const byte *,uint);
- /* specifies the type of the cache. Depending on the type of the cache
+ /*
+ Specifies the type of the cache. Depending on the type of the cache
certain operations might not be available and yield unpredicatable
results. Details to be documented later
*/
enum cache_type type;
- /* callbacks when the actual read I/O happens. These were added and
- are currently used for binary logging of LOAD DATA INFILE - when a
- block is read from the file, we create a block create/append event, and
- when IO_CACHE is closed, we create an end event. These functions could,
- of course be used for other things
+ /*
+ Callbacks when the actual read I/O happens. These were added and
+ are currently used for binary logging of LOAD DATA INFILE - when a
+ block is read from the file, we create a block create/append event, and
+ when IO_CACHE is closed, we create an end event. These functions could,
+ of course be used for other things
*/
IO_CACHE_CALLBACK pre_read;
IO_CACHE_CALLBACK post_read;
IO_CACHE_CALLBACK pre_close;
- void* arg; /* for use by pre/post_read */
+ void* arg; /* for use by pre/post_read */
char *file_name; /* if used with 'open_cached_file' */
char *dir,*prefix;
File file; /* file descriptor */
- /* seek_not_done is set by my_b_seek() to inform the upcoming read/write
- operation that a seek needs to be preformed prior to the actual I/O
- error is 0 if the cache operation was successful, -1 if there was a
- "hard" error, and the actual number of I/O-ed bytes if the read/write was
- partial
+ /*
+ seek_not_done is set by my_b_seek() to inform the upcoming read/write
+ operation that a seek needs to be preformed prior to the actual I/O
+ error is 0 if the cache operation was successful, -1 if there was a
+ "hard" error, and the actual number of I/O-ed bytes if the read/write was
+ partial.
*/
int seek_not_done,error;
- /* buffer_length is the size of memory allocated for buffer or write_buffer
- read_length is the same as buffer_length except when we use async io
- not sure why we need it
- */
- uint buffer_length,read_length;
+ /* buffer_length is memory size allocated for buffer or write_buffer */
+ uint buffer_length;
+ /* read_length is the same as buffer_length except when we use async io */
+ uint read_length;
myf myflags; /* Flags used to my_read/my_write */
- /*
+ /*
alloced_buffer is 1 if the buffer was allocated by init_io_cache() and
- 0 if it was supplied by the user
+ 0 if it was supplied by the user.
Currently READ_NET is the only one that will use a buffer allocated
somewhere else
*/
my_bool alloced_buffer;
- /* init_count is incremented every time we call init_io_cache()
- It is not reset in end_io_cache(). This variable
- was introduced for slave relay logs - RELAY_LOG_INFO stores a pointer
- to IO_CACHE that could in some cases refer to the IO_CACHE of the
- currently active relay log. The IO_CACHE then could be closed,
- re-opened and start pointing to a different log file. In that case,
- we could not know reliably if this happened without init_count
- one must be careful with bzero() prior to the subsequent init_io_cache()
- call
- */
- int init_count;
#ifdef HAVE_AIOWAIT
- /* as inidicated by ifdef, this is for async I/O, we will have
- Sinisa comment this some time
+ /*
+ As inidicated by ifdef, this is for async I/O, which is not currently
+ used (because it's not reliable on all systems)
*/
uint inited;
my_off_t aio_read_pos;
@@ -428,7 +430,6 @@ typedef int (*qsort2_cmp)(const void *, const void *, const void *);
((info)->write_pos+=(Count)),0) : \
(*(info)->write_function)((info),(Buffer),(Count)))
-
#define my_b_get(info) \
((info)->read_pos != (info)->read_end ?\
((info)->read_pos++, (int) (uchar) (info)->read_pos[-1]) :\
diff --git a/innobase/srv/srv0srv.c b/innobase/srv/srv0srv.c
index f366ce0d160..39f3566eac8 100644
--- a/innobase/srv/srv0srv.c
+++ b/innobase/srv/srv0srv.c
@@ -1631,6 +1631,7 @@ srv_init(void)
for (i = 0; i < OS_THREAD_MAX_N; i++) {
slot = srv_table_get_nth_slot(i);
slot->in_use = FALSE;
+ slot->type=0; /* Avoid purify errors */
slot->event = os_event_create(NULL);
ut_a(slot->event);
}
@@ -1899,7 +1900,6 @@ srv_conc_exit_innodb(
trx_t* trx) /* in: transaction object associated with the
thread */
{
- srv_conc_slot_t* slot = NULL;
if (srv_thread_concurrency >= 500) {
@@ -2514,11 +2514,11 @@ loop:
can drop tables lazily after there no longer are SELECT
queries to them. */
- srv_main_thread_op_info = "doing background drop tables";
+ srv_main_thread_op_info = (char*) "doing background drop tables";
row_drop_tables_for_mysql_in_background();
- srv_main_thread_op_info = "";
+ srv_main_thread_op_info = (char*) "";
if (srv_force_recovery >= SRV_FORCE_NO_BACKGROUND) {
@@ -2630,19 +2630,19 @@ background_loop:
/* In this loop we run background operations when the server
is quiet and we also come here about once in 10 seconds */
- srv_main_thread_op_info = "doing background drop tables";
+ srv_main_thread_op_info = (char*) "doing background drop tables";
n_tables_to_drop = row_drop_tables_for_mysql_in_background();
- srv_main_thread_op_info = "";
+ srv_main_thread_op_info = (char*) "";
- srv_main_thread_op_info = "flushing buffer pool pages";
+ srv_main_thread_op_info = (char*) "flushing buffer pool pages";
/* Flush a few oldest pages to make the checkpoint younger */
n_pages_flushed = buf_flush_batch(BUF_FLUSH_LIST, 10, ut_dulint_max);
- srv_main_thread_op_info = "making checkpoint";
+ srv_main_thread_op_info = (char*) "making checkpoint";
/* Make a new checkpoint about once in 10 seconds */
diff --git a/mysql-test/mysql-test-run.sh b/mysql-test/mysql-test-run.sh
index 7ae89c96169..66bc7f5069b 100644
--- a/mysql-test/mysql-test-run.sh
+++ b/mysql-test/mysql-test-run.sh
@@ -675,9 +675,9 @@ manager_launch()
ident=$1
shift
if [ $USE_MANAGER = 0 ] ; then
- $@ >$CUR_MYERR 2>&1 &
- sleep 2 #hack
- return
+ $@ >$CUR_MYERR 2>&1 &
+ sleep 2 #hack
+ return
fi
$MYSQL_MANAGER_CLIENT $MANAGER_QUIET_OPT --user=$MYSQL_MANAGER_USER \
--password=$MYSQL_MANAGER_PW --port=$MYSQL_MANAGER_PORT <<EOF
@@ -687,7 +687,7 @@ set_exec_stderr $ident $CUR_MYERR
set_exec_con $ident root localhost $CUR_MYSOCK
start_exec $ident $START_WAIT_TIMEOUT
EOF
- abort_if_failed "Could not execute manager command"
+ abort_if_failed "Could not execute manager command"
}
manager_term()
@@ -887,13 +887,23 @@ start_slave()
"gdb -x $GDB_SLAVE_INIT" $SLAVE_MYSQLD
elif [ x$DO_GDB = x1 ]
then
- $ECHO "set args $slave_args" > $GDB_SLAVE_INIT
if [ x$MANUAL_GDB = x1 ]
then
+ $ECHO "set args $slave_args" > $GDB_SLAVE_INIT
echo "To start gdb for the slave, type in another window:"
echo "cd $CWD ; gdb -x $GDB_SLAVE_INIT $SLAVE_MYSQLD"
wait_for_slave=1500
else
+ ( $ECHO set args $slave_args;
+ if [ $USE_MANAGER = 0 ] ; then
+ cat <<EOF
+b mysql_parse
+commands 1
+disa 1
+end
+r
+EOF
+ fi ) > $GDB_SLAVE_INIT
manager_launch $slave_ident $XTERM -display $DISPLAY -title "Slave" -e \
gdb -x $GDB_SLAVE_INIT $SLAVE_MYSQLD
fi
diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c
index a0247003a81..34873d107af 100644
--- a/mysys/mf_iocache.c
+++ b/mysys/mf_iocache.c
@@ -122,8 +122,6 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
info->pos_in_file= seek_offset;
info->pre_close = info->pre_read = info->post_read = 0;
info->arg = 0;
- info->init_count++; /* we assume the user had set it to 0 prior to
- first call */
info->alloced_buffer = 0;
info->buffer=0;
info->seek_not_done= test(file >= 0);
diff --git a/sql/field.cc b/sql/field.cc
index c6880a29cee..5466facb437 100644
--- a/sql/field.cc
+++ b/sql/field.cc
@@ -4628,7 +4628,7 @@ bool Field_num::eq_def(Field *field)
*****************************************************************************/
/*
-** Make a field from the .frm file info
+ Make a field from the .frm file info
*/
uint32 calc_pack_length(enum_field_types type,uint32 length)
@@ -4657,6 +4657,7 @@ uint32 calc_pack_length(enum_field_types type,uint32 length)
case FIELD_TYPE_LONG_BLOB: return 4+portable_sizeof_char_ptr;
case FIELD_TYPE_SET:
case FIELD_TYPE_ENUM: abort(); return 0; // This shouldn't happen
+ default: return 0;
}
return 0; // This shouldn't happen
}
diff --git a/sql/log.cc b/sql/log.cc
index f0012a94f5d..0f8c4a8c4a8 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -80,10 +80,10 @@ static int find_uniq_filename(char *name)
DBUG_RETURN(0);
}
-MYSQL_LOG::MYSQL_LOG(): last_time(0), query_start(0),index_file(-1),
- name(0), log_type(LOG_CLOSED),write_error(0),
- inited(0), file_id(1),no_rotate(0),
- need_start_event(1),bytes_written(0)
+MYSQL_LOG::MYSQL_LOG()
+ :bytes_written(0), last_time(0), query_start(0), index_file(-1), name(0),
+ file_id(1), open_count(1), log_type(LOG_CLOSED), write_error(0), inited(0),
+ no_rotate(0), need_start_event(1)
{
/*
We don't want to intialize LOCK_Log here as the thread system may
@@ -173,8 +173,10 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
File file= -1;
bool do_magic;
int open_flags = O_CREAT | O_APPEND | O_BINARY;
+ DBUG_ENTER("MYSQL_LOG::open");
+
if (!inited && log_type_arg == LOG_BIN && *fn_ext(log_name))
- no_rotate = 1;
+ no_rotate = 1;
init(log_type_arg,io_cache_type_arg,no_auto_events_arg);
if (!(name=my_strdup(log_name,MYF(MY_WME))))
@@ -196,6 +198,7 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
do_magic = ((log_type == LOG_BIN) && !my_stat(log_file_name,
&tmp_stat, MYF(0)));
+ open_count++;
if ((file=my_open(log_file_name,open_flags,
MYF(MY_WME | ME_WAITTANG))) < 0 ||
init_io_cache(&log_file, file, IO_SIZE, io_cache_type,
@@ -237,10 +240,10 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
bool error;
if (do_magic)
{
- if (my_b_write(&log_file, (byte*) BINLOG_MAGIC, 4) ||
- open_index(O_APPEND | O_RDWR | O_CREAT))
+ if (my_b_write(&log_file, (byte*) BINLOG_MAGIC, BIN_LOG_HEADER_SIZE) ||
+ open_index(O_APPEND | O_RDWR | O_CREAT))
goto err;
- bytes_written += 4;
+ bytes_written += BIN_LOG_HEADER_SIZE;
}
if (need_start_event && !no_auto_events)
@@ -262,7 +265,7 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
goto err;
}
}
- return;
+ DBUG_VOID_RETURN;
err:
sql_print_error("Could not use %s for logging (error %d)", log_name,errno);
@@ -271,7 +274,7 @@ err:
end_io_cache(&log_file);
x_free(name); name=0;
log_type=LOG_CLOSED;
- return;
+ DBUG_VOID_RETURN;
}
int MYSQL_LOG::get_current_log(LOG_INFO* linfo)
@@ -284,6 +287,7 @@ int MYSQL_LOG::get_current_log(LOG_INFO* linfo)
}
// if log_name is "" we stop at the first entry
+
int MYSQL_LOG::find_first_log(LOG_INFO* linfo, const char* log_name,
bool need_mutex)
{
@@ -294,8 +298,10 @@ int MYSQL_LOG::find_first_log(LOG_INFO* linfo, const char* log_name,
uint log_name_len = (uint) strlen(log_name);
IO_CACHE io_cache;
- // mutex needed because we need to make sure the file pointer does not move
- // from under our feet
+ /*
+ Mutex needed because we need to make sure the file pointer does not move
+ from under our feet
+ */
if (need_mutex)
pthread_mutex_lock(&LOCK_index);
if (init_io_cache(&io_cache, index_file, IO_SIZE, READ_CACHE, (my_off_t) 0,
@@ -304,7 +310,7 @@ int MYSQL_LOG::find_first_log(LOG_INFO* linfo, const char* log_name,
error = LOG_INFO_SEEK;
goto err;
}
- for(;;)
+ for (;;)
{
uint length;
if (!(length=my_b_gets(&io_cache, fname, FN_REFLEN-1)))
@@ -336,9 +342,12 @@ err:
int MYSQL_LOG::find_next_log(LOG_INFO* linfo, bool need_lock)
{
- // mutex needed because we need to make sure the file pointer does not move
- // from under our feet
- if (index_file < 0) return LOG_INFO_INVALID;
+ /*
+ Mutex needed because we need to make sure the file pointer does not move
+ from under our feet
+ */
+ if (index_file < 0)
+ return LOG_INFO_INVALID;
int error = 0;
char* fname = linfo->log_file_name;
IO_CACHE io_cache;
@@ -382,7 +391,7 @@ int MYSQL_LOG::reset_logs(THD* thd)
goto err;
}
- for(;;)
+ for (;;)
{
my_delete(linfo.log_file_name, MYF(MY_WME));
if (find_next_log(&linfo))
@@ -490,7 +499,7 @@ err:
rli->linfo.log_file_name);
goto err2;
}
- rli->relay_log_pos = 4;
+ rli->relay_log_pos = BIN_LOG_HEADER_SIZE;
strnmov(rli->relay_log_name,rli->linfo.log_file_name,
sizeof(rli->relay_log_name));
flush_relay_log_info(rli);
@@ -550,7 +559,7 @@ int MYSQL_LOG::purge_logs(THD* thd, const char* to_log)
my_off_t init_purge_offset= my_b_tell(&io_cache);
if (!(fname_len=my_b_gets(&io_cache, fname, FN_REFLEN)))
{
- if(!io_cache.error)
+ if (!io_cache.error)
break;
error = LOG_INFO_IO;
goto err;
@@ -993,8 +1002,13 @@ bool MYSQL_LOG::write(THD *thd, IO_CACHE *cache)
if (is_open())
{
+ /*
+ We come here when the queries to be logged could not fit into memory
+ and part of the queries are stored in a log file on disk.
+ */
+
uint length;
- //QQ: this looks like a bug - why READ_CACHE?
+ /* Read from the file used to cache the queries .*/
if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
{
sql_print_error(ER(ER_ERROR_ON_WRITE), cache->file_name, errno);
@@ -1003,6 +1017,7 @@ bool MYSQL_LOG::write(THD *thd, IO_CACHE *cache)
length=my_b_bytes_in_cache(cache);
do
{
+ /* Write data to the binary log file */
if (my_b_write(&log_file, cache->read_pos, length))
{
if (!write_error)
@@ -1168,19 +1183,23 @@ void MYSQL_LOG:: wait_for_update(THD* thd)
const char* old_msg = thd->enter_cond(&update_cond, &LOCK_log,
"Slave: waiting for binlog update");
pthread_cond_wait(&update_cond, &LOCK_log);
- // this is not a bug - we unlock the mutex for the caller, and expect him
- // to lock it and then not unlock it upon return. This is a rather odd
- // way of doing things, but this is the cleanest way I could think of to
- // solve the race deadlock caused by THD::awake() first acquiring mysys_var
- // mutex and then the current mutex, while wait_for_update being called with
- // the current mutex already aquired and THD::exit_cond() trying to acquire
- // mysys_var mutex. We do need the mutex to be acquired prior to the
- // invocation of wait_for_update in all cases, so mutex acquisition inside
- // wait_for_update() is not an option
+ /*
+ This is not a bug:
+ We unlock the mutex for the caller, and expect him to lock it and
+ then not unlock it upon return. This is a rather odd way of doing
+ things, but this is the cleanest way I could think of to solve the
+ race deadlock caused by THD::awake() first acquiring mysys_var
+ mutex and then the current mutex, while wait_for_update being
+ called with the current mutex already aquired and THD::exit_cond()
+ trying to acquire mysys_var mutex. We do need the mutex to be
+ acquired prior to the invocation of wait_for_update in all cases,
+ so mutex acquisition inside wait_for_update() is not an option.
+ */
pthread_mutex_unlock(&LOCK_log);
thd->exit_cond(old_msg);
}
+
void MYSQL_LOG::close(bool exiting)
{ // One can't set log_type here!
if (is_open())
diff --git a/sql/log_event.cc b/sql/log_event.cc
index fd04f8dbbaa..9315baa0de5 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -159,11 +159,11 @@ static void cleanup_load_tmpdir()
if (!(dirp=my_dir(slave_load_tmpdir,MYF(MY_WME))))
return;
- for (i=0;i<(uint)dirp->number_off_files;i++)
+ for (i=0 ; i < (uint)dirp->number_off_files; i++)
{
file=dirp->dir_entry+i;
- if (!memcmp(file->name,"SQL_LOAD-",9))
- my_delete(file->name,MYF(MY_WME));
+ if (is_prefix(file->name,"SQL_LOAD-"))
+ my_delete(file->name,MYF(0));
}
my_dirend(dirp);
@@ -246,7 +246,7 @@ void Load_log_event::pack_info(String* packet)
char buf[256];
String tmp(buf, sizeof(buf));
tmp.length(0);
- if(db && db_len)
+ if (db && db_len)
{
tmp.append("use ");
tmp.append(db, db_len);
@@ -256,9 +256,9 @@ void Load_log_event::pack_info(String* packet)
tmp.append("LOAD DATA INFILE '");
tmp.append(fname, fname_len);
tmp.append("' ", 2);
- if(sql_ex.opt_flags && REPLACE_FLAG )
+ if (sql_ex.opt_flags && REPLACE_FLAG )
tmp.append(" REPLACE ");
- else if(sql_ex.opt_flags && IGNORE_FLAG )
+ else if (sql_ex.opt_flags && IGNORE_FLAG )
tmp.append(" IGNORE ");
tmp.append("INTO TABLE ");
@@ -305,7 +305,7 @@ void Load_log_event::pack_info(String* packet)
tmp.append(" (");
for(i = 0; i < num_fields; i++)
{
- if(i)
+ if (i)
tmp.append(" ,");
tmp.append( field);
@@ -326,7 +326,7 @@ void Rotate_log_event::pack_info(String* packet)
tmp.append(new_log_ident, ident_len);
tmp.append(";pos=");
tmp.append(llstr(pos,buf));
- if(flags & LOG_EVENT_FORCED_ROTATE_F)
+ if (flags & LOG_EVENT_FORCED_ROTATE_F)
tmp.append("; forced by master");
net_store_data(packet, tmp.ptr(), tmp.length());
}
@@ -436,7 +436,7 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
// if the read hits eof, we must report it as eof
// so the caller will know it can go into cond_wait to be woken up
// on the next update to the log
- if(!file->error) return LOG_READ_EOF;
+ if (!file->error) return LOG_READ_EOF;
return file->error > 0 ? LOG_READ_TRUNC: LOG_READ_IO;
}
data_len = uint4korr(buf + EVENT_LEN_OFFSET);
@@ -452,7 +452,7 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
{
if (packet->append(file, data_len))
{
- if(log_lock)
+ if (log_lock)
pthread_mutex_unlock(log_lock);
// here we should never hit eof in a non-error condtion
// eof means we are reading the event partially, which should
@@ -467,13 +467,13 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
#endif // MYSQL_CLIENT
#ifndef MYSQL_CLIENT
-#define UNLOCK_MUTEX if(log_lock) pthread_mutex_unlock(log_lock);
+#define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock);
#else
#define UNLOCK_MUTEX
#endif
#ifndef MYSQL_CLIENT
-#define LOCK_MUTEX if(log_lock) pthread_mutex_lock(log_lock);
+#define LOCK_MUTEX if (log_lock) pthread_mutex_lock(log_lock);
#else
#define LOCK_MUTEX
#endif
@@ -672,7 +672,7 @@ void Rotate_log_event::print(FILE* file, bool short_form, char* last_db)
if (new_log_ident)
my_fwrite(file, (byte*) new_log_ident, (uint)ident_len,
MYF(MY_NABP | MY_WME));
- fprintf(file, "pos=%s\n", llstr(pos, buf));
+ fprintf(file, " pos: %s\n", llstr(pos, buf));
fflush(file);
}
@@ -701,11 +701,10 @@ Rotate_log_event::Rotate_log_event(const char* buf, int event_len,
bool old_format):
Log_event(buf, old_format),new_log_ident(NULL),alloced(0)
{
- // the caller will ensure that event_len is what we have at
- // EVENT_LEN_OFFSET
+ // The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET
int header_size = (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN;
uint ident_offset;
- if(event_len < header_size)
+ if (event_len < header_size)
return;
buf += header_size;
if (old_format)
@@ -753,8 +752,8 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
#endif
Query_log_event::Query_log_event(const char* buf, int event_len,
- bool old_format):
- Log_event(buf, old_format),data_buf(0), query(NULL), db(NULL)
+ bool old_format)
+ :Log_event(buf, old_format),data_buf(0), query(NULL), db(NULL)
{
ulong data_len;
if (old_format)
@@ -801,9 +800,9 @@ void Query_log_event::print(FILE* file, bool short_form, char* last_db)
bool same_db = 0;
- if(db && last_db)
+ if (db && last_db)
{
- if(!(same_db = !memcmp(last_db, db, db_len + 1)))
+ if (!(same_db = !memcmp(last_db, db, db_len + 1)))
memcpy(last_db, db, db_len + 1);
}
@@ -864,7 +863,7 @@ int Intvar_log_event::write_data(IO_CACHE* file)
void Intvar_log_event::print(FILE* file, bool short_form, char* last_db)
{
char llbuff[22];
- if(!short_form)
+ if (!short_form)
{
print_header(file);
fprintf(file, "\tIntvar\n");
@@ -961,11 +960,12 @@ char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format)
if (use_new_format)
{
empty_flags=0;
- /* the code below assumes that buf will not disappear from
- under our feet during the lifetime of the event. This assumption
- holds true in the slave thread if the log is in new format, but is not
- the case when we have old format because we will be reusing net buffer
- to read the actual file before we write out the Create_file event
+ /*
+ The code below assumes that buf will not disappear from
+ under our feet during the lifetime of the event. This assumption
+ holds true in the slave thread if the log is in new format, but is not
+ the case when we have old format because we will be reusing net buffer
+ to read the actual file before we write out the Create_file event.
*/
if (read_str(buf, buf_end, field_term, field_term_len) ||
read_str(buf, buf_end, enclosed, enclosed_len) ||
@@ -1003,77 +1003,75 @@ char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format)
#ifndef MYSQL_CLIENT
Load_log_event::Load_log_event(THD* thd, sql_exchange* ex,
const char* db_arg, const char* table_name_arg,
- List<Item>& fields_arg, enum enum_duplicates handle_dup):
- Log_event(thd),thread_id(thd->thread_id),
- num_fields(0),fields(0),field_lens(0),field_block_len(0),
- table_name(table_name_arg),
- db(db_arg),
- fname(ex->file_name)
- {
- time_t end_time;
- time(&end_time);
- exec_time = (ulong) (end_time - thd->start_time);
- db_len = (db) ? (uint32) strlen(db) : 0;
- table_name_len = (table_name) ? (uint32) strlen(table_name) : 0;
- fname_len = (fname) ? (uint) strlen(fname) : 0;
- sql_ex.field_term = (char*) ex->field_term->ptr();
- sql_ex.field_term_len = (uint8) ex->field_term->length();
- sql_ex.enclosed = (char*) ex->enclosed->ptr();
- sql_ex.enclosed_len = (uint8) ex->enclosed->length();
- sql_ex.line_term = (char*) ex->line_term->ptr();
- sql_ex.line_term_len = (uint8) ex->line_term->length();
- sql_ex.line_start = (char*) ex->line_start->ptr();
- sql_ex.line_start_len = (uint8) ex->line_start->length();
- sql_ex.escaped = (char*) ex->escaped->ptr();
- sql_ex.escaped_len = (uint8) ex->escaped->length();
- sql_ex.opt_flags = 0;
- sql_ex.cached_new_format = -1;
+ List<Item>& fields_arg, enum enum_duplicates handle_dup)
+ :Log_event(thd),thread_id(thd->thread_id), num_fields(0),fields(0),
+ field_lens(0),field_block_len(0), table_name(table_name_arg),
+ db(db_arg), fname(ex->file_name)
+{
+ time_t end_time;
+ time(&end_time);
+ exec_time = (ulong) (end_time - thd->start_time);
+ db_len = (db) ? (uint32) strlen(db) : 0;
+ table_name_len = (table_name) ? (uint32) strlen(table_name) : 0;
+ fname_len = (fname) ? (uint) strlen(fname) : 0;
+ sql_ex.field_term = (char*) ex->field_term->ptr();
+ sql_ex.field_term_len = (uint8) ex->field_term->length();
+ sql_ex.enclosed = (char*) ex->enclosed->ptr();
+ sql_ex.enclosed_len = (uint8) ex->enclosed->length();
+ sql_ex.line_term = (char*) ex->line_term->ptr();
+ sql_ex.line_term_len = (uint8) ex->line_term->length();
+ sql_ex.line_start = (char*) ex->line_start->ptr();
+ sql_ex.line_start_len = (uint8) ex->line_start->length();
+ sql_ex.escaped = (char*) ex->escaped->ptr();
+ sql_ex.escaped_len = (uint8) ex->escaped->length();
+ sql_ex.opt_flags = 0;
+ sql_ex.cached_new_format = -1;
- if(ex->dumpfile)
- sql_ex.opt_flags |= DUMPFILE_FLAG;
- if(ex->opt_enclosed)
- sql_ex.opt_flags |= OPT_ENCLOSED_FLAG;
+ if (ex->dumpfile)
+ sql_ex.opt_flags |= DUMPFILE_FLAG;
+ if (ex->opt_enclosed)
+ sql_ex.opt_flags |= OPT_ENCLOSED_FLAG;
- sql_ex.empty_flags = 0;
+ sql_ex.empty_flags = 0;
- switch(handle_dup)
- {
- case DUP_IGNORE: sql_ex.opt_flags |= IGNORE_FLAG; break;
- case DUP_REPLACE: sql_ex.opt_flags |= REPLACE_FLAG; break;
- case DUP_ERROR: break;
- }
+ switch(handle_dup)
+ {
+ case DUP_IGNORE: sql_ex.opt_flags |= IGNORE_FLAG; break;
+ case DUP_REPLACE: sql_ex.opt_flags |= REPLACE_FLAG; break;
+ case DUP_ERROR: break;
+ }
- if(!ex->field_term->length())
- sql_ex.empty_flags |= FIELD_TERM_EMPTY;
- if(!ex->enclosed->length())
- sql_ex.empty_flags |= ENCLOSED_EMPTY;
- if(!ex->line_term->length())
- sql_ex.empty_flags |= LINE_TERM_EMPTY;
- if(!ex->line_start->length())
- sql_ex.empty_flags |= LINE_START_EMPTY;
- if(!ex->escaped->length())
- sql_ex.empty_flags |= ESCAPED_EMPTY;
+ if (!ex->field_term->length())
+ sql_ex.empty_flags |= FIELD_TERM_EMPTY;
+ if (!ex->enclosed->length())
+ sql_ex.empty_flags |= ENCLOSED_EMPTY;
+ if (!ex->line_term->length())
+ sql_ex.empty_flags |= LINE_TERM_EMPTY;
+ if (!ex->line_start->length())
+ sql_ex.empty_flags |= LINE_START_EMPTY;
+ if (!ex->escaped->length())
+ sql_ex.empty_flags |= ESCAPED_EMPTY;
- skip_lines = ex->skip_lines;
-
- List_iterator<Item> li(fields_arg);
- field_lens_buf.length(0);
- fields_buf.length(0);
- Item* item;
- while((item = li++))
- {
- num_fields++;
- uchar len = (uchar) strlen(item->name);
- field_block_len += len + 1;
- fields_buf.append(item->name, len + 1);
- field_lens_buf.append((char*)&len, 1);
- }
+ skip_lines = ex->skip_lines;
- field_lens = (const uchar*)field_lens_buf.ptr();
- fields = fields_buf.ptr();
+ List_iterator<Item> li(fields_arg);
+ field_lens_buf.length(0);
+ fields_buf.length(0);
+ Item* item;
+ while ((item = li++))
+ {
+ num_fields++;
+ uchar len = (uchar) strlen(item->name);
+ field_block_len += len + 1;
+ fields_buf.append(item->name, len + 1);
+ field_lens_buf.append((char*)&len, 1);
}
+ field_lens = (const uchar*)field_lens_buf.ptr();
+ fields = fields_buf.ptr();
+}
+
#endif
// the caller must do buf[event_len] = 0 before he starts using the
@@ -1145,32 +1143,32 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db)
bool same_db = 0;
- if(db && last_db)
- {
- if(!(same_db = !memcmp(last_db, db, db_len + 1)))
- memcpy(last_db, db, db_len + 1);
- }
+ if (db && last_db)
+ {
+ if (!(same_db = !memcmp(last_db, db, db_len + 1)))
+ memcpy(last_db, db, db_len + 1);
+ }
- if(db && db[0] && !same_db)
+ if (db && db[0] && !same_db)
fprintf(file, "use %s;\n", db);
fprintf(file, "LOAD DATA INFILE '%-*s' ", fname_len, fname);
- if(sql_ex.opt_flags && REPLACE_FLAG )
+ if (sql_ex.opt_flags && REPLACE_FLAG )
fprintf(file," REPLACE ");
- else if(sql_ex.opt_flags && IGNORE_FLAG )
+ else if (sql_ex.opt_flags && IGNORE_FLAG )
fprintf(file," IGNORE ");
fprintf(file, "INTO TABLE %s ", table_name);
- if(sql_ex.field_term)
+ if (sql_ex.field_term)
{
fprintf(file, " FIELDS TERMINATED BY ");
pretty_print_str(file, sql_ex.field_term, sql_ex.field_term_len);
}
- if(sql_ex.enclosed)
+ if (sql_ex.enclosed)
{
- if(sql_ex.opt_flags && OPT_ENCLOSED_FLAG )
+ if (sql_ex.opt_flags && OPT_ENCLOSED_FLAG )
fprintf(file," OPTIONALLY ");
fprintf(file, " ENCLOSED BY ");
pretty_print_str(file, sql_ex.enclosed, sql_ex.enclosed_len);
@@ -1194,7 +1192,7 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db)
pretty_print_str(file, sql_ex.line_start, sql_ex.line_start_len);
}
- if((int)skip_lines > 0)
+ if ((int)skip_lines > 0)
fprintf(file, " IGNORE %ld LINES ", (long) skip_lines);
if (num_fields)
@@ -1204,7 +1202,7 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db)
fprintf( file, " (");
for(i = 0; i < num_fields; i++)
{
- if(i)
+ if (i)
fputc(',', file);
fprintf(file, field);
@@ -1282,7 +1280,7 @@ Slave_log_event::~Slave_log_event()
void Slave_log_event::print(FILE* file, bool short_form, char* last_db)
{
char llbuff[22];
- if(short_form)
+ if (short_form)
return;
print_header(file);
fputc('\n', file);
@@ -1314,7 +1312,7 @@ void Slave_log_event::init_from_mem_pool(int data_size)
master_host_len = strlen(master_host);
// safety
master_log = master_host + master_host_len + 1;
- if(master_log > mem_pool + data_size)
+ if (master_log > mem_pool + data_size)
{
master_host = 0;
return;
@@ -1326,9 +1324,9 @@ Slave_log_event::Slave_log_event(const char* buf, int event_len):
Log_event(buf,0),mem_pool(0),master_host(0)
{
event_len -= LOG_EVENT_HEADER_LEN;
- if(event_len < 0)
+ if (event_len < 0)
return;
- if(!(mem_pool = (char*)my_malloc(event_len + 1, MYF(MY_WME))))
+ if (!(mem_pool = (char*)my_malloc(event_len + 1, MYF(MY_WME))))
return;
memcpy(mem_pool, buf + LOG_EVENT_HEADER_LEN, event_len);
mem_pool[event_len] = 0;
@@ -1341,7 +1339,7 @@ Create_file_log_event::Create_file_log_event(THD* thd_arg, sql_exchange* ex,
List<Item>& fields_arg, enum enum_duplicates handle_dup,
char* block_arg, uint block_len_arg):
Load_log_event(thd_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup),
- fake_base(0),block(block_arg),block_len(block_len_arg),
+ fake_base(0),block(block_arg),block_len(block_len_arg),
file_id(thd_arg->file_id = mysql_bin_log.next_file_id())
{
sql_ex.force_new_format();
@@ -1409,7 +1407,7 @@ void Create_file_log_event::print(FILE* file, bool short_form,
if (short_form)
return;
Load_log_event::print(file, 1, last_db);
- fprintf(file, " file_id=%d, block_len=%d\n", file_id, block_len);
+ fprintf(file, " file_id: %d block_len: %d\n", file_id, block_len);
}
#endif
@@ -1444,7 +1442,7 @@ Append_block_log_event::Append_block_log_event(THD* thd_arg, char* block_arg,
Append_block_log_event::Append_block_log_event(const char* buf, int len):
Log_event(buf, 0),block(0)
{
- if((uint)len < APPEND_BLOCK_EVENT_OVERHEAD)
+ if ((uint)len < APPEND_BLOCK_EVENT_OVERHEAD)
return;
file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + AB_FILE_ID_OFFSET);
block = (char*)buf + APPEND_BLOCK_EVENT_OVERHEAD;
@@ -1467,7 +1465,7 @@ void Append_block_log_event::print(FILE* file, bool short_form,
return;
print_header(file);
fputc('\n', file);
- fprintf(file, "#Append_block: file_id=%d, block_len=%d\n",
+ fprintf(file, "#Append_block: file_id: %d block_len: %d\n",
file_id, block_len);
}
#endif
@@ -1496,7 +1494,7 @@ Delete_file_log_event::Delete_file_log_event(THD* thd_arg):
Delete_file_log_event::Delete_file_log_event(const char* buf, int len):
Log_event(buf, 0),file_id(0)
{
- if((uint)len < DELETE_FILE_EVENT_OVERHEAD)
+ if ((uint)len < DELETE_FILE_EVENT_OVERHEAD)
return;
file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + AB_FILE_ID_OFFSET);
}
@@ -1543,7 +1541,7 @@ Execute_load_log_event::Execute_load_log_event(THD* thd_arg):
Execute_load_log_event::Execute_load_log_event(const char* buf,int len):
Log_event(buf, 0),file_id(0)
{
- if((uint)len < EXEC_LOAD_EVENT_OVERHEAD)
+ if ((uint)len < EXEC_LOAD_EVENT_OVERHEAD)
return;
file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + EL_FILE_ID_OFFSET);
}
@@ -1662,7 +1660,7 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli)
thd->query = 0;
thd->query_error = 0;
- if(db_ok(thd->db, replicate_do_db, replicate_ignore_db))
+ if (db_ok(thd->db, replicate_do_db, replicate_ignore_db))
{
thd->set_time((time_t)when);
thd->current_tablenr = 0;
@@ -1676,7 +1674,7 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli)
tables.name = tables.real_name = (char*)table_name;
tables.lock_type = TL_WRITE;
// the table will be opened in mysql_load
- if(table_rules_on && !tables_ok(thd, &tables))
+ if (table_rules_on && !tables_ok(thd, &tables))
{
// TODO: this is a bug - this needs to be moved to the I/O thread
if (net)
@@ -1712,14 +1710,14 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli)
// about the packet sequence
thd->net.pkt_nr = net->pkt_nr;
}
- if(mysql_load(thd, &ex, &tables, fields, handle_dup, net != 0,
+ if (mysql_load(thd, &ex, &tables, fields, handle_dup, net != 0,
TL_WRITE))
thd->query_error = 1;
- if(thd->cuted_fields)
+ if (thd->cuted_fields)
sql_print_error("Slave: load data infile at position %s in log \
'%s' produced %d warning(s)", llstr(rli->master_log_pos,llbuff), RPL_LOG_NAME,
thd->cuted_fields );
- if(net)
+ if (net)
net->pkt_nr = thd->net.pkt_nr;
}
}
@@ -1735,7 +1733,7 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli)
thd->net.vio = 0;
thd->db = 0;// prevent db from being freed
close_thread_tables(thd);
- if(thd->query_error)
+ if (thd->query_error)
{
int sql_error = thd->net.last_errno;
if (!sql_error)
@@ -1749,7 +1747,7 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli)
}
free_root(&thd->mem_root,0);
- if(thd->fatal_error)
+ if (thd->fatal_error)
{
sql_print_error("Slave: Fatal error running LOAD DATA INFILE ");
return 1;
@@ -1849,7 +1847,7 @@ int Intvar_log_event::exec_event(struct st_relay_log_info* rli)
int Slave_log_event::exec_event(struct st_relay_log_info* rli)
{
- if(mysql_bin_log.is_open())
+ if (mysql_bin_log.is_open())
mysql_bin_log.write(this);
return Log_event::exec_event(rli);
}
@@ -1978,11 +1976,12 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli)
slave_print_error(rli,0, "File '%s' appears corrupted", fname);
goto err;
}
- // we want to disable binary logging in slave thread
- // because we need the file events to appear in the same order
- // as they do on the master relative to other events, so that we
- // can preserve ascending order of log sequence numbers - needed
- // to handle failover
+ /*
+ We want to disable binary logging in slave thread because we need the file
+ events to appear in the same order as they do on the master relative to
+ other events, so that we can preserve ascending order of log sequence
+ numbers - needed to handle failover .
+ */
save_options = thd->options;
thd->options &= ~ (ulong) (OPTION_BIN_LOG);
lev->thd = thd;
diff --git a/sql/repl_failsafe.cc b/sql/repl_failsafe.cc
index fab1491fc2b..c7ca906ca13 100644
--- a/sql/repl_failsafe.cc
+++ b/sql/repl_failsafe.cc
@@ -670,8 +670,10 @@ int load_master_data(THD* thd)
int restart_thread_mask;
mc_mysql_init(&mysql);
- // we do not want anyone messing with the slave at all for the entire
- // duration of the data load;
+ /*
+ We do not want anyone messing with the slave at all for the entire
+ duration of the data load.
+ */
LOCK_ACTIVE_MI;
lock_slave_threads(active_mi);
init_thread_mask(&restart_thread_mask,active_mi,0 /*not inverse*/);
@@ -707,8 +709,10 @@ int load_master_data(THD* thd)
if (!(num_dbs = (uint) mc_mysql_num_rows(db_res)))
goto err;
- // in theory, the master could have no databases at all
- // and run with skip-grant
+ /*
+ In theory, the master could have no databases at all
+ and run with skip-grant
+ */
if (!(table_res = (MYSQL_RES**)thd->alloc(num_dbs * sizeof(MYSQL_RES*))))
{
@@ -716,10 +720,12 @@ int load_master_data(THD* thd)
goto err;
}
- // this is a temporary solution until we have online backup
- // capabilities - to be replaced once online backup is working
- // we wait to issue FLUSH TABLES WITH READ LOCK for as long as we
- // can to minimize the lock time
+ /*
+ This is a temporary solution until we have online backup
+ capabilities - to be replaced once online backup is working
+ we wait to issue FLUSH TABLES WITH READ LOCK for as long as we
+ can to minimize the lock time.
+ */
if (mc_mysql_query(&mysql, "FLUSH TABLES WITH READ LOCK", 0) ||
mc_mysql_query(&mysql, "SHOW MASTER STATUS",0) ||
!(master_status_res = mc_mysql_store_result(&mysql)))
@@ -729,8 +735,10 @@ int load_master_data(THD* thd)
goto err;
}
- // go through every table in every database, and if the replication
- // rules allow replicating it, get it
+ /*
+ Go through every table in every database, and if the replication
+ rules allow replicating it, get it
+ */
table_res_end = table_res + num_dbs;
@@ -819,7 +827,7 @@ int load_master_data(THD* thd)
}
}
thd->proc_info="purging old relay logs";
- if (purge_relay_logs(&active_mi->rli,0 /* not only reset, but also reinit*/,
+ if (purge_relay_logs(&active_mi->rli,0 /* not only reset, but also reinit */,
&errmsg))
{
send_error(&thd->net, 0, "Failed purging old relay logs");
diff --git a/sql/slave.cc b/sql/slave.cc
index 93e711f2e14..66837436a09 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -42,12 +42,16 @@ bool do_table_inited = 0, ignore_table_inited = 0;
bool wild_do_table_inited = 0, wild_ignore_table_inited = 0;
bool table_rules_on = 0;
static TABLE* save_temporary_tables = 0;
-ulong relay_log_space_limit = 0; /* TODO: fix variables to access ulonglong
- values and make it ulonglong */
-// when slave thread exits, we need to remember the temporary tables so we
-// can re-use them on slave start
+/* TODO: fix variables to access ulonglong values and make it ulonglong */
+ulong relay_log_space_limit = 0;
+
+/*
+ When slave thread exits, we need to remember the temporary tables so we
+ can re-use them on slave start.
+
+ TODO: move the vars below under MASTER_INFO
+*/
-// TODO: move the vars below under MASTER_INFO
int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
static int events_till_disconnect = -1;
int events_till_abort = -1;
@@ -83,9 +87,10 @@ void init_thread_mask(int* mask,MASTER_INFO* mi,bool inverse)
bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
if (inverse)
{
- /* This makes me think of the Russian idiom "I am not I, and this is
- not my horse", which is used to deny reponsibility for
- one's actions.
+ /*
+ This makes me think of the Russian idiom "I am not I, and this is
+ not my horse", which is used to deny reponsibility for
+ one's actions.
*/
set_io = !set_io;
set_sql = !set_sql;
@@ -164,13 +169,16 @@ static byte* get_table_key(TABLE_RULE_ENT* e, uint* len,
}
// TODO: check proper initialization of master_log_name/master_log_pos
+
int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
ulonglong pos, bool need_data_lock,
const char** errmsg)
{
+ DBUG_ENTER("init_relay_log_pos");
+
*errmsg=0;
if (rli->log_pos_current)
- return 0;
+ DBUG_RETURN(0);
pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
pthread_mutex_lock(log_lock);
if (need_data_lock)
@@ -190,8 +198,10 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
else
rli->relay_log_pos = pos;
- // test to see if the previous run was with the skip of purging
- // if yes, we do not purge when we restart
+ /*
+ Test to see if the previous run was with the skip of purging
+ If yes, we do not purge when we restart
+ */
if (rli->relay_log.find_first_log(&rli->linfo,""))
{
*errmsg="Could not find first log during relay log initialization";
@@ -213,34 +223,31 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
if (rli->relay_log.is_active(rli->linfo.log_file_name))
{
if (my_b_tell((rli->cur_log=rli->relay_log.get_log_file())) == 0 &&
- check_binlog_magic(rli->cur_log,errmsg))
- {
+ check_binlog_magic(rli->cur_log,errmsg))
goto err;
- }
- rli->cur_log_init_count=rli->cur_log->init_count;
+ rli->cur_log_old_open_count=rli->relay_log.get_open_count();
}
else
{
if (rli->inited)
end_io_cache(&rli->cache_buf);
- if (rli->cur_log_fd>=0)
+ if (rli->cur_log_fd >= 0)
my_close(rli->cur_log_fd,MYF(MY_WME));
if ((rli->cur_log_fd=open_binlog(&rli->cache_buf,
rli->linfo.log_file_name,errmsg)) < 0)
- {
goto err;
- }
rli->cur_log = &rli->cache_buf;
}
- if (pos > 4)
- my_b_seek(rli->cur_log,(off_t)pos);
- rli->log_pos_current=1;
+ if (pos > BIN_LOG_HEADER_SIZE)
+ my_b_seek(rli->cur_log,(off_t)pos);
+ rli->log_pos_current=1;
+
err:
- pthread_cond_broadcast(&rli->data_cond);
- if (need_data_lock)
- pthread_mutex_unlock(&rli->data_lock);
- pthread_mutex_unlock(log_lock);
- return (*errmsg) ? 1 : 0;
+ pthread_cond_broadcast(&rli->data_cond);
+ if (need_data_lock)
+ pthread_mutex_unlock(&rli->data_lock);
+ pthread_mutex_unlock(log_lock);
+ DBUG_RETURN ((*errmsg) ? 1 : 0);
}
/* called from get_options() in mysqld.cc on start-up */
@@ -274,8 +281,9 @@ void init_slave_skip_errors(const char* arg)
}
}
+
/*
- We assume we have a run lock on rli and that the both slave thread
+ We assume we have a run lock on rli and that both slave thread
are not running
*/
@@ -284,9 +292,11 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, bool just_reset, const char** errmsg)
DBUG_ENTER("purge_relay_logs");
if (!rli->inited)
DBUG_RETURN(0); /* successfully do nothing */
+ int error=0;
+
DBUG_ASSERT(rli->slave_running == 0);
DBUG_ASSERT(rli->mi->slave_running == 0);
- int error=0;
+
rli->slave_skip_counter=0;
pthread_mutex_lock(&rli->data_lock);
rli->pending=0;
@@ -301,17 +311,19 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, bool just_reset, const char** errmsg)
}
strnmov(rli->relay_log_name,rli->linfo.log_file_name,
sizeof(rli->relay_log_name)-1);
- rli->log_space_total=4; //just first log with magic number and nothing else
- rli->relay_log_pos=4;
+ // Just first log with magic number and nothing else
+ rli->log_space_total= BIN_LOG_HEADER_SIZE;
+ rli->relay_log_pos= BIN_LOG_HEADER_SIZE;
rli->relay_log.reset_bytes_written();
rli->log_pos_current=0;
if (!just_reset)
- error = init_relay_log_pos(rli,0,0,0/*do not need data lock*/,errmsg);
+ error = init_relay_log_pos(rli,0,0,0 /* do not need data lock */,errmsg);
+
err:
#ifndef DBUG_OFF
char buf[22];
#endif
- DBUG_PRINT("info",("log_space_total=%s",llstr(rli->log_space_total,buf)));
+ DBUG_PRINT("info",("log_space_total: %s",llstr(rli->log_space_total,buf)));
pthread_mutex_unlock(&rli->data_lock);
DBUG_RETURN(error);
}
@@ -451,10 +463,12 @@ int start_slave_thread(pthread_handler h_func, pthread_mutex_t* start_lock,
"Waiting for slave thread to start");
pthread_cond_wait(start_cond,cond_lock);
thd->exit_cond(old_msg);
- // TODO: in a very rare case of init_slave_thread failing, it is
- // possible that we can get stuck here since slave_running will not
- // be set. We need to change slave_running to int and have -1 as
- // error code
+ /*
+ TODO: in a very rare case of init_slave_thread failing, it is
+ possible that we can get stuck here since slave_running will not
+ be set. We need to change slave_running to int and have -1 as
+ error code.
+ */
if (thd->killed)
{
pthread_mutex_unlock(cond_lock);
@@ -466,10 +480,14 @@ int start_slave_thread(pthread_handler h_func, pthread_mutex_t* start_lock,
pthread_mutex_unlock(start_lock);
return 0;
}
-/* SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
- sense to do that for starting a slave - we always care if it actually
- started the threads that were not previously running
+
+
+/*
+ SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
+ sense to do that for starting a slave - we always care if it actually
+ started the threads that were not previously running
*/
+
int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
MASTER_INFO* mi, const char* master_info_fname,
const char* slave_info_fname, int thread_mask)
@@ -567,9 +585,10 @@ int tables_ok(THD* thd, TABLE_LIST* tables)
return 0;
}
- // if no explicit rule found
- // and there was a do list, do not replicate. If there was
- // no do list, go ahead
+ /*
+ If no explicit rule found and there was a do list, do not replicate.
+ If there was no do list, go ahead
+ */
return !do_table_inited && !wild_do_table_inited;
}
@@ -577,12 +596,12 @@ int tables_ok(THD* thd, TABLE_LIST* tables)
int add_table_rule(HASH* h, const char* table_spec)
{
const char* dot = strchr(table_spec, '.');
- if(!dot) return 1;
+ if (!dot) return 1;
// len is always > 0 because we know the there exists a '.'
uint len = (uint)strlen(table_spec);
TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
+ len, MYF(MY_WME));
- if(!e) return 1;
+ if (!e) return 1;
e->db = (char*)e + sizeof(TABLE_RULE_ENT);
e->tbl_name = e->db + (dot - table_spec) + 1;
e->key_len = len;
@@ -594,11 +613,11 @@ int add_table_rule(HASH* h, const char* table_spec)
int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec)
{
const char* dot = strchr(table_spec, '.');
- if(!dot) return 1;
+ if (!dot) return 1;
uint len = (uint)strlen(table_spec);
TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
+ len, MYF(MY_WME));
- if(!e) return 1;
+ if (!e) return 1;
e->db = (char*)e + sizeof(TABLE_RULE_ENT);
e->tbl_name = e->db + (dot - table_spec) + 1;
e->key_len = len;
@@ -627,9 +646,11 @@ static int end_slave_on_walk(MASTER_INFO* mi, gptr /*unused*/)
void end_slave()
{
- // TODO: replace the line below with
- // list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
- // once multi-master code is ready
+ /*
+ TODO: replace the line below with
+ list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
+ once multi-master code is ready.
+ */
terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
end_master_info(active_mi);
if (do_table_inited)
@@ -671,65 +692,68 @@ void skip_load_data_infile(NET* net)
{
(void)my_net_write(net, "\xfb/dev/null", 10);
(void)net_flush(net);
- (void)my_net_read(net); // discard response
- send_ok(net); // the master expects it
+ (void)my_net_read(net); // discard response
+ send_ok(net); // the master expects it
}
char* rewrite_db(char* db)
{
- if(replicate_rewrite_db.is_empty() || !db) return db;
+ if (replicate_rewrite_db.is_empty() || !db)
+ return db;
I_List_iterator<i_string_pair> it(replicate_rewrite_db);
i_string_pair* tmp;
- while((tmp=it++))
- {
- if(!strcmp(tmp->key, db))
- return tmp->val;
- }
-
+ while ((tmp=it++))
+ {
+ if (!strcmp(tmp->key, db))
+ return tmp->val;
+ }
return db;
}
+
int db_ok(const char* db, I_List<i_string> &do_list,
I_List<i_string> &ignore_list )
{
if (do_list.is_empty() && ignore_list.is_empty())
return 1; // ok to replicate if the user puts no constraints
- // if the user has specified restrictions on which databases to replicate
- // and db was not selected, do not replicate
- if(!db)
+ /*
+ If the user has specified restrictions on which databases to replicate
+ and db was not selected, do not replicate.
+ */
+ if (!db)
return 0;
- if(!do_list.is_empty()) // if the do's are not empty
- {
- I_List_iterator<i_string> it(do_list);
- i_string* tmp;
+ if (!do_list.is_empty()) // if the do's are not empty
+ {
+ I_List_iterator<i_string> it(do_list);
+ i_string* tmp;
- while((tmp=it++))
- {
- if(!strcmp(tmp->ptr, db))
- return 1; // match
- }
- return 0;
+ while ((tmp=it++))
+ {
+ if (!strcmp(tmp->ptr, db))
+ return 1; // match
}
+ return 0;
+ }
else // there are some elements in the don't, otherwise we cannot get here
- {
- I_List_iterator<i_string> it(ignore_list);
- i_string* tmp;
+ {
+ I_List_iterator<i_string> it(ignore_list);
+ i_string* tmp;
- while((tmp=it++))
- {
- if(!strcmp(tmp->ptr, db))
- return 0; // match
- }
-
- return 1;
+ while ((tmp=it++))
+ {
+ if (!strcmp(tmp->ptr, db))
+ return 0; // match
}
+
+ return 1;
+ }
}
-static int init_strvar_from_file(char* var, int max_size, IO_CACHE* f,
- char* default_val)
+static int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
+ const char *default_val)
{
uint length;
if ((length=my_b_gets(f,var, max_size)))
@@ -739,10 +763,12 @@ static int init_strvar_from_file(char* var, int max_size, IO_CACHE* f,
*last_p = 0; // if we stopped on newline, kill it
else
{
- // if we truncated a line or stopped on last char, remove all chars
- // up to and including newline
+ /*
+ If we truncated a line or stopped on last char, remove all chars
+ up to and including newline.
+ */
int c;
- while( ((c=my_b_get(f)) != '\n' && c != my_b_EOF));
+ while (((c=my_b_get(f)) != '\n' && c != my_b_EOF));
}
return 0;
}
@@ -763,7 +789,7 @@ static int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
*var = atoi(buf);
return 0;
}
- else if(default_val)
+ else if (default_val)
{
*var = default_val;
return 0;
@@ -796,12 +822,12 @@ static int check_master_version(MYSQL* mysql, MASTER_INFO* mi)
goto err;
}
- switch (*version)
- {
+ switch (*version) {
case '3':
mi->old_format = 1;
break;
case '4':
+ case '5':
mi->old_format = 0;
break;
default:
@@ -895,9 +921,11 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db,
check_opt.init();
check_opt.flags|= T_VERY_SILENT | T_CALC_CHECKSUM | T_QUICK;
thd->proc_info = "Rebuilding the index on master dump table";
- // we do not want repair() to spam us with messages
- // just send them to the error log, and report the failure in case of
- // problems
+ /*
+ We do not want repair() to spam us with messages
+ just send them to the error log, and report the failure in case of
+ problems.
+ */
save_vio = thd->net.vio;
thd->net.vio = 0;
error=file->repair(thd,&check_opt) != 0;
@@ -978,15 +1006,15 @@ void end_master_info(MASTER_INFO* mi)
int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
{
DBUG_ENTER("init_relay_log_info");
- if (rli->inited)
- DBUG_RETURN(0);
MY_STAT stat_area;
char fname[FN_REFLEN+128];
int info_fd;
const char* msg = 0;
int error = 0;
- fn_format(fname, info_fname,
- mysql_data_home, "", 4+32);
+
+ if (rli->inited)
+ DBUG_RETURN(0);
+ fn_format(fname, info_fname, mysql_data_home, "", 4+32);
pthread_mutex_lock(&rli->data_lock);
info_fd = rli->info_fd;
rli->pending = 0;
@@ -1001,8 +1029,9 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
if (!opt_relay_logname)
{
char tmp[FN_REFLEN];
- /* TODO: The following should be using fn_format(); We just need to
- first change fn_format() to cut the file name if it's too long.
+ /*
+ TODO: The following should be using fn_format(); We just need to
+ first change fn_format() to cut the file name if it's too long.
*/
strmake(tmp,glob_hostname,FN_REFLEN-5);
strmov(strcend(tmp,'.'),"-relay-bin");
@@ -1011,72 +1040,76 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
rli->relay_log.set_index_file_name(opt_relaylog_index_name);
open_log(&rli->relay_log, glob_hostname, opt_relay_logname, "-relay-bin",
LOG_BIN, 1 /* read_append cache */,
- 1 /* no auto events*/);
+ 1 /* no auto events */);
/* if file does not exist */
if (!my_stat(fname, &stat_area, MYF(0)))
{
- // if someone removed the file from underneath our feet, just close
- // the old descriptor and re-create the old file
+ /*
+ If someone removed the file from underneath our feet, just close
+ the old descriptor and re-create the old file
+ */
if (info_fd >= 0)
my_close(info_fd, MYF(MY_WME));
- if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0
- || init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
- MYF(MY_WME)))
+ if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0 ||
+ init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
+ MYF(MY_WME)))
{
- if(info_fd >= 0)
+ if (info_fd >= 0)
my_close(info_fd, MYF(0));
- rli->info_fd=-1;
+ rli->info_fd= -1;
pthread_mutex_unlock(&rli->data_lock);
DBUG_RETURN(1);
}
- if (init_relay_log_pos(rli,"",4,0/*no data mutex*/,&msg))
+ if (init_relay_log_pos(rli,"",BIN_LOG_HEADER_SIZE,0 /*no data mutex*/,
+ &msg))
goto err;
rli->master_log_pos = 0; // uninitialized
rli->info_fd = info_fd;
}
else // file exists
{
- if(info_fd >= 0)
+ if (info_fd >= 0)
reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0);
- else if((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0
- || init_io_cache(&rli->info_file, info_fd,
- IO_SIZE*2, READ_CACHE, 0L,
- 0, MYF(MY_WME)))
+ else if ((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 ||
+ init_io_cache(&rli->info_file, info_fd,
+ IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME)))
{
if (info_fd >= 0)
my_close(info_fd, MYF(0));
- rli->info_fd=-1;
+ rli->info_fd= -1;
pthread_mutex_unlock(&rli->data_lock);
DBUG_RETURN(1);
}
rli->info_fd = info_fd;
if (init_strvar_from_file(rli->relay_log_name,
- sizeof(rli->relay_log_name), &rli->info_file,
- (char*)"") ||
+ sizeof(rli->relay_log_name), &rli->info_file,
+ "") ||
init_intvar_from_file((int*)&rli->relay_log_pos,
- &rli->info_file, 4) ||
+ &rli->info_file, BIN_LOG_HEADER_SIZE) ||
init_strvar_from_file(rli->master_log_name,
sizeof(rli->master_log_name), &rli->info_file,
- (char*)"") ||
+ "") ||
init_intvar_from_file((int*)&rli->master_log_pos,
&rli->info_file, 0))
{
msg="Error reading slave log configuration";
goto err;
}
- if (init_relay_log_pos(rli,0 /*log already inited*/,
- 0 /*pos already inited*/,
+ if (init_relay_log_pos(rli,0 /* log already inited */,
+ 0 /* pos already inited */,
0 /* no data lock*/,
&msg))
goto err;
}
- DBUG_ASSERT(rli->relay_log_pos >= 4);
+ DBUG_ASSERT(rli->relay_log_pos >= BIN_LOG_HEADER_SIZE);
DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos);
rli->inited = 1;
- // now change the cache from READ to WRITE - must do this
- // before flush_relay_log_info
+ /*
+ Now change the cache from READ to WRITE - must do this
+ before flush_relay_log_info
+ */
reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
error=test(flush_relay_log_info(rli));
if (count_relay_log_space(rli))
@@ -1091,7 +1124,7 @@ err:
sql_print_error(msg);
end_io_cache(&rli->info_file);
my_close(info_fd, MYF(0));
- rli->info_fd=-1;
+ rli->info_fd= -1;
pthread_mutex_unlock(&rli->data_lock);
DBUG_RETURN(1);
}
@@ -1135,6 +1168,7 @@ static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli)
DBUG_RETURN(slave_killed);
}
+
static int count_relay_log_space(RELAY_LOG_INFO* rli)
{
LOG_INFO linfo;
@@ -1145,31 +1179,32 @@ static int count_relay_log_space(RELAY_LOG_INFO* rli)
sql_print_error("Could not find first log while counting relay log space");
DBUG_RETURN(1);
}
- if (add_relay_log(rli,&linfo))
- DBUG_RETURN(1);
- while (!rli->relay_log.find_next_log(&linfo))
+ do
{
if (add_relay_log(rli,&linfo))
DBUG_RETURN(1);
- }
+ } while (!rli->relay_log.find_next_log(&linfo));
DBUG_RETURN(0);
}
+
int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
const char* slave_info_fname)
{
+ int fd,error;
+ MY_STAT stat_area;
+ char fname[FN_REFLEN+128];
+ const char *msg;
+ DBUG_ENTER("init_master_info");
+
if (mi->inited)
- return 0;
+ DBUG_RETURN(0);
if (init_relay_log_info(&mi->rli, slave_info_fname))
- return 1;
+ DBUG_RETURN(1);
mi->rli.mi = mi;
mi->mysql=0;
mi->file_id=1;
mi->ignore_stop_event=0;
- int fd,error;
- MY_STAT stat_area;
- char fname[FN_REFLEN+128];
- const char *msg;
fn_format(fname, master_info_fname, mysql_data_home, "", 4+32);
/*
@@ -1183,23 +1218,19 @@ int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
// we do not want any messages if the file does not exist
if (!my_stat(fname, &stat_area, MYF(0)))
{
- // if someone removed the file from underneath our feet, just close
- // the old descriptor and re-create the old file
+ /*
+ if someone removed the file from underneath our feet, just close
+ the old descriptor and re-create the old file
+ */
if (fd >= 0)
my_close(fd, MYF(MY_WME));
- if ((fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0
- || init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,0,
- MYF(MY_WME)))
- {
- if(fd >= 0)
- my_close(fd, MYF(0));
- mi->fd=-1;
- end_relay_log_info(&mi->rli);
- pthread_mutex_unlock(&mi->data_lock);
- return 1;
- }
+ if ((fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0 ||
+ init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,0,
+ MYF(MY_WME)))
+ goto err;
+
mi->master_log_name[0] = 0;
- mi->master_log_pos = 4; // skip magic number
+ mi->master_log_pos = BIN_LOG_HEADER_SIZE; // skip magic number
mi->fd = fd;
if (master_host)
@@ -1213,24 +1244,17 @@ int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
}
else // file exists
{
- if(fd >= 0)
+ if (fd >= 0)
reinit_io_cache(&mi->file, READ_CACHE, 0L,0,0);
- else if((fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0
- || init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,
- 0, MYF(MY_WME)))
- {
- if(fd >= 0)
- my_close(fd, MYF(0));
- mi->fd=-1;
- end_relay_log_info(&mi->rli);
- pthread_mutex_unlock(&mi->data_lock);
- return 1;
- }
+ else if ((fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 ||
+ init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,
+ 0, MYF(MY_WME)))
+ goto err;
mi->fd = fd;
if (init_strvar_from_file(mi->master_log_name,
sizeof(mi->master_log_name), &mi->file,
- (char*)"") ||
+ "") ||
init_intvar_from_file((int*)&mi->master_log_pos, &mi->file, 4) ||
init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file,
master_host) ||
@@ -1242,7 +1266,7 @@ int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
init_intvar_from_file((int*)&mi->connect_retry, &mi->file,
master_connect_retry))
{
- msg="Error reading master configuration";
+ sql_print_error("Error reading master configuration");
goto err;
}
}
@@ -1252,17 +1276,18 @@ int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
reinit_io_cache(&mi->file, WRITE_CACHE,0L,0,1);
error=test(flush_master_info(mi));
pthread_mutex_unlock(&mi->data_lock);
- return error;
+ DBUG_RETURN(error);
err:
- sql_print_error(msg);
- end_io_cache(&mi->file);
end_relay_log_info(&mi->rli);
- DBUG_ASSERT(fd>=0);
- my_close(fd, MYF(0));
- mi->fd=-1;
+ if (fd >= 0)
+ {
+ my_close(fd, MYF(0));
+ end_io_cache(&mi->file);
+ }
+ mi->fd= -1;
pthread_mutex_unlock(&mi->data_lock);
- return 1;
+ DBUG_RETURN(1);
}
int register_slave_on_master(MYSQL* mysql)
@@ -1282,7 +1307,7 @@ int register_slave_on_master(MYSQL* mysql)
else
packet.append((char)0);
- if(report_password)
+ if (report_password)
net_store_data(&packet, report_user);
else
packet.append((char)0);
@@ -1333,7 +1358,7 @@ int show_master_info(THD* thd, MASTER_INFO* mi)
field_list.push_back(new Item_empty_string("Skip_counter", 12));
field_list.push_back(new Item_empty_string("Exec_master_log_pos", 12));
field_list.push_back(new Item_empty_string("Relay_log_space", 12));
- if(send_fields(thd, field_list, 1))
+ if (send_fields(thd, field_list, 1))
DBUG_RETURN(-1);
String* packet = &thd->packet;
@@ -1502,8 +1527,10 @@ static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
*/
thr_alarm(&alarmed, 2 * nap_time,&alarm_buff);
sleep(nap_time);
- // if we wake up before the alarm goes off, hit the button
- // so it will not wake up the wife and kids :-)
+ /*
+ If we wake up before the alarm goes off, hit the button
+ so it will not wake up the wife and kids :-)
+ */
if (thr_alarm_in_use(&alarmed))
thr_end_alarm(&alarmed);
@@ -1528,9 +1555,11 @@ static int request_dump(MYSQL* mysql, MASTER_INFO* mi)
memcpy(buf + 10, logname,len);
if (mc_simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1))
{
- // something went wrong, so we will just reconnect and retry later
- // in the future, we should do a better error analysis, but for
- // now we just fill up the error log :-)
+ /*
+ Something went wrong, so we will just reconnect and retry later
+ in the future, we should do a better error analysis, but for
+ now we just fill up the error log :-)
+ */
sql_print_error("Error on COM_BINLOG_DUMP: %s, will retry in %d secs",
mc_mysql_error(mysql), master_connect_retry);
return 1;
@@ -1545,7 +1574,7 @@ static int request_table_dump(MYSQL* mysql, const char* db, const char* table)
char * p = buf;
uint table_len = (uint) strlen(table);
uint db_len = (uint) strlen(db);
- if(table_len + db_len > sizeof(buf) - 2)
+ if (table_len + db_len > sizeof(buf) - 2)
{
sql_print_error("request_table_dump: Buffer overrun");
return 1;
@@ -1571,8 +1600,10 @@ static ulong read_event(MYSQL* mysql, MASTER_INFO *mi)
{
ulong len = packet_error;
- // my_real_read() will time us out
- // we check if we were told to die, and if not, try reading again
+ /*
+ my_real_read() will time us out
+ We check if we were told to die, and if not, try reading again
+ */
#ifndef DBUG_OFF
if (disconnect_slave_event_count && !(events_till_disconnect--))
return packet_error;
@@ -1643,12 +1674,14 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
type_code != STOP_EVENT ? ev->log_pos : LL(0),
1/* skip lock*/);
flush_relay_log_info(rli);
- if (rli->slave_skip_counter && /* protect against common user error of
- setting the counter to 1 instead of 2
- while recovering from an failed
- auto-increment insert */
- !((type_code == INTVAR_EVENT || type_code == STOP_EVENT) &&
- rli->slave_skip_counter == 1))
+
+ /*
+ Protect against common user error of setting the counter to 1
+ instead of 2 while recovering from an failed auto-increment insert
+ */
+ if (rli->slave_skip_counter &&
+ !((type_code == INTVAR_EVENT || type_code == STOP_EVENT) &&
+ rli->slave_skip_counter == 1))
--rli->slave_skip_counter;
pthread_mutex_unlock(&rli->data_lock);
delete ev;
@@ -1718,7 +1751,7 @@ slave_begin:
pthread_cond_broadcast(&mi->start_cond);
pthread_mutex_unlock(&mi->run_lock);
- DBUG_PRINT("info",("master info: log_file_name=%s, position=%s",
+ DBUG_PRINT("info",("master info: log_file_name='%s', position=%s",
mi->master_log_name, llstr(mi->master_log_pos,llbuff)));
if (!(mi->mysql = mysql = mc_mysql_init(NULL)))
@@ -1768,7 +1801,7 @@ connected:
if (request_dump(mysql, mi))
{
sql_print_error("Failed on request_dump()");
- if(io_slave_killed(thd,mi))
+ if (io_slave_killed(thd,mi))
{
sql_print_error("Slave I/O thread killed while requesting master \
dump");
@@ -1855,7 +1888,7 @@ reconnect done to recover from failed read");
goto err;
}
goto connected;
- } // if(event_len == packet_error)
+ } // if (event_len == packet_error)
thd->proc_info = "Queueing event from master";
if (queue_event(mi,(const char*)mysql->net.read_pos + 1,
@@ -1909,11 +1942,11 @@ err:
THD_CHECK_SENTRY(thd);
delete thd;
pthread_mutex_unlock(&LOCK_thread_count);
- my_thread_end(); // clean-up before broadcast
- pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done
+ my_thread_end(); // clean-up before broadcast
+ pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done
pthread_mutex_unlock(&mi->run_lock);
#ifndef DBUG_OFF
- if(abort_slave_event_count && !events_till_abort)
+ if (abort_slave_event_count && !events_till_abort)
goto slave_begin;
#endif
pthread_exit(0);
@@ -1970,22 +2003,22 @@ slave_begin:
rli->abort_slave = 0;
pthread_cond_broadcast(&rli->start_cond);
pthread_mutex_unlock(&rli->run_lock);
- rli->pending = 0; //this should always be set to 0 when the slave thread
- // is started
- if (init_relay_log_pos(rli,0,0,1/*need data lock*/,&errmsg))
+ // This should always be set to 0 when the slave thread is started
+ rli->pending = 0;
+ if (init_relay_log_pos(rli,0,0,1 /*need data lock*/, &errmsg))
{
sql_print_error("Error initializing relay log position: %s",
errmsg);
goto err;
}
- DBUG_ASSERT(rli->relay_log_pos >= 4);
+ DBUG_ASSERT(rli->relay_log_pos >= BIN_LOG_HEADER_SIZE);
DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos);
- DBUG_PRINT("info",("master info: log_file_name=%s, position=%s",
+ DBUG_PRINT("info",("master info: log_file_name: %s, position: %s",
rli->master_log_name, llstr(rli->master_log_pos,llbuff)));
DBUG_ASSERT(rli->sql_thd == thd);
sql_print_error("Slave SQL thread initialized, starting replication in \
-log '%s' at position %s,relay log: name='%s',pos='%s'", RPL_LOG_NAME,
+log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
llstr(rli->master_log_pos,llbuff),rli->relay_log_name,
llstr(rli->relay_log_pos,llbuff1));
while (!sql_slave_killed(thd,rli))
@@ -2004,7 +2037,7 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
RPL_LOG_NAME, llstr(rli->master_log_pos, llbuff));
goto err;
}
- } // while(!sql_slave_killed(thd,rli)) - read/exec loop
+ } // while (!sql_slave_killed(thd,rli)) - read/exec loop
// error = 0;
err:
@@ -2053,16 +2086,17 @@ static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev)
bool cev_not_written;
THD* thd;
NET* net = &mi->mysql->net;
+ DBUG_ENTER("process_io_create_file");
if (unlikely(!cev->is_valid()))
- return 1;
+ DBUG_RETURN(1);
/*
TODO: fix to honor table rules, not only db rules
*/
if (!db_ok(cev->db, replicate_do_db, replicate_ignore_db))
{
skip_load_data_infile(net);
- return 0;
+ DBUG_RETURN(0);
}
DBUG_ASSERT(cev->inited_from_old);
thd = mi->io_thd;
@@ -2137,10 +2171,13 @@ relay log");
}
error=0;
err:
- return error;
+ DBUG_RETURN(error);
}
-// We assume we already locked mi->data_lock
+/*
+ We assume we already locked mi->data_lock
+*/
+
static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev)
{
if (unlikely(!rev->is_valid()))
@@ -2175,6 +2212,8 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
bool inc_pos = 1;
bool processed_stop_event = 0;
char* tmp_buf = 0;
+ DBUG_ENTER("queue_old_event");
+
/* if we get Load event, we need to pass a non-reusable buffer
to read_log_event, so we do a trick
*/
@@ -2183,7 +2222,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
{
sql_print_error("Slave I/O: out of memory for Load event");
- return 1;
+ DBUG_RETURN(1);
}
memcpy(tmp_buf,buf,event_len);
tmp_buf[event_len]=0; // Create_file constructor wants null-term buffer
@@ -2196,8 +2235,8 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
sql_print_error("Read invalid event from master: '%s',\
master could be corrupt but a more likely cause of this is a bug",
errmsg);
- my_free((char*)tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
- return 1;
+ my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
+ DBUG_RETURN(1);
}
pthread_mutex_lock(&mi->data_lock);
ev->log_pos = mi->master_log_pos;
@@ -2208,7 +2247,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
delete ev;
pthread_mutex_unlock(&mi->data_lock);
DBUG_ASSERT(!tmp_buf);
- return 1;
+ DBUG_RETURN(1);
}
mi->ignore_stop_event=1;
inc_pos = 0;
@@ -2224,7 +2263,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
pthread_mutex_unlock(&mi->data_lock);
DBUG_ASSERT(tmp_buf);
my_free((char*)tmp_buf, MYF(0));
- return error;
+ DBUG_RETURN(error);
}
default:
mi->ignore_stop_event=0;
@@ -2237,7 +2276,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
delete ev;
pthread_mutex_unlock(&mi->data_lock);
DBUG_ASSERT(!tmp_buf);
- return 1;
+ DBUG_RETURN(1);
}
mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
}
@@ -2248,7 +2287,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
mi->ignore_stop_event=1;
pthread_mutex_unlock(&mi->data_lock);
DBUG_ASSERT(!tmp_buf);
- return 0;
+ DBUG_RETURN(0);
}
/*
@@ -2261,8 +2300,10 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
int error=0;
bool inc_pos = 1;
bool processed_stop_event = 0;
+ DBUG_ENTER("queue_event");
+
if (mi->old_format)
- return queue_old_event(mi,buf,event_len);
+ DBUG_RETURN(queue_old_event(mi,buf,event_len));
pthread_mutex_lock(&mi->data_lock);
@@ -2278,7 +2319,7 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
{
Rotate_log_event rev(buf,event_len,0);
if (unlikely(process_io_rotate(mi,&rev)))
- return 1;
+ DBUG_RETURN(1);
inc_pos=0;
mi->ignore_stop_event=1;
break;
@@ -2298,7 +2339,7 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
if (unlikely(processed_stop_event))
mi->ignore_stop_event=1;
pthread_mutex_unlock(&mi->data_lock);
- return error;
+ DBUG_RETURN(error);
}
@@ -2425,18 +2466,27 @@ int flush_relay_log_info(RELAY_LOG_INFO* rli)
return 0;
}
-IO_CACHE* reopen_relay_log(RELAY_LOG_INFO* rli, const char** errmsg)
+
+/*
+ This function is called when we notice that the current "hot" log
+ got rotated under our feet.
+*/
+
+static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg)
{
DBUG_ASSERT(rli->cur_log != &rli->cache_buf);
- IO_CACHE* cur_log = rli->cur_log=&rli->cache_buf;
DBUG_ASSERT(rli->cur_log_fd == -1);
+ DBUG_ENTER("reopen_relay_log");
+
+ IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
if ((rli->cur_log_fd=open_binlog(cur_log,rli->relay_log_name,
errmsg)) <0)
- return 0;
+ DBUG_RETURN(0);
my_b_seek(cur_log,rli->relay_log_pos);
- return cur_log;
+ DBUG_RETURN(cur_log);
}
+
Log_event* next_event(RELAY_LOG_INFO* rli)
{
Log_event* ev;
@@ -2445,6 +2495,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
const char* errmsg=0;
THD* thd = rli->sql_thd;
bool was_killed;
+ DBUG_ENTER("next_event");
DBUG_ASSERT(thd != 0);
/*
@@ -2456,16 +2507,18 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
*/
pthread_mutex_lock(&rli->data_lock);
- for (; !(was_killed=sql_slave_killed(thd,rli)) ;)
+ while (!(was_killed=sql_slave_killed(thd,rli)))
{
/*
We can have two kinds of log reading:
- hot_log - rli->cur_log points at the IO_CACHE of relay_log, which
- is actively being updated by the I/O thread. We need to be careful
- in this case and make sure that we are not looking at a stale log that
- has already been rotated. If it has been, we reopen the log
- the other case is much simpler - we just have a read only log that
- nobody else will be updating.
+ hot_log:
+ rli->cur_log points at the IO_CACHE of relay_log, which
+ is actively being updated by the I/O thread. We need to be careful
+ in this case and make sure that we are not looking at a stale log that
+ has already been rotated. If it has been, we reopen the log.
+
+ The other case is much simpler:
+ We just have a read only log that nobody else will be updating.
*/
bool hot_log;
if ((hot_log = (cur_log != &rli->cache_buf)))
@@ -2474,43 +2527,43 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
pthread_mutex_lock(log_lock);
/*
- Reading cur_log->init_count here is safe because the log will only
+ Reading xxx_file_id is safe because the log will only
be rotated when we hold relay_log.LOCK_log
*/
- if (cur_log->init_count != rli->cur_log_init_count)
+ if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
{
- if (!(cur_log=reopen_relay_log(rli,&errmsg)))
- {
- pthread_mutex_unlock(log_lock);
+ // The master has switched to a new log file; Reopen the old log file
+ cur_log=reopen_relay_log(rli, &errmsg);
+ pthread_mutex_unlock(log_lock);
+ if (!cur_log) // No more log files
goto err;
- }
- pthread_mutex_unlock(log_lock);
- hot_log=0;
+ hot_log=0; // Using old binary log
}
}
- DBUG_ASSERT(my_b_tell(cur_log) >= 4);
+ DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
DBUG_ASSERT(my_b_tell(cur_log) == rli->relay_log_pos + rli->pending);
- /* relay log is always in new format - if the master is 3.23, the
- I/O thread will convert the format for us
+ /*
+ Relay log is always in new format - if the master is 3.23, the
+ I/O thread will convert the format for us
*/
- if ((ev=Log_event::read_log_event(cur_log,0,(bool)0/*new format*/)))
+ if ((ev=Log_event::read_log_event(cur_log,0,(bool)0 /* new format */)))
{
DBUG_ASSERT(thd==rli->sql_thd);
if (hot_log)
pthread_mutex_unlock(log_lock);
pthread_mutex_unlock(&rli->data_lock);
- return ev;
+ DBUG_RETURN(ev);
}
DBUG_ASSERT(thd==rli->sql_thd);
- if (opt_reckless_slave)
+ if (opt_reckless_slave) // For mysql-test
cur_log->error = 0;
- if ( cur_log->error < 0)
+ if (cur_log->error < 0)
{
errmsg = "slave SQL thread aborted because of I/O error";
+ if (hot_log)
+ pthread_mutex_unlock(log_lock);
goto err;
}
-
-
if (!cur_log->error) /* EOF */
{
/*
@@ -2520,7 +2573,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
*/
if (hot_log)
{
- DBUG_ASSERT(cur_log->init_count == rli->cur_log_init_count);
+ DBUG_ASSERT(rli->relay_log.get_open_count() == rli->cur_log_old_open_count);
/*
We can, and should release data_lock while we are waiting for
update. If we do not, show slave status will block
@@ -2528,7 +2581,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
pthread_mutex_unlock(&rli->data_lock);
/*
- IMPORTANT: note that wait_for_update will unlock LOCK_log, but
+ IMPORTANT: note that wait_for_update will unlock lock_log, but
expects the caller to lock it
*/
rli->relay_log.wait_for_update(rli->sql_thd);
@@ -2537,102 +2590,108 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
pthread_mutex_lock(&rli->data_lock);
continue;
}
+ /*
+ If the log was not hot, we need to move to the next log in
+ sequence. The next log could be hot or cold, we deal with both
+ cases separately after doing some common initialization
+ */
+ end_io_cache(cur_log);
+ DBUG_ASSERT(rli->cur_log_fd >= 0);
+ my_close(rli->cur_log_fd, MYF(MY_WME));
+ rli->cur_log_fd = -1;
+
+ /*
+ TODO: make skip_log_purge a start-up option. At this point this
+ is not critical priority
+ */
+ if (!rli->skip_log_purge)
+ {
+ // purge_first_log will properly set up relay log coordinates in rli
+ if (rli->relay_log.purge_first_log(rli))
+ {
+ errmsg = "Error purging processed log";
+ goto err;
+ }
+ }
else
{
/*
- If the log was not hot, we need to move to the next log in
- sequence. The next log could be hot or cold, we deal with both
- cases separately after doing some common initialization
+ TODO: verify that no lock is ok here. At this point, if we
+ get this wrong, this is actually no big deal - the only time
+ this code will ever be executed is if we are recovering from
+ a bug when a full reload of the slave is not feasible or
+ desirable.
*/
- end_io_cache(cur_log);
- DBUG_ASSERT(rli->cur_log_fd >= 0);
- my_close(rli->cur_log_fd, MYF(MY_WME));
- rli->cur_log_fd = -1;
-
- // TODO: make skip_log_purge a start-up option. At this point this
- // is not critical priority
- if (!rli->skip_log_purge)
- {
- // purge_first_log will properly set up relay log coordinates in rli
- if (rli->relay_log.purge_first_log(rli))
- {
- errmsg = "Error purging processed log";
- goto err;
- }
- }
- else
+ if (rli->relay_log.find_next_log(&rli->linfo,0/*no lock*/))
{
- // TODO: verify that no lock is ok here. At this point, if we
- // get this wrong, this is actually no big deal - the only time
- // this code will ever be executed is if we are recovering from
- // a bug when a full reload of the slave is not feasible or
- // desirable.
- if (rli->relay_log.find_next_log(&rli->linfo,0/*no lock*/))
- {
- errmsg = "error switching to the next log";
- goto err;
- }
- rli->relay_log_pos = 4;
- rli->pending=0;
- strnmov(rli->relay_log_name,rli->linfo.log_file_name,
- sizeof(rli->relay_log_name));
- flush_relay_log_info(rli);
+ errmsg = "error switching to the next log";
+ goto err;
}
+ rli->relay_log_pos = BIN_LOG_HEADER_SIZE;
+ rli->pending=0;
+ strnmov(rli->relay_log_name,rli->linfo.log_file_name,
+ sizeof(rli->relay_log_name));
+ flush_relay_log_info(rli);
+ }
- // next log is hot
- if (rli->relay_log.is_active(rli->linfo.log_file_name))
- {
+ // next log is hot
+ if (rli->relay_log.is_active(rli->linfo.log_file_name))
+ {
#ifdef EXTRA_DEBUG
- sql_print_error("next log '%s' is currently active",
- rli->linfo.log_file_name);
+ sql_print_error("next log '%s' is currently active",
+ rli->linfo.log_file_name);
#endif
- rli->cur_log = cur_log = rli->relay_log.get_log_file();
- rli->cur_log_init_count = cur_log->init_count;
- DBUG_ASSERT(rli->cur_log_fd == -1);
+ rli->cur_log= cur_log= rli->relay_log.get_log_file();
+ rli->cur_log_old_open_count= rli->relay_log.get_open_count();
+ DBUG_ASSERT(rli->cur_log_fd == -1);
- /*
- Read pointer has to be at the start since we are the only
- reader
- */
- if (check_binlog_magic(cur_log,&errmsg))
- goto err;
- continue;
- }
/*
- if we get here, the log was not hot, so we will have to
- open it ourselves
+ Read pointer has to be at the start since we are the only
+ reader
*/
-#ifdef EXTRA_DEBUG
- sql_print_error("next log '%s' is not active",
- rli->linfo.log_file_name);
-#endif
- // open_binlog() will check the magic header
- if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
- &errmsg)) <0)
+ if (check_binlog_magic(cur_log,&errmsg))
goto err;
+ continue;
}
+ /*
+ if we get here, the log was not hot, so we will have to
+ open it ourselves
+ */
+#ifdef EXTRA_DEBUG
+ sql_print_error("next log '%s' is not active",
+ rli->linfo.log_file_name);
+#endif
+ // open_binlog() will check the magic header
+ if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
+ &errmsg)) <0)
+ goto err;
}
- else // read failed with a non-EOF error
+ else
{
- // TODO: come up with something better to handle this error
+ /*
+ Read failed with a non-EOF error.
+ TODO: come up with something better to handle this error
+ */
+ if (hot_log)
+ pthread_mutex_unlock(log_lock);
sql_print_error("Slave SQL thread: I/O error reading \
-event(errno=%d,cur_log->error=%d)",
+event(errno: %d cur_log->error: %d)",
my_errno,cur_log->error);
// set read position to the beginning of the event
my_b_seek(cur_log,rli->relay_log_pos+rli->pending);
/* otherwise, we have had a partial read */
- /* TODO; see if there is a way to do this without this goto */
errmsg = "Aborting slave SQL thread because of partial event read";
+ /* TODO; see if there is a way to do this without this goto */
goto err;
}
-
}
if (!errmsg && was_killed)
errmsg = "slave SQL thread was killed";
+
err:
pthread_mutex_unlock(&rli->data_lock);
sql_print_error("Error reading relay log event: %s", errmsg);
- return 0;
+ DBUG_RETURN(0);
}
diff --git a/sql/slave.h b/sql/slave.h
index 16735891815..d6992a8b839 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -84,18 +84,9 @@ typedef struct st_relay_log_info
volatile my_off_t master_log_pos;
/*
- current offset in the relay log.
- pending - in some cases we do not increment offset immediately after
- processing an event, because the following event needs to be processed
- atomically together with this one ( so far, there is only one type of
- such event - Intvar_event that sets auto_increment value). However, once
- both events have been processed, we need to increment by the cumulative
- offset. pending stored the extra offset to be added to the position.
+ Protected with internal locks.
+ Must get data_lock when resetting the logs.
*/
- ulonglong relay_log_pos, pending;
-
- // protected with internal locks
- // must get data_lock when resetting the logs
MYSQL_LOG relay_log;
LOG_INFO linfo;
IO_CACHE cache_buf,*cur_log;
@@ -125,9 +116,6 @@ typedef struct st_relay_log_info
*/
pthread_cond_t start_cond, stop_cond, data_cond;
- // if not set, the value of other members of the structure are undefined
- bool inited;
-
// parent master info structure
struct st_master_info *mi;
@@ -135,9 +123,19 @@ typedef struct st_relay_log_info
Needed to deal properly with cur_log getting closed and re-opened with
a different log under our feet
*/
- int cur_log_init_count;
+ uint32 cur_log_old_open_count;
- volatile bool abort_slave, slave_running;
+ /*
+ current offset in the relay log.
+ pending - in some cases we do not increment offset immediately after
+ processing an event, because the following event needs to be processed
+ atomically together with this one ( so far, there is only one type of
+ such event - Intvar_event that sets auto_increment value). However, once
+ both events have been processed, we need to increment by the cumulative
+ offset. pending stored the extra offset to be added to the position.
+ */
+ ulonglong relay_log_pos, pending;
+ ulonglong log_space_limit,log_space_total;
/*
Needed for problems when slave stops and we want to restart it
@@ -145,45 +143,47 @@ typedef struct st_relay_log_info
errors, and have been manually applied by DBA already.
*/
volatile uint32 slave_skip_counter;
+ pthread_mutex_t log_space_lock;
+ pthread_cond_t log_space_cond;
+ THD * sql_thd;
+ int last_slave_errno;
#ifndef DBUG_OFF
int events_till_abort;
#endif
- int last_slave_errno;
char last_slave_error[MAX_SLAVE_ERRMSG];
- THD* sql_thd;
+
+ // if not set, the value of other members of the structure are undefined
+ bool inited;
+ volatile bool abort_slave, slave_running;
bool log_pos_current;
bool abort_pos_wait;
bool skip_log_purge;
- ulonglong log_space_limit,log_space_total;
- pthread_mutex_t log_space_lock;
- pthread_cond_t log_space_cond;
- st_relay_log_info():info_fd(-1),cur_log_fd(-1),inited(0),
- cur_log_init_count(0),
- abort_slave(0),slave_running(0),
- log_pos_current(0),abort_pos_wait(0),
- skip_log_purge(0)
- {
- relay_log_name[0] = master_log_name[0] = 0;
- bzero(&info_file,sizeof(info_file));
- pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
- pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
- pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST);
- pthread_cond_init(&data_cond, NULL);
- pthread_cond_init(&start_cond, NULL);
- pthread_cond_init(&stop_cond, NULL);
- pthread_cond_init(&log_space_cond, NULL);
- }
+ st_relay_log_info()
+ :info_fd(-1),cur_log_fd(-1), cur_log_old_open_count(0),
+ inited(0), abort_slave(0), slave_running(0), log_pos_current(0),
+ abort_pos_wait(0), skip_log_purge(0)
+ {
+ relay_log_name[0] = master_log_name[0] = 0;
+ bzero(&info_file,sizeof(info_file));
+ pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
+ pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
+ pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST);
+ pthread_cond_init(&data_cond, NULL);
+ pthread_cond_init(&start_cond, NULL);
+ pthread_cond_init(&stop_cond, NULL);
+ pthread_cond_init(&log_space_cond, NULL);
+ }
~st_relay_log_info()
- {
- pthread_mutex_destroy(&run_lock);
- pthread_mutex_destroy(&data_lock);
- pthread_mutex_destroy(&log_space_lock);
- pthread_cond_destroy(&data_cond);
- pthread_cond_destroy(&start_cond);
- pthread_cond_destroy(&stop_cond);
- pthread_cond_destroy(&log_space_cond);
- }
+ {
+ pthread_mutex_destroy(&run_lock);
+ pthread_mutex_destroy(&data_lock);
+ pthread_mutex_destroy(&log_space_lock);
+ pthread_cond_destroy(&data_cond);
+ pthread_cond_destroy(&start_cond);
+ pthread_cond_destroy(&stop_cond);
+ pthread_cond_destroy(&log_space_cond);
+ }
inline void inc_pending(ulonglong val)
{
pending += val;
@@ -215,40 +215,33 @@ typedef struct st_relay_log_info
int wait_for_pos(THD* thd, String* log_name, ulonglong log_pos);
} RELAY_LOG_INFO;
-/*
- repopen_relay_log() is called when we notice that the current "hot" log
- got rotated under our feet
-*/
-
-IO_CACHE* reopen_relay_log(RELAY_LOG_INFO* rli, const char** errmsg);
Log_event* next_event(RELAY_LOG_INFO* rli);
-
/*
st_master_info contains information about how to connect to a master,
- current master log name, and current log offset, as well as misc
- control variables
+ current master log name, and current log offset, as well as misc
+ control variables
- st_master_info is initialized once from the master.info file if such
- exists. Otherwise, data members corresponding to master.info fields are
- initialized with defaults specified by master-* options. The initialization
- is done through init_master_info() call.
+ st_master_info is initialized once from the master.info file if such
+ exists. Otherwise, data members corresponding to master.info fields
+ are initialized with defaults specified by master-* options. The
+ initialization is done through init_master_info() call.
- The format of master.info file:
+ The format of master.info file:
- log_name
- log_pos
- master_host
- master_user
- master_pass
- master_port
- master_connect_retry
+ log_name
+ log_pos
+ master_host
+ master_user
+ master_pass
+ master_port
+ master_connect_retry
- To write out the contents of master.info file to disk ( needed every
- time we read and queue data from the master ), a call to
- flush_master_info() is required.
+ To write out the contents of master.info file to disk ( needed every
+ time we read and queue data from the master ), a call to
+ flush_master_info() is required.
- To clean up, call end_master_info()
+ To clean up, call end_master_info()
*/
diff --git a/sql/sql_class.h b/sql/sql_class.h
index fe6a7e2ed69..da0b2090c97 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -60,48 +60,49 @@ class Log_event;
class MYSQL_LOG {
private:
pthread_mutex_t LOCK_log, LOCK_index;
+ pthread_cond_t update_cond;
+ ulonglong bytes_written;
time_t last_time,query_start;
IO_CACHE log_file;
File index_file;
char *name;
- volatile enum_log_type log_type;
char time_buff[20],db[NAME_LEN+1];
char log_file_name[FN_REFLEN],index_file_name[FN_REFLEN];
- bool write_error,inited;
- uint file_id; // current file sequence number for load data infile
- // binary logging
- bool no_rotate; // for binlog - if log name can never change
- // we should not try to rotate it or write any rotation events
- // the user should use FLUSH MASTER instead of FLUSH LOGS for
- // purging
+ // current file sequence number for load data infile binary logging
+ uint file_id;
+ uint open_count; // For replication
+ /*
+ For binlog - if log name can never change we should not try to rotate it
+ or write any rotation events. The user should use FLUSH MASTER instead
+ of FLUSH LOGS for purging.
+ */
+ volatile enum_log_type log_type;
enum cache_type io_cache_type;
+ bool write_error,inited;
+ bool no_rotate;
bool need_start_event;
- pthread_cond_t update_cond;
bool no_auto_events; // for relay binlog
- ulonglong bytes_written;
friend class Log_event;
public:
MYSQL_LOG();
~MYSQL_LOG();
- pthread_mutex_t* get_log_lock() { return &LOCK_log; }
void reset_bytes_written()
- {
- bytes_written = 0;
- }
+ {
+ bytes_written = 0;
+ }
void harvest_bytes_written(ulonglong* counter)
- {
+ {
#ifndef DBUG_OFF
- char buf1[22],buf2[22];
+ char buf1[22],buf2[22];
#endif
- DBUG_ENTER("harvest_bytes_written");
- (*counter)+=bytes_written;
- DBUG_PRINT("info",("counter=%s,bytes_written=%s", llstr(*counter,buf1),
- llstr(bytes_written,buf2)));
- bytes_written=0;
- DBUG_VOID_RETURN;
- }
- IO_CACHE* get_log_file() { return &log_file; }
+ DBUG_ENTER("harvest_bytes_written");
+ (*counter)+=bytes_written;
+ DBUG_PRINT("info",("counter: %s bytes_written: %s", llstr(*counter,buf1),
+ llstr(bytes_written,buf2)));
+ bytes_written=0;
+ DBUG_VOID_RETURN;
+ }
void signal_update() { pthread_cond_broadcast(&update_cond);}
void wait_for_update(THD* thd);
void set_need_start_event() { need_start_event = 1; }
@@ -135,8 +136,8 @@ public:
int purge_logs(THD* thd, const char* to_log);
int purge_first_log(struct st_relay_log_info* rli);
int reset_logs(THD* thd);
- void close(bool exiting = 0); // if we are exiting, we also want to close the
- // index file
+ // if we are exiting, we also want to close the index file
+ void close(bool exiting = 0);
// iterating through the log index file
int find_first_log(LOG_INFO* linfo, const char* log_name,
@@ -146,11 +147,15 @@ public:
uint next_file_id();
inline bool is_open() { return log_type != LOG_CLOSED; }
- char* get_index_fname() { return index_file_name;}
- char* get_log_fname() { return log_file_name; }
- void lock_index() { pthread_mutex_lock(&LOCK_index);}
- void unlock_index() { pthread_mutex_unlock(&LOCK_index);}
- File get_index_file() { return index_file;}
+ inline char* get_index_fname() { return index_file_name;}
+ inline char* get_log_fname() { return log_file_name; }
+ inline pthread_mutex_t* get_log_lock() { return &LOCK_log; }
+ inline IO_CACHE* get_log_file() { return &log_file; }
+
+ inline void lock_index() { pthread_mutex_lock(&LOCK_index);}
+ inline void unlock_index() { pthread_mutex_unlock(&LOCK_index);}
+ inline File get_index_file() { return index_file;}
+ inline uint32 get_open_count() { return open_count; }
};
/* character conversion tables */
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 6253058549e..a92622a59b1 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -2891,6 +2891,7 @@ bool add_field_to_list(char *field_name, enum_field_types type,
case FIELD_TYPE_STRING:
case FIELD_TYPE_VAR_STRING:
case FIELD_TYPE_NULL:
+ case FIELD_TYPE_GEOMETRY:
break;
case FIELD_TYPE_DECIMAL:
if (!length)
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index b6c7c98a4cf..415007b38fa 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -154,6 +154,7 @@ File open_binlog(IO_CACHE *log, const char *log_file_name,
const char **errmsg)
{
File file;
+ DBUG_ENTER("open_binlog");
if ((file = my_open(log_file_name, O_RDONLY | O_BINARY, MYF(MY_WME))) < 0 ||
init_io_cache(log, file, IO_SIZE*2, READ_CACHE, 0, 0,
@@ -164,7 +165,7 @@ File open_binlog(IO_CACHE *log, const char *log_file_name,
}
if (check_binlog_magic(log,errmsg))
goto err;
- return file;
+ DBUG_RETURN(file);
err:
if (file >= 0)
@@ -172,7 +173,7 @@ err:
my_close(file,MYF(0));
end_io_cache(log);
}
- return -1;
+ DBUG_RETURN(-1);
}
@@ -628,7 +629,8 @@ int reset_slave(MASTER_INFO* mi)
char fname[FN_REFLEN];
int restart_thread_mask = 0,error=0;
const char* errmsg=0;
-
+ DBUG_ENTER("reset_slave");
+
lock_slave_threads(mi);
init_thread_mask(&restart_thread_mask,mi,0 /* not inverse */);
if ((error=terminate_slave_threads(mi,restart_thread_mask,1 /*skip lock*/))
@@ -649,14 +651,14 @@ int reset_slave(MASTER_INFO* mi)
goto err;
}
if (restart_thread_mask)
- error=start_slave_threads(0 /* mutex not needed*/,
- 1 /* wait for start*/,
- mi,master_info_file,relay_log_info_file,
- restart_thread_mask);
+ error=start_slave_threads(0 /* mutex not needed */,
+ 1 /* wait for start*/,
+ mi,master_info_file,relay_log_info_file,
+ restart_thread_mask);
// TODO: fix error messages so they get to the client
err:
unlock_slave_threads(mi);
- return error;
+ DBUG_RETURN(error);
}
void kill_zombie_dump_threads(uint32 slave_server_id)
diff --git a/sql/unireg.h b/sql/unireg.h
index 5a61f4a6c12..e8fdd5dd5dd 100644
--- a/sql/unireg.h
+++ b/sql/unireg.h
@@ -129,6 +129,10 @@ bfill((A)->null_flags,(A)->null_bytes,255);\
*/
#define MIN_TURBOBM_PATTERN_LEN 3
+/* Defines for binary logging */
+
+#define BIN_LOG_HEADER_SIZE 4
+
/* Include prototypes for unireg */
#include "mysqld_error.h"