diff options
author | unknown <monty@hundin.mysql.fi> | 2002-06-05 23:04:38 +0300 |
---|---|---|
committer | unknown <monty@hundin.mysql.fi> | 2002-06-05 23:04:38 +0300 |
commit | 03728196ee76cfb4bce8923ec25687f8bbd495cb (patch) | |
tree | 9260753468997c0d4b3b8c78aea517507e2791eb | |
parent | ef06010563093f231d62896c6af9e128142fbd56 (diff) | |
download | mariadb-git-03728196ee76cfb4bce8923ec25687f8bbd495cb.tar.gz |
removed init_count from IO_CACHE.
Added missing mutex_unlock to slave replication code.
include/my_sys.h:
removed init_count from IO_CACHE.
General cleanup.
innobase/srv/srv0srv.c:
Initailize slots to avoid purify warnings.
Removed some compiler warnings.
mysql-test/mysql-test-run.sh:
Automatic start of slave under gdb
mysys/mf_iocache.c:
removed init_count
sql/field.cc:
Cleanup
sql/log.cc:
Cleanup
added open_count variable.
sql/log_event.cc:
cleanup
use is_prefix instead of memcmp()
sql/repl_failsafe.cc:
cleanup
sql/slave.cc:
cleanup
use MYSQL_LOG->open_count instead of IO_CACHE->init_count
Added missing mutex_unlock()
sql/slave.h:
cleanup
sql/sql_class.h:
cleanup
Added open_count to MYSQL_LOGL
sql/sql_parse.cc:
removed compiler warning
sql/sql_repl.cc:
added DBUG_xxx
sql/unireg.h:
Added BIN_LOG_HEADER_SIZE
-rw-r--r-- | include/my_sys.h | 133 | ||||
-rw-r--r-- | innobase/srv/srv0srv.c | 14 | ||||
-rw-r--r-- | mysql-test/mysql-test-run.sh | 20 | ||||
-rw-r--r-- | mysys/mf_iocache.c | 2 | ||||
-rw-r--r-- | sql/field.cc | 3 | ||||
-rw-r--r-- | sql/log.cc | 77 | ||||
-rw-r--r-- | sql/log_event.cc | 247 | ||||
-rw-r--r-- | sql/repl_failsafe.cc | 30 | ||||
-rw-r--r-- | sql/slave.cc | 673 | ||||
-rw-r--r-- | sql/slave.h | 135 | ||||
-rw-r--r-- | sql/sql_class.h | 67 | ||||
-rw-r--r-- | sql/sql_parse.cc | 1 | ||||
-rw-r--r-- | sql/sql_repl.cc | 18 | ||||
-rw-r--r-- | sql/unireg.h | 4 |
14 files changed, 762 insertions, 662 deletions
diff --git a/include/my_sys.h b/include/my_sys.h index ebc86c27d1f..7ad638bcca8 100644 --- a/include/my_sys.h +++ b/include/my_sys.h @@ -302,33 +302,41 @@ typedef int (*IO_CACHE_CALLBACK)(struct st_io_cache*); typedef struct st_io_cache /* Used when cacheing files */ { - /* pos_in_file is offset in file corresponding to the first byte of - byte* buffer. end_of_file is the offset of end of file for READ_CACHE - and WRITE_CACHE. For SEQ_READ_APPEND it the maximum of the actual - end of file and the position represented by read_end. + /* Offset in file corresponding to the first byte of byte* buffer. */ + my_off_t pos_in_file; + /* + The offset of end of file for READ_CACHE and WRITE_CACHE. + For SEQ_READ_APPEND it the maximum of the actual end of file and + the position represented by read_end. */ - my_off_t pos_in_file,end_of_file; - /* read_pos points to current read position in the buffer - read_end is the non-inclusive boundary in the buffer for the currently - valid read area - buffer is the read buffer - not sure about request_pos except that it is used in async_io + my_off_t end_of_file; + /* Points to current read position in the buffer */ + byte *read_pos; + /* the non-inclusive boundary in the buffer for the currently valid read */ + byte *read_end; + byte *buffer; /* The read buffer */ + /* Used in ASYNC_IO */ + byte *request_pos; + + /* Only used in WRITE caches and in SEQ_READ_APPEND to buffer writes */ + byte *write_buffer; + /* + Only used in SEQ_READ_APPEND, and points to the current read position + in the write buffer. Note that reads in SEQ_READ_APPEND caches can + happen from both read buffer (byte* buffer) and write buffer + (byte* write_buffer). */ - byte *read_pos,*read_end,*buffer,*request_pos; - /* write_buffer is used only in WRITE caches and in SEQ_READ_APPEND to - buffer writes - append_read_pos is only used in SEQ_READ_APPEND, and points to the - current read position in the write buffer. Note that reads in - SEQ_READ_APPEND caches can happen from both read buffer (byte* buffer), - and write buffer (byte* write_buffer). - write_pos points to current write position in the write buffer and - write_end is the non-inclusive boundary of the valid write area - */ - byte *write_buffer, *append_read_pos, *write_pos, *write_end; - /* current_pos and current_end are convenience variables used by - my_b_tell() and other routines that need to know the current offset - current_pos points to &write_pos, and current_end to &write_end in a - WRITE_CACHE, and &read_pos and &read_end respectively otherwise + byte *append_read_pos; + /* Points to current write position in the write buffer */ + byte *write_pos; + /* The non-inclusive boundary of the valid write area */ + byte *write_end; + + /* + Current_pos and current_end are convenience variables used by + my_b_tell() and other routines that need to know the current offset + current_pos points to &write_pos, and current_end to &write_end in a + WRITE_CACHE, and &read_pos and &read_end respectively otherwise */ byte **current_pos, **current_end; /* The lock is for append buffer used in SEQ_READ_APPEND cache */ @@ -336,70 +344,64 @@ typedef struct st_io_cache /* Used when cacheing files */ pthread_mutex_t append_buffer_lock; /* need mutex copying from append buffer to read buffer */ #endif - /* a caller will use my_b_read() macro to read from the cache - if the data is already in cache, it will be simply copied with - memcpy() and internal variables will be accordinging updated with - no functions invoked. However, if the data is not fully in the cache, - my_b_read() will call read_function to fetch the data. read_function - must never be invoked directly + /* + A caller will use my_b_read() macro to read from the cache + if the data is already in cache, it will be simply copied with + memcpy() and internal variables will be accordinging updated with + no functions invoked. However, if the data is not fully in the cache, + my_b_read() will call read_function to fetch the data. read_function + must never be invoked directly. */ int (*read_function)(struct st_io_cache *,byte *,uint); - /* same idea as in the case of read_function, except my_b_write() needs to - be replaced with my_b_append() for a SEQ_READ_APPEND cache + /* + Same idea as in the case of read_function, except my_b_write() needs to + be replaced with my_b_append() for a SEQ_READ_APPEND cache */ int (*write_function)(struct st_io_cache *,const byte *,uint); - /* specifies the type of the cache. Depending on the type of the cache + /* + Specifies the type of the cache. Depending on the type of the cache certain operations might not be available and yield unpredicatable results. Details to be documented later */ enum cache_type type; - /* callbacks when the actual read I/O happens. These were added and - are currently used for binary logging of LOAD DATA INFILE - when a - block is read from the file, we create a block create/append event, and - when IO_CACHE is closed, we create an end event. These functions could, - of course be used for other things + /* + Callbacks when the actual read I/O happens. These were added and + are currently used for binary logging of LOAD DATA INFILE - when a + block is read from the file, we create a block create/append event, and + when IO_CACHE is closed, we create an end event. These functions could, + of course be used for other things */ IO_CACHE_CALLBACK pre_read; IO_CACHE_CALLBACK post_read; IO_CACHE_CALLBACK pre_close; - void* arg; /* for use by pre/post_read */ + void* arg; /* for use by pre/post_read */ char *file_name; /* if used with 'open_cached_file' */ char *dir,*prefix; File file; /* file descriptor */ - /* seek_not_done is set by my_b_seek() to inform the upcoming read/write - operation that a seek needs to be preformed prior to the actual I/O - error is 0 if the cache operation was successful, -1 if there was a - "hard" error, and the actual number of I/O-ed bytes if the read/write was - partial + /* + seek_not_done is set by my_b_seek() to inform the upcoming read/write + operation that a seek needs to be preformed prior to the actual I/O + error is 0 if the cache operation was successful, -1 if there was a + "hard" error, and the actual number of I/O-ed bytes if the read/write was + partial. */ int seek_not_done,error; - /* buffer_length is the size of memory allocated for buffer or write_buffer - read_length is the same as buffer_length except when we use async io - not sure why we need it - */ - uint buffer_length,read_length; + /* buffer_length is memory size allocated for buffer or write_buffer */ + uint buffer_length; + /* read_length is the same as buffer_length except when we use async io */ + uint read_length; myf myflags; /* Flags used to my_read/my_write */ - /* + /* alloced_buffer is 1 if the buffer was allocated by init_io_cache() and - 0 if it was supplied by the user + 0 if it was supplied by the user. Currently READ_NET is the only one that will use a buffer allocated somewhere else */ my_bool alloced_buffer; - /* init_count is incremented every time we call init_io_cache() - It is not reset in end_io_cache(). This variable - was introduced for slave relay logs - RELAY_LOG_INFO stores a pointer - to IO_CACHE that could in some cases refer to the IO_CACHE of the - currently active relay log. The IO_CACHE then could be closed, - re-opened and start pointing to a different log file. In that case, - we could not know reliably if this happened without init_count - one must be careful with bzero() prior to the subsequent init_io_cache() - call - */ - int init_count; #ifdef HAVE_AIOWAIT - /* as inidicated by ifdef, this is for async I/O, we will have - Sinisa comment this some time + /* + As inidicated by ifdef, this is for async I/O, which is not currently + used (because it's not reliable on all systems) */ uint inited; my_off_t aio_read_pos; @@ -428,7 +430,6 @@ typedef int (*qsort2_cmp)(const void *, const void *, const void *); ((info)->write_pos+=(Count)),0) : \ (*(info)->write_function)((info),(Buffer),(Count))) - #define my_b_get(info) \ ((info)->read_pos != (info)->read_end ?\ ((info)->read_pos++, (int) (uchar) (info)->read_pos[-1]) :\ diff --git a/innobase/srv/srv0srv.c b/innobase/srv/srv0srv.c index f366ce0d160..39f3566eac8 100644 --- a/innobase/srv/srv0srv.c +++ b/innobase/srv/srv0srv.c @@ -1631,6 +1631,7 @@ srv_init(void) for (i = 0; i < OS_THREAD_MAX_N; i++) { slot = srv_table_get_nth_slot(i); slot->in_use = FALSE; + slot->type=0; /* Avoid purify errors */ slot->event = os_event_create(NULL); ut_a(slot->event); } @@ -1899,7 +1900,6 @@ srv_conc_exit_innodb( trx_t* trx) /* in: transaction object associated with the thread */ { - srv_conc_slot_t* slot = NULL; if (srv_thread_concurrency >= 500) { @@ -2514,11 +2514,11 @@ loop: can drop tables lazily after there no longer are SELECT queries to them. */ - srv_main_thread_op_info = "doing background drop tables"; + srv_main_thread_op_info = (char*) "doing background drop tables"; row_drop_tables_for_mysql_in_background(); - srv_main_thread_op_info = ""; + srv_main_thread_op_info = (char*) ""; if (srv_force_recovery >= SRV_FORCE_NO_BACKGROUND) { @@ -2630,19 +2630,19 @@ background_loop: /* In this loop we run background operations when the server is quiet and we also come here about once in 10 seconds */ - srv_main_thread_op_info = "doing background drop tables"; + srv_main_thread_op_info = (char*) "doing background drop tables"; n_tables_to_drop = row_drop_tables_for_mysql_in_background(); - srv_main_thread_op_info = ""; + srv_main_thread_op_info = (char*) ""; - srv_main_thread_op_info = "flushing buffer pool pages"; + srv_main_thread_op_info = (char*) "flushing buffer pool pages"; /* Flush a few oldest pages to make the checkpoint younger */ n_pages_flushed = buf_flush_batch(BUF_FLUSH_LIST, 10, ut_dulint_max); - srv_main_thread_op_info = "making checkpoint"; + srv_main_thread_op_info = (char*) "making checkpoint"; /* Make a new checkpoint about once in 10 seconds */ diff --git a/mysql-test/mysql-test-run.sh b/mysql-test/mysql-test-run.sh index 7ae89c96169..66bc7f5069b 100644 --- a/mysql-test/mysql-test-run.sh +++ b/mysql-test/mysql-test-run.sh @@ -675,9 +675,9 @@ manager_launch() ident=$1 shift if [ $USE_MANAGER = 0 ] ; then - $@ >$CUR_MYERR 2>&1 & - sleep 2 #hack - return + $@ >$CUR_MYERR 2>&1 & + sleep 2 #hack + return fi $MYSQL_MANAGER_CLIENT $MANAGER_QUIET_OPT --user=$MYSQL_MANAGER_USER \ --password=$MYSQL_MANAGER_PW --port=$MYSQL_MANAGER_PORT <<EOF @@ -687,7 +687,7 @@ set_exec_stderr $ident $CUR_MYERR set_exec_con $ident root localhost $CUR_MYSOCK start_exec $ident $START_WAIT_TIMEOUT EOF - abort_if_failed "Could not execute manager command" + abort_if_failed "Could not execute manager command" } manager_term() @@ -887,13 +887,23 @@ start_slave() "gdb -x $GDB_SLAVE_INIT" $SLAVE_MYSQLD elif [ x$DO_GDB = x1 ] then - $ECHO "set args $slave_args" > $GDB_SLAVE_INIT if [ x$MANUAL_GDB = x1 ] then + $ECHO "set args $slave_args" > $GDB_SLAVE_INIT echo "To start gdb for the slave, type in another window:" echo "cd $CWD ; gdb -x $GDB_SLAVE_INIT $SLAVE_MYSQLD" wait_for_slave=1500 else + ( $ECHO set args $slave_args; + if [ $USE_MANAGER = 0 ] ; then + cat <<EOF +b mysql_parse +commands 1 +disa 1 +end +r +EOF + fi ) > $GDB_SLAVE_INIT manager_launch $slave_ident $XTERM -display $DISPLAY -title "Slave" -e \ gdb -x $GDB_SLAVE_INIT $SLAVE_MYSQLD fi diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c index a0247003a81..34873d107af 100644 --- a/mysys/mf_iocache.c +++ b/mysys/mf_iocache.c @@ -122,8 +122,6 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, info->pos_in_file= seek_offset; info->pre_close = info->pre_read = info->post_read = 0; info->arg = 0; - info->init_count++; /* we assume the user had set it to 0 prior to - first call */ info->alloced_buffer = 0; info->buffer=0; info->seek_not_done= test(file >= 0); diff --git a/sql/field.cc b/sql/field.cc index c6880a29cee..5466facb437 100644 --- a/sql/field.cc +++ b/sql/field.cc @@ -4628,7 +4628,7 @@ bool Field_num::eq_def(Field *field) *****************************************************************************/ /* -** Make a field from the .frm file info + Make a field from the .frm file info */ uint32 calc_pack_length(enum_field_types type,uint32 length) @@ -4657,6 +4657,7 @@ uint32 calc_pack_length(enum_field_types type,uint32 length) case FIELD_TYPE_LONG_BLOB: return 4+portable_sizeof_char_ptr; case FIELD_TYPE_SET: case FIELD_TYPE_ENUM: abort(); return 0; // This shouldn't happen + default: return 0; } return 0; // This shouldn't happen } diff --git a/sql/log.cc b/sql/log.cc index f0012a94f5d..0f8c4a8c4a8 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -80,10 +80,10 @@ static int find_uniq_filename(char *name) DBUG_RETURN(0); } -MYSQL_LOG::MYSQL_LOG(): last_time(0), query_start(0),index_file(-1), - name(0), log_type(LOG_CLOSED),write_error(0), - inited(0), file_id(1),no_rotate(0), - need_start_event(1),bytes_written(0) +MYSQL_LOG::MYSQL_LOG() + :bytes_written(0), last_time(0), query_start(0), index_file(-1), name(0), + file_id(1), open_count(1), log_type(LOG_CLOSED), write_error(0), inited(0), + no_rotate(0), need_start_event(1) { /* We don't want to intialize LOCK_Log here as the thread system may @@ -173,8 +173,10 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, File file= -1; bool do_magic; int open_flags = O_CREAT | O_APPEND | O_BINARY; + DBUG_ENTER("MYSQL_LOG::open"); + if (!inited && log_type_arg == LOG_BIN && *fn_ext(log_name)) - no_rotate = 1; + no_rotate = 1; init(log_type_arg,io_cache_type_arg,no_auto_events_arg); if (!(name=my_strdup(log_name,MYF(MY_WME)))) @@ -196,6 +198,7 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, do_magic = ((log_type == LOG_BIN) && !my_stat(log_file_name, &tmp_stat, MYF(0))); + open_count++; if ((file=my_open(log_file_name,open_flags, MYF(MY_WME | ME_WAITTANG))) < 0 || init_io_cache(&log_file, file, IO_SIZE, io_cache_type, @@ -237,10 +240,10 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, bool error; if (do_magic) { - if (my_b_write(&log_file, (byte*) BINLOG_MAGIC, 4) || - open_index(O_APPEND | O_RDWR | O_CREAT)) + if (my_b_write(&log_file, (byte*) BINLOG_MAGIC, BIN_LOG_HEADER_SIZE) || + open_index(O_APPEND | O_RDWR | O_CREAT)) goto err; - bytes_written += 4; + bytes_written += BIN_LOG_HEADER_SIZE; } if (need_start_event && !no_auto_events) @@ -262,7 +265,7 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, goto err; } } - return; + DBUG_VOID_RETURN; err: sql_print_error("Could not use %s for logging (error %d)", log_name,errno); @@ -271,7 +274,7 @@ err: end_io_cache(&log_file); x_free(name); name=0; log_type=LOG_CLOSED; - return; + DBUG_VOID_RETURN; } int MYSQL_LOG::get_current_log(LOG_INFO* linfo) @@ -284,6 +287,7 @@ int MYSQL_LOG::get_current_log(LOG_INFO* linfo) } // if log_name is "" we stop at the first entry + int MYSQL_LOG::find_first_log(LOG_INFO* linfo, const char* log_name, bool need_mutex) { @@ -294,8 +298,10 @@ int MYSQL_LOG::find_first_log(LOG_INFO* linfo, const char* log_name, uint log_name_len = (uint) strlen(log_name); IO_CACHE io_cache; - // mutex needed because we need to make sure the file pointer does not move - // from under our feet + /* + Mutex needed because we need to make sure the file pointer does not move + from under our feet + */ if (need_mutex) pthread_mutex_lock(&LOCK_index); if (init_io_cache(&io_cache, index_file, IO_SIZE, READ_CACHE, (my_off_t) 0, @@ -304,7 +310,7 @@ int MYSQL_LOG::find_first_log(LOG_INFO* linfo, const char* log_name, error = LOG_INFO_SEEK; goto err; } - for(;;) + for (;;) { uint length; if (!(length=my_b_gets(&io_cache, fname, FN_REFLEN-1))) @@ -336,9 +342,12 @@ err: int MYSQL_LOG::find_next_log(LOG_INFO* linfo, bool need_lock) { - // mutex needed because we need to make sure the file pointer does not move - // from under our feet - if (index_file < 0) return LOG_INFO_INVALID; + /* + Mutex needed because we need to make sure the file pointer does not move + from under our feet + */ + if (index_file < 0) + return LOG_INFO_INVALID; int error = 0; char* fname = linfo->log_file_name; IO_CACHE io_cache; @@ -382,7 +391,7 @@ int MYSQL_LOG::reset_logs(THD* thd) goto err; } - for(;;) + for (;;) { my_delete(linfo.log_file_name, MYF(MY_WME)); if (find_next_log(&linfo)) @@ -490,7 +499,7 @@ err: rli->linfo.log_file_name); goto err2; } - rli->relay_log_pos = 4; + rli->relay_log_pos = BIN_LOG_HEADER_SIZE; strnmov(rli->relay_log_name,rli->linfo.log_file_name, sizeof(rli->relay_log_name)); flush_relay_log_info(rli); @@ -550,7 +559,7 @@ int MYSQL_LOG::purge_logs(THD* thd, const char* to_log) my_off_t init_purge_offset= my_b_tell(&io_cache); if (!(fname_len=my_b_gets(&io_cache, fname, FN_REFLEN))) { - if(!io_cache.error) + if (!io_cache.error) break; error = LOG_INFO_IO; goto err; @@ -993,8 +1002,13 @@ bool MYSQL_LOG::write(THD *thd, IO_CACHE *cache) if (is_open()) { + /* + We come here when the queries to be logged could not fit into memory + and part of the queries are stored in a log file on disk. + */ + uint length; - //QQ: this looks like a bug - why READ_CACHE? + /* Read from the file used to cache the queries .*/ if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) { sql_print_error(ER(ER_ERROR_ON_WRITE), cache->file_name, errno); @@ -1003,6 +1017,7 @@ bool MYSQL_LOG::write(THD *thd, IO_CACHE *cache) length=my_b_bytes_in_cache(cache); do { + /* Write data to the binary log file */ if (my_b_write(&log_file, cache->read_pos, length)) { if (!write_error) @@ -1168,19 +1183,23 @@ void MYSQL_LOG:: wait_for_update(THD* thd) const char* old_msg = thd->enter_cond(&update_cond, &LOCK_log, "Slave: waiting for binlog update"); pthread_cond_wait(&update_cond, &LOCK_log); - // this is not a bug - we unlock the mutex for the caller, and expect him - // to lock it and then not unlock it upon return. This is a rather odd - // way of doing things, but this is the cleanest way I could think of to - // solve the race deadlock caused by THD::awake() first acquiring mysys_var - // mutex and then the current mutex, while wait_for_update being called with - // the current mutex already aquired and THD::exit_cond() trying to acquire - // mysys_var mutex. We do need the mutex to be acquired prior to the - // invocation of wait_for_update in all cases, so mutex acquisition inside - // wait_for_update() is not an option + /* + This is not a bug: + We unlock the mutex for the caller, and expect him to lock it and + then not unlock it upon return. This is a rather odd way of doing + things, but this is the cleanest way I could think of to solve the + race deadlock caused by THD::awake() first acquiring mysys_var + mutex and then the current mutex, while wait_for_update being + called with the current mutex already aquired and THD::exit_cond() + trying to acquire mysys_var mutex. We do need the mutex to be + acquired prior to the invocation of wait_for_update in all cases, + so mutex acquisition inside wait_for_update() is not an option. + */ pthread_mutex_unlock(&LOCK_log); thd->exit_cond(old_msg); } + void MYSQL_LOG::close(bool exiting) { // One can't set log_type here! if (is_open()) diff --git a/sql/log_event.cc b/sql/log_event.cc index fd04f8dbbaa..9315baa0de5 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -159,11 +159,11 @@ static void cleanup_load_tmpdir() if (!(dirp=my_dir(slave_load_tmpdir,MYF(MY_WME)))) return; - for (i=0;i<(uint)dirp->number_off_files;i++) + for (i=0 ; i < (uint)dirp->number_off_files; i++) { file=dirp->dir_entry+i; - if (!memcmp(file->name,"SQL_LOAD-",9)) - my_delete(file->name,MYF(MY_WME)); + if (is_prefix(file->name,"SQL_LOAD-")) + my_delete(file->name,MYF(0)); } my_dirend(dirp); @@ -246,7 +246,7 @@ void Load_log_event::pack_info(String* packet) char buf[256]; String tmp(buf, sizeof(buf)); tmp.length(0); - if(db && db_len) + if (db && db_len) { tmp.append("use "); tmp.append(db, db_len); @@ -256,9 +256,9 @@ void Load_log_event::pack_info(String* packet) tmp.append("LOAD DATA INFILE '"); tmp.append(fname, fname_len); tmp.append("' ", 2); - if(sql_ex.opt_flags && REPLACE_FLAG ) + if (sql_ex.opt_flags && REPLACE_FLAG ) tmp.append(" REPLACE "); - else if(sql_ex.opt_flags && IGNORE_FLAG ) + else if (sql_ex.opt_flags && IGNORE_FLAG ) tmp.append(" IGNORE "); tmp.append("INTO TABLE "); @@ -305,7 +305,7 @@ void Load_log_event::pack_info(String* packet) tmp.append(" ("); for(i = 0; i < num_fields; i++) { - if(i) + if (i) tmp.append(" ,"); tmp.append( field); @@ -326,7 +326,7 @@ void Rotate_log_event::pack_info(String* packet) tmp.append(new_log_ident, ident_len); tmp.append(";pos="); tmp.append(llstr(pos,buf)); - if(flags & LOG_EVENT_FORCED_ROTATE_F) + if (flags & LOG_EVENT_FORCED_ROTATE_F) tmp.append("; forced by master"); net_store_data(packet, tmp.ptr(), tmp.length()); } @@ -436,7 +436,7 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet, // if the read hits eof, we must report it as eof // so the caller will know it can go into cond_wait to be woken up // on the next update to the log - if(!file->error) return LOG_READ_EOF; + if (!file->error) return LOG_READ_EOF; return file->error > 0 ? LOG_READ_TRUNC: LOG_READ_IO; } data_len = uint4korr(buf + EVENT_LEN_OFFSET); @@ -452,7 +452,7 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet, { if (packet->append(file, data_len)) { - if(log_lock) + if (log_lock) pthread_mutex_unlock(log_lock); // here we should never hit eof in a non-error condtion // eof means we are reading the event partially, which should @@ -467,13 +467,13 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet, #endif // MYSQL_CLIENT #ifndef MYSQL_CLIENT -#define UNLOCK_MUTEX if(log_lock) pthread_mutex_unlock(log_lock); +#define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock); #else #define UNLOCK_MUTEX #endif #ifndef MYSQL_CLIENT -#define LOCK_MUTEX if(log_lock) pthread_mutex_lock(log_lock); +#define LOCK_MUTEX if (log_lock) pthread_mutex_lock(log_lock); #else #define LOCK_MUTEX #endif @@ -672,7 +672,7 @@ void Rotate_log_event::print(FILE* file, bool short_form, char* last_db) if (new_log_ident) my_fwrite(file, (byte*) new_log_ident, (uint)ident_len, MYF(MY_NABP | MY_WME)); - fprintf(file, "pos=%s\n", llstr(pos, buf)); + fprintf(file, " pos: %s\n", llstr(pos, buf)); fflush(file); } @@ -701,11 +701,10 @@ Rotate_log_event::Rotate_log_event(const char* buf, int event_len, bool old_format): Log_event(buf, old_format),new_log_ident(NULL),alloced(0) { - // the caller will ensure that event_len is what we have at - // EVENT_LEN_OFFSET + // The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET int header_size = (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN; uint ident_offset; - if(event_len < header_size) + if (event_len < header_size) return; buf += header_size; if (old_format) @@ -753,8 +752,8 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, #endif Query_log_event::Query_log_event(const char* buf, int event_len, - bool old_format): - Log_event(buf, old_format),data_buf(0), query(NULL), db(NULL) + bool old_format) + :Log_event(buf, old_format),data_buf(0), query(NULL), db(NULL) { ulong data_len; if (old_format) @@ -801,9 +800,9 @@ void Query_log_event::print(FILE* file, bool short_form, char* last_db) bool same_db = 0; - if(db && last_db) + if (db && last_db) { - if(!(same_db = !memcmp(last_db, db, db_len + 1))) + if (!(same_db = !memcmp(last_db, db, db_len + 1))) memcpy(last_db, db, db_len + 1); } @@ -864,7 +863,7 @@ int Intvar_log_event::write_data(IO_CACHE* file) void Intvar_log_event::print(FILE* file, bool short_form, char* last_db) { char llbuff[22]; - if(!short_form) + if (!short_form) { print_header(file); fprintf(file, "\tIntvar\n"); @@ -961,11 +960,12 @@ char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format) if (use_new_format) { empty_flags=0; - /* the code below assumes that buf will not disappear from - under our feet during the lifetime of the event. This assumption - holds true in the slave thread if the log is in new format, but is not - the case when we have old format because we will be reusing net buffer - to read the actual file before we write out the Create_file event + /* + The code below assumes that buf will not disappear from + under our feet during the lifetime of the event. This assumption + holds true in the slave thread if the log is in new format, but is not + the case when we have old format because we will be reusing net buffer + to read the actual file before we write out the Create_file event. */ if (read_str(buf, buf_end, field_term, field_term_len) || read_str(buf, buf_end, enclosed, enclosed_len) || @@ -1003,77 +1003,75 @@ char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format) #ifndef MYSQL_CLIENT Load_log_event::Load_log_event(THD* thd, sql_exchange* ex, const char* db_arg, const char* table_name_arg, - List<Item>& fields_arg, enum enum_duplicates handle_dup): - Log_event(thd),thread_id(thd->thread_id), - num_fields(0),fields(0),field_lens(0),field_block_len(0), - table_name(table_name_arg), - db(db_arg), - fname(ex->file_name) - { - time_t end_time; - time(&end_time); - exec_time = (ulong) (end_time - thd->start_time); - db_len = (db) ? (uint32) strlen(db) : 0; - table_name_len = (table_name) ? (uint32) strlen(table_name) : 0; - fname_len = (fname) ? (uint) strlen(fname) : 0; - sql_ex.field_term = (char*) ex->field_term->ptr(); - sql_ex.field_term_len = (uint8) ex->field_term->length(); - sql_ex.enclosed = (char*) ex->enclosed->ptr(); - sql_ex.enclosed_len = (uint8) ex->enclosed->length(); - sql_ex.line_term = (char*) ex->line_term->ptr(); - sql_ex.line_term_len = (uint8) ex->line_term->length(); - sql_ex.line_start = (char*) ex->line_start->ptr(); - sql_ex.line_start_len = (uint8) ex->line_start->length(); - sql_ex.escaped = (char*) ex->escaped->ptr(); - sql_ex.escaped_len = (uint8) ex->escaped->length(); - sql_ex.opt_flags = 0; - sql_ex.cached_new_format = -1; + List<Item>& fields_arg, enum enum_duplicates handle_dup) + :Log_event(thd),thread_id(thd->thread_id), num_fields(0),fields(0), + field_lens(0),field_block_len(0), table_name(table_name_arg), + db(db_arg), fname(ex->file_name) +{ + time_t end_time; + time(&end_time); + exec_time = (ulong) (end_time - thd->start_time); + db_len = (db) ? (uint32) strlen(db) : 0; + table_name_len = (table_name) ? (uint32) strlen(table_name) : 0; + fname_len = (fname) ? (uint) strlen(fname) : 0; + sql_ex.field_term = (char*) ex->field_term->ptr(); + sql_ex.field_term_len = (uint8) ex->field_term->length(); + sql_ex.enclosed = (char*) ex->enclosed->ptr(); + sql_ex.enclosed_len = (uint8) ex->enclosed->length(); + sql_ex.line_term = (char*) ex->line_term->ptr(); + sql_ex.line_term_len = (uint8) ex->line_term->length(); + sql_ex.line_start = (char*) ex->line_start->ptr(); + sql_ex.line_start_len = (uint8) ex->line_start->length(); + sql_ex.escaped = (char*) ex->escaped->ptr(); + sql_ex.escaped_len = (uint8) ex->escaped->length(); + sql_ex.opt_flags = 0; + sql_ex.cached_new_format = -1; - if(ex->dumpfile) - sql_ex.opt_flags |= DUMPFILE_FLAG; - if(ex->opt_enclosed) - sql_ex.opt_flags |= OPT_ENCLOSED_FLAG; + if (ex->dumpfile) + sql_ex.opt_flags |= DUMPFILE_FLAG; + if (ex->opt_enclosed) + sql_ex.opt_flags |= OPT_ENCLOSED_FLAG; - sql_ex.empty_flags = 0; + sql_ex.empty_flags = 0; - switch(handle_dup) - { - case DUP_IGNORE: sql_ex.opt_flags |= IGNORE_FLAG; break; - case DUP_REPLACE: sql_ex.opt_flags |= REPLACE_FLAG; break; - case DUP_ERROR: break; - } + switch(handle_dup) + { + case DUP_IGNORE: sql_ex.opt_flags |= IGNORE_FLAG; break; + case DUP_REPLACE: sql_ex.opt_flags |= REPLACE_FLAG; break; + case DUP_ERROR: break; + } - if(!ex->field_term->length()) - sql_ex.empty_flags |= FIELD_TERM_EMPTY; - if(!ex->enclosed->length()) - sql_ex.empty_flags |= ENCLOSED_EMPTY; - if(!ex->line_term->length()) - sql_ex.empty_flags |= LINE_TERM_EMPTY; - if(!ex->line_start->length()) - sql_ex.empty_flags |= LINE_START_EMPTY; - if(!ex->escaped->length()) - sql_ex.empty_flags |= ESCAPED_EMPTY; + if (!ex->field_term->length()) + sql_ex.empty_flags |= FIELD_TERM_EMPTY; + if (!ex->enclosed->length()) + sql_ex.empty_flags |= ENCLOSED_EMPTY; + if (!ex->line_term->length()) + sql_ex.empty_flags |= LINE_TERM_EMPTY; + if (!ex->line_start->length()) + sql_ex.empty_flags |= LINE_START_EMPTY; + if (!ex->escaped->length()) + sql_ex.empty_flags |= ESCAPED_EMPTY; - skip_lines = ex->skip_lines; - - List_iterator<Item> li(fields_arg); - field_lens_buf.length(0); - fields_buf.length(0); - Item* item; - while((item = li++)) - { - num_fields++; - uchar len = (uchar) strlen(item->name); - field_block_len += len + 1; - fields_buf.append(item->name, len + 1); - field_lens_buf.append((char*)&len, 1); - } + skip_lines = ex->skip_lines; - field_lens = (const uchar*)field_lens_buf.ptr(); - fields = fields_buf.ptr(); + List_iterator<Item> li(fields_arg); + field_lens_buf.length(0); + fields_buf.length(0); + Item* item; + while ((item = li++)) + { + num_fields++; + uchar len = (uchar) strlen(item->name); + field_block_len += len + 1; + fields_buf.append(item->name, len + 1); + field_lens_buf.append((char*)&len, 1); } + field_lens = (const uchar*)field_lens_buf.ptr(); + fields = fields_buf.ptr(); +} + #endif // the caller must do buf[event_len] = 0 before he starts using the @@ -1145,32 +1143,32 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db) bool same_db = 0; - if(db && last_db) - { - if(!(same_db = !memcmp(last_db, db, db_len + 1))) - memcpy(last_db, db, db_len + 1); - } + if (db && last_db) + { + if (!(same_db = !memcmp(last_db, db, db_len + 1))) + memcpy(last_db, db, db_len + 1); + } - if(db && db[0] && !same_db) + if (db && db[0] && !same_db) fprintf(file, "use %s;\n", db); fprintf(file, "LOAD DATA INFILE '%-*s' ", fname_len, fname); - if(sql_ex.opt_flags && REPLACE_FLAG ) + if (sql_ex.opt_flags && REPLACE_FLAG ) fprintf(file," REPLACE "); - else if(sql_ex.opt_flags && IGNORE_FLAG ) + else if (sql_ex.opt_flags && IGNORE_FLAG ) fprintf(file," IGNORE "); fprintf(file, "INTO TABLE %s ", table_name); - if(sql_ex.field_term) + if (sql_ex.field_term) { fprintf(file, " FIELDS TERMINATED BY "); pretty_print_str(file, sql_ex.field_term, sql_ex.field_term_len); } - if(sql_ex.enclosed) + if (sql_ex.enclosed) { - if(sql_ex.opt_flags && OPT_ENCLOSED_FLAG ) + if (sql_ex.opt_flags && OPT_ENCLOSED_FLAG ) fprintf(file," OPTIONALLY "); fprintf(file, " ENCLOSED BY "); pretty_print_str(file, sql_ex.enclosed, sql_ex.enclosed_len); @@ -1194,7 +1192,7 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db) pretty_print_str(file, sql_ex.line_start, sql_ex.line_start_len); } - if((int)skip_lines > 0) + if ((int)skip_lines > 0) fprintf(file, " IGNORE %ld LINES ", (long) skip_lines); if (num_fields) @@ -1204,7 +1202,7 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db) fprintf( file, " ("); for(i = 0; i < num_fields; i++) { - if(i) + if (i) fputc(',', file); fprintf(file, field); @@ -1282,7 +1280,7 @@ Slave_log_event::~Slave_log_event() void Slave_log_event::print(FILE* file, bool short_form, char* last_db) { char llbuff[22]; - if(short_form) + if (short_form) return; print_header(file); fputc('\n', file); @@ -1314,7 +1312,7 @@ void Slave_log_event::init_from_mem_pool(int data_size) master_host_len = strlen(master_host); // safety master_log = master_host + master_host_len + 1; - if(master_log > mem_pool + data_size) + if (master_log > mem_pool + data_size) { master_host = 0; return; @@ -1326,9 +1324,9 @@ Slave_log_event::Slave_log_event(const char* buf, int event_len): Log_event(buf,0),mem_pool(0),master_host(0) { event_len -= LOG_EVENT_HEADER_LEN; - if(event_len < 0) + if (event_len < 0) return; - if(!(mem_pool = (char*)my_malloc(event_len + 1, MYF(MY_WME)))) + if (!(mem_pool = (char*)my_malloc(event_len + 1, MYF(MY_WME)))) return; memcpy(mem_pool, buf + LOG_EVENT_HEADER_LEN, event_len); mem_pool[event_len] = 0; @@ -1341,7 +1339,7 @@ Create_file_log_event::Create_file_log_event(THD* thd_arg, sql_exchange* ex, List<Item>& fields_arg, enum enum_duplicates handle_dup, char* block_arg, uint block_len_arg): Load_log_event(thd_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup), - fake_base(0),block(block_arg),block_len(block_len_arg), + fake_base(0),block(block_arg),block_len(block_len_arg), file_id(thd_arg->file_id = mysql_bin_log.next_file_id()) { sql_ex.force_new_format(); @@ -1409,7 +1407,7 @@ void Create_file_log_event::print(FILE* file, bool short_form, if (short_form) return; Load_log_event::print(file, 1, last_db); - fprintf(file, " file_id=%d, block_len=%d\n", file_id, block_len); + fprintf(file, " file_id: %d block_len: %d\n", file_id, block_len); } #endif @@ -1444,7 +1442,7 @@ Append_block_log_event::Append_block_log_event(THD* thd_arg, char* block_arg, Append_block_log_event::Append_block_log_event(const char* buf, int len): Log_event(buf, 0),block(0) { - if((uint)len < APPEND_BLOCK_EVENT_OVERHEAD) + if ((uint)len < APPEND_BLOCK_EVENT_OVERHEAD) return; file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + AB_FILE_ID_OFFSET); block = (char*)buf + APPEND_BLOCK_EVENT_OVERHEAD; @@ -1467,7 +1465,7 @@ void Append_block_log_event::print(FILE* file, bool short_form, return; print_header(file); fputc('\n', file); - fprintf(file, "#Append_block: file_id=%d, block_len=%d\n", + fprintf(file, "#Append_block: file_id: %d block_len: %d\n", file_id, block_len); } #endif @@ -1496,7 +1494,7 @@ Delete_file_log_event::Delete_file_log_event(THD* thd_arg): Delete_file_log_event::Delete_file_log_event(const char* buf, int len): Log_event(buf, 0),file_id(0) { - if((uint)len < DELETE_FILE_EVENT_OVERHEAD) + if ((uint)len < DELETE_FILE_EVENT_OVERHEAD) return; file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + AB_FILE_ID_OFFSET); } @@ -1543,7 +1541,7 @@ Execute_load_log_event::Execute_load_log_event(THD* thd_arg): Execute_load_log_event::Execute_load_log_event(const char* buf,int len): Log_event(buf, 0),file_id(0) { - if((uint)len < EXEC_LOAD_EVENT_OVERHEAD) + if ((uint)len < EXEC_LOAD_EVENT_OVERHEAD) return; file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + EL_FILE_ID_OFFSET); } @@ -1662,7 +1660,7 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli) thd->query = 0; thd->query_error = 0; - if(db_ok(thd->db, replicate_do_db, replicate_ignore_db)) + if (db_ok(thd->db, replicate_do_db, replicate_ignore_db)) { thd->set_time((time_t)when); thd->current_tablenr = 0; @@ -1676,7 +1674,7 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli) tables.name = tables.real_name = (char*)table_name; tables.lock_type = TL_WRITE; // the table will be opened in mysql_load - if(table_rules_on && !tables_ok(thd, &tables)) + if (table_rules_on && !tables_ok(thd, &tables)) { // TODO: this is a bug - this needs to be moved to the I/O thread if (net) @@ -1712,14 +1710,14 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli) // about the packet sequence thd->net.pkt_nr = net->pkt_nr; } - if(mysql_load(thd, &ex, &tables, fields, handle_dup, net != 0, + if (mysql_load(thd, &ex, &tables, fields, handle_dup, net != 0, TL_WRITE)) thd->query_error = 1; - if(thd->cuted_fields) + if (thd->cuted_fields) sql_print_error("Slave: load data infile at position %s in log \ '%s' produced %d warning(s)", llstr(rli->master_log_pos,llbuff), RPL_LOG_NAME, thd->cuted_fields ); - if(net) + if (net) net->pkt_nr = thd->net.pkt_nr; } } @@ -1735,7 +1733,7 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli) thd->net.vio = 0; thd->db = 0;// prevent db from being freed close_thread_tables(thd); - if(thd->query_error) + if (thd->query_error) { int sql_error = thd->net.last_errno; if (!sql_error) @@ -1749,7 +1747,7 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli) } free_root(&thd->mem_root,0); - if(thd->fatal_error) + if (thd->fatal_error) { sql_print_error("Slave: Fatal error running LOAD DATA INFILE "); return 1; @@ -1849,7 +1847,7 @@ int Intvar_log_event::exec_event(struct st_relay_log_info* rli) int Slave_log_event::exec_event(struct st_relay_log_info* rli) { - if(mysql_bin_log.is_open()) + if (mysql_bin_log.is_open()) mysql_bin_log.write(this); return Log_event::exec_event(rli); } @@ -1978,11 +1976,12 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli) slave_print_error(rli,0, "File '%s' appears corrupted", fname); goto err; } - // we want to disable binary logging in slave thread - // because we need the file events to appear in the same order - // as they do on the master relative to other events, so that we - // can preserve ascending order of log sequence numbers - needed - // to handle failover + /* + We want to disable binary logging in slave thread because we need the file + events to appear in the same order as they do on the master relative to + other events, so that we can preserve ascending order of log sequence + numbers - needed to handle failover . + */ save_options = thd->options; thd->options &= ~ (ulong) (OPTION_BIN_LOG); lev->thd = thd; diff --git a/sql/repl_failsafe.cc b/sql/repl_failsafe.cc index fab1491fc2b..c7ca906ca13 100644 --- a/sql/repl_failsafe.cc +++ b/sql/repl_failsafe.cc @@ -670,8 +670,10 @@ int load_master_data(THD* thd) int restart_thread_mask; mc_mysql_init(&mysql); - // we do not want anyone messing with the slave at all for the entire - // duration of the data load; + /* + We do not want anyone messing with the slave at all for the entire + duration of the data load. + */ LOCK_ACTIVE_MI; lock_slave_threads(active_mi); init_thread_mask(&restart_thread_mask,active_mi,0 /*not inverse*/); @@ -707,8 +709,10 @@ int load_master_data(THD* thd) if (!(num_dbs = (uint) mc_mysql_num_rows(db_res))) goto err; - // in theory, the master could have no databases at all - // and run with skip-grant + /* + In theory, the master could have no databases at all + and run with skip-grant + */ if (!(table_res = (MYSQL_RES**)thd->alloc(num_dbs * sizeof(MYSQL_RES*)))) { @@ -716,10 +720,12 @@ int load_master_data(THD* thd) goto err; } - // this is a temporary solution until we have online backup - // capabilities - to be replaced once online backup is working - // we wait to issue FLUSH TABLES WITH READ LOCK for as long as we - // can to minimize the lock time + /* + This is a temporary solution until we have online backup + capabilities - to be replaced once online backup is working + we wait to issue FLUSH TABLES WITH READ LOCK for as long as we + can to minimize the lock time. + */ if (mc_mysql_query(&mysql, "FLUSH TABLES WITH READ LOCK", 0) || mc_mysql_query(&mysql, "SHOW MASTER STATUS",0) || !(master_status_res = mc_mysql_store_result(&mysql))) @@ -729,8 +735,10 @@ int load_master_data(THD* thd) goto err; } - // go through every table in every database, and if the replication - // rules allow replicating it, get it + /* + Go through every table in every database, and if the replication + rules allow replicating it, get it + */ table_res_end = table_res + num_dbs; @@ -819,7 +827,7 @@ int load_master_data(THD* thd) } } thd->proc_info="purging old relay logs"; - if (purge_relay_logs(&active_mi->rli,0 /* not only reset, but also reinit*/, + if (purge_relay_logs(&active_mi->rli,0 /* not only reset, but also reinit */, &errmsg)) { send_error(&thd->net, 0, "Failed purging old relay logs"); diff --git a/sql/slave.cc b/sql/slave.cc index 93e711f2e14..66837436a09 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -42,12 +42,16 @@ bool do_table_inited = 0, ignore_table_inited = 0; bool wild_do_table_inited = 0, wild_ignore_table_inited = 0; bool table_rules_on = 0; static TABLE* save_temporary_tables = 0; -ulong relay_log_space_limit = 0; /* TODO: fix variables to access ulonglong - values and make it ulonglong */ -// when slave thread exits, we need to remember the temporary tables so we -// can re-use them on slave start +/* TODO: fix variables to access ulonglong values and make it ulonglong */ +ulong relay_log_space_limit = 0; + +/* + When slave thread exits, we need to remember the temporary tables so we + can re-use them on slave start. + + TODO: move the vars below under MASTER_INFO +*/ -// TODO: move the vars below under MASTER_INFO int disconnect_slave_event_count = 0, abort_slave_event_count = 0; static int events_till_disconnect = -1; int events_till_abort = -1; @@ -83,9 +87,10 @@ void init_thread_mask(int* mask,MASTER_INFO* mi,bool inverse) bool set_io = mi->slave_running, set_sql = mi->rli.slave_running; if (inverse) { - /* This makes me think of the Russian idiom "I am not I, and this is - not my horse", which is used to deny reponsibility for - one's actions. + /* + This makes me think of the Russian idiom "I am not I, and this is + not my horse", which is used to deny reponsibility for + one's actions. */ set_io = !set_io; set_sql = !set_sql; @@ -164,13 +169,16 @@ static byte* get_table_key(TABLE_RULE_ENT* e, uint* len, } // TODO: check proper initialization of master_log_name/master_log_pos + int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log, ulonglong pos, bool need_data_lock, const char** errmsg) { + DBUG_ENTER("init_relay_log_pos"); + *errmsg=0; if (rli->log_pos_current) - return 0; + DBUG_RETURN(0); pthread_mutex_t *log_lock=rli->relay_log.get_log_lock(); pthread_mutex_lock(log_lock); if (need_data_lock) @@ -190,8 +198,10 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log, else rli->relay_log_pos = pos; - // test to see if the previous run was with the skip of purging - // if yes, we do not purge when we restart + /* + Test to see if the previous run was with the skip of purging + If yes, we do not purge when we restart + */ if (rli->relay_log.find_first_log(&rli->linfo,"")) { *errmsg="Could not find first log during relay log initialization"; @@ -213,34 +223,31 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log, if (rli->relay_log.is_active(rli->linfo.log_file_name)) { if (my_b_tell((rli->cur_log=rli->relay_log.get_log_file())) == 0 && - check_binlog_magic(rli->cur_log,errmsg)) - { + check_binlog_magic(rli->cur_log,errmsg)) goto err; - } - rli->cur_log_init_count=rli->cur_log->init_count; + rli->cur_log_old_open_count=rli->relay_log.get_open_count(); } else { if (rli->inited) end_io_cache(&rli->cache_buf); - if (rli->cur_log_fd>=0) + if (rli->cur_log_fd >= 0) my_close(rli->cur_log_fd,MYF(MY_WME)); if ((rli->cur_log_fd=open_binlog(&rli->cache_buf, rli->linfo.log_file_name,errmsg)) < 0) - { goto err; - } rli->cur_log = &rli->cache_buf; } - if (pos > 4) - my_b_seek(rli->cur_log,(off_t)pos); - rli->log_pos_current=1; + if (pos > BIN_LOG_HEADER_SIZE) + my_b_seek(rli->cur_log,(off_t)pos); + rli->log_pos_current=1; + err: - pthread_cond_broadcast(&rli->data_cond); - if (need_data_lock) - pthread_mutex_unlock(&rli->data_lock); - pthread_mutex_unlock(log_lock); - return (*errmsg) ? 1 : 0; + pthread_cond_broadcast(&rli->data_cond); + if (need_data_lock) + pthread_mutex_unlock(&rli->data_lock); + pthread_mutex_unlock(log_lock); + DBUG_RETURN ((*errmsg) ? 1 : 0); } /* called from get_options() in mysqld.cc on start-up */ @@ -274,8 +281,9 @@ void init_slave_skip_errors(const char* arg) } } + /* - We assume we have a run lock on rli and that the both slave thread + We assume we have a run lock on rli and that both slave thread are not running */ @@ -284,9 +292,11 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, bool just_reset, const char** errmsg) DBUG_ENTER("purge_relay_logs"); if (!rli->inited) DBUG_RETURN(0); /* successfully do nothing */ + int error=0; + DBUG_ASSERT(rli->slave_running == 0); DBUG_ASSERT(rli->mi->slave_running == 0); - int error=0; + rli->slave_skip_counter=0; pthread_mutex_lock(&rli->data_lock); rli->pending=0; @@ -301,17 +311,19 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, bool just_reset, const char** errmsg) } strnmov(rli->relay_log_name,rli->linfo.log_file_name, sizeof(rli->relay_log_name)-1); - rli->log_space_total=4; //just first log with magic number and nothing else - rli->relay_log_pos=4; + // Just first log with magic number and nothing else + rli->log_space_total= BIN_LOG_HEADER_SIZE; + rli->relay_log_pos= BIN_LOG_HEADER_SIZE; rli->relay_log.reset_bytes_written(); rli->log_pos_current=0; if (!just_reset) - error = init_relay_log_pos(rli,0,0,0/*do not need data lock*/,errmsg); + error = init_relay_log_pos(rli,0,0,0 /* do not need data lock */,errmsg); + err: #ifndef DBUG_OFF char buf[22]; #endif - DBUG_PRINT("info",("log_space_total=%s",llstr(rli->log_space_total,buf))); + DBUG_PRINT("info",("log_space_total: %s",llstr(rli->log_space_total,buf))); pthread_mutex_unlock(&rli->data_lock); DBUG_RETURN(error); } @@ -451,10 +463,12 @@ int start_slave_thread(pthread_handler h_func, pthread_mutex_t* start_lock, "Waiting for slave thread to start"); pthread_cond_wait(start_cond,cond_lock); thd->exit_cond(old_msg); - // TODO: in a very rare case of init_slave_thread failing, it is - // possible that we can get stuck here since slave_running will not - // be set. We need to change slave_running to int and have -1 as - // error code + /* + TODO: in a very rare case of init_slave_thread failing, it is + possible that we can get stuck here since slave_running will not + be set. We need to change slave_running to int and have -1 as + error code. + */ if (thd->killed) { pthread_mutex_unlock(cond_lock); @@ -466,10 +480,14 @@ int start_slave_thread(pthread_handler h_func, pthread_mutex_t* start_lock, pthread_mutex_unlock(start_lock); return 0; } -/* SLAVE_FORCE_ALL is not implemented here on purpose since it does not make - sense to do that for starting a slave - we always care if it actually - started the threads that were not previously running + + +/* + SLAVE_FORCE_ALL is not implemented here on purpose since it does not make + sense to do that for starting a slave - we always care if it actually + started the threads that were not previously running */ + int start_slave_threads(bool need_slave_mutex, bool wait_for_start, MASTER_INFO* mi, const char* master_info_fname, const char* slave_info_fname, int thread_mask) @@ -567,9 +585,10 @@ int tables_ok(THD* thd, TABLE_LIST* tables) return 0; } - // if no explicit rule found - // and there was a do list, do not replicate. If there was - // no do list, go ahead + /* + If no explicit rule found and there was a do list, do not replicate. + If there was no do list, go ahead + */ return !do_table_inited && !wild_do_table_inited; } @@ -577,12 +596,12 @@ int tables_ok(THD* thd, TABLE_LIST* tables) int add_table_rule(HASH* h, const char* table_spec) { const char* dot = strchr(table_spec, '.'); - if(!dot) return 1; + if (!dot) return 1; // len is always > 0 because we know the there exists a '.' uint len = (uint)strlen(table_spec); TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT) + len, MYF(MY_WME)); - if(!e) return 1; + if (!e) return 1; e->db = (char*)e + sizeof(TABLE_RULE_ENT); e->tbl_name = e->db + (dot - table_spec) + 1; e->key_len = len; @@ -594,11 +613,11 @@ int add_table_rule(HASH* h, const char* table_spec) int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec) { const char* dot = strchr(table_spec, '.'); - if(!dot) return 1; + if (!dot) return 1; uint len = (uint)strlen(table_spec); TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT) + len, MYF(MY_WME)); - if(!e) return 1; + if (!e) return 1; e->db = (char*)e + sizeof(TABLE_RULE_ENT); e->tbl_name = e->db + (dot - table_spec) + 1; e->key_len = len; @@ -627,9 +646,11 @@ static int end_slave_on_walk(MASTER_INFO* mi, gptr /*unused*/) void end_slave() { - // TODO: replace the line below with - // list_walk(&master_list, (list_walk_action)end_slave_on_walk,0); - // once multi-master code is ready + /* + TODO: replace the line below with + list_walk(&master_list, (list_walk_action)end_slave_on_walk,0); + once multi-master code is ready. + */ terminate_slave_threads(active_mi,SLAVE_FORCE_ALL); end_master_info(active_mi); if (do_table_inited) @@ -671,65 +692,68 @@ void skip_load_data_infile(NET* net) { (void)my_net_write(net, "\xfb/dev/null", 10); (void)net_flush(net); - (void)my_net_read(net); // discard response - send_ok(net); // the master expects it + (void)my_net_read(net); // discard response + send_ok(net); // the master expects it } char* rewrite_db(char* db) { - if(replicate_rewrite_db.is_empty() || !db) return db; + if (replicate_rewrite_db.is_empty() || !db) + return db; I_List_iterator<i_string_pair> it(replicate_rewrite_db); i_string_pair* tmp; - while((tmp=it++)) - { - if(!strcmp(tmp->key, db)) - return tmp->val; - } - + while ((tmp=it++)) + { + if (!strcmp(tmp->key, db)) + return tmp->val; + } return db; } + int db_ok(const char* db, I_List<i_string> &do_list, I_List<i_string> &ignore_list ) { if (do_list.is_empty() && ignore_list.is_empty()) return 1; // ok to replicate if the user puts no constraints - // if the user has specified restrictions on which databases to replicate - // and db was not selected, do not replicate - if(!db) + /* + If the user has specified restrictions on which databases to replicate + and db was not selected, do not replicate. + */ + if (!db) return 0; - if(!do_list.is_empty()) // if the do's are not empty - { - I_List_iterator<i_string> it(do_list); - i_string* tmp; + if (!do_list.is_empty()) // if the do's are not empty + { + I_List_iterator<i_string> it(do_list); + i_string* tmp; - while((tmp=it++)) - { - if(!strcmp(tmp->ptr, db)) - return 1; // match - } - return 0; + while ((tmp=it++)) + { + if (!strcmp(tmp->ptr, db)) + return 1; // match } + return 0; + } else // there are some elements in the don't, otherwise we cannot get here - { - I_List_iterator<i_string> it(ignore_list); - i_string* tmp; + { + I_List_iterator<i_string> it(ignore_list); + i_string* tmp; - while((tmp=it++)) - { - if(!strcmp(tmp->ptr, db)) - return 0; // match - } - - return 1; + while ((tmp=it++)) + { + if (!strcmp(tmp->ptr, db)) + return 0; // match } + + return 1; + } } -static int init_strvar_from_file(char* var, int max_size, IO_CACHE* f, - char* default_val) +static int init_strvar_from_file(char *var, int max_size, IO_CACHE *f, + const char *default_val) { uint length; if ((length=my_b_gets(f,var, max_size))) @@ -739,10 +763,12 @@ static int init_strvar_from_file(char* var, int max_size, IO_CACHE* f, *last_p = 0; // if we stopped on newline, kill it else { - // if we truncated a line or stopped on last char, remove all chars - // up to and including newline + /* + If we truncated a line or stopped on last char, remove all chars + up to and including newline. + */ int c; - while( ((c=my_b_get(f)) != '\n' && c != my_b_EOF)); + while (((c=my_b_get(f)) != '\n' && c != my_b_EOF)); } return 0; } @@ -763,7 +789,7 @@ static int init_intvar_from_file(int* var, IO_CACHE* f, int default_val) *var = atoi(buf); return 0; } - else if(default_val) + else if (default_val) { *var = default_val; return 0; @@ -796,12 +822,12 @@ static int check_master_version(MYSQL* mysql, MASTER_INFO* mi) goto err; } - switch (*version) - { + switch (*version) { case '3': mi->old_format = 1; break; case '4': + case '5': mi->old_format = 0; break; default: @@ -895,9 +921,11 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db, check_opt.init(); check_opt.flags|= T_VERY_SILENT | T_CALC_CHECKSUM | T_QUICK; thd->proc_info = "Rebuilding the index on master dump table"; - // we do not want repair() to spam us with messages - // just send them to the error log, and report the failure in case of - // problems + /* + We do not want repair() to spam us with messages + just send them to the error log, and report the failure in case of + problems. + */ save_vio = thd->net.vio; thd->net.vio = 0; error=file->repair(thd,&check_opt) != 0; @@ -978,15 +1006,15 @@ void end_master_info(MASTER_INFO* mi) int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) { DBUG_ENTER("init_relay_log_info"); - if (rli->inited) - DBUG_RETURN(0); MY_STAT stat_area; char fname[FN_REFLEN+128]; int info_fd; const char* msg = 0; int error = 0; - fn_format(fname, info_fname, - mysql_data_home, "", 4+32); + + if (rli->inited) + DBUG_RETURN(0); + fn_format(fname, info_fname, mysql_data_home, "", 4+32); pthread_mutex_lock(&rli->data_lock); info_fd = rli->info_fd; rli->pending = 0; @@ -1001,8 +1029,9 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) if (!opt_relay_logname) { char tmp[FN_REFLEN]; - /* TODO: The following should be using fn_format(); We just need to - first change fn_format() to cut the file name if it's too long. + /* + TODO: The following should be using fn_format(); We just need to + first change fn_format() to cut the file name if it's too long. */ strmake(tmp,glob_hostname,FN_REFLEN-5); strmov(strcend(tmp,'.'),"-relay-bin"); @@ -1011,72 +1040,76 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) rli->relay_log.set_index_file_name(opt_relaylog_index_name); open_log(&rli->relay_log, glob_hostname, opt_relay_logname, "-relay-bin", LOG_BIN, 1 /* read_append cache */, - 1 /* no auto events*/); + 1 /* no auto events */); /* if file does not exist */ if (!my_stat(fname, &stat_area, MYF(0))) { - // if someone removed the file from underneath our feet, just close - // the old descriptor and re-create the old file + /* + If someone removed the file from underneath our feet, just close + the old descriptor and re-create the old file + */ if (info_fd >= 0) my_close(info_fd, MYF(MY_WME)); - if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0 - || init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0, - MYF(MY_WME))) + if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0 || + init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0, + MYF(MY_WME))) { - if(info_fd >= 0) + if (info_fd >= 0) my_close(info_fd, MYF(0)); - rli->info_fd=-1; + rli->info_fd= -1; pthread_mutex_unlock(&rli->data_lock); DBUG_RETURN(1); } - if (init_relay_log_pos(rli,"",4,0/*no data mutex*/,&msg)) + if (init_relay_log_pos(rli,"",BIN_LOG_HEADER_SIZE,0 /*no data mutex*/, + &msg)) goto err; rli->master_log_pos = 0; // uninitialized rli->info_fd = info_fd; } else // file exists { - if(info_fd >= 0) + if (info_fd >= 0) reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0); - else if((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 - || init_io_cache(&rli->info_file, info_fd, - IO_SIZE*2, READ_CACHE, 0L, - 0, MYF(MY_WME))) + else if ((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 || + init_io_cache(&rli->info_file, info_fd, + IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME))) { if (info_fd >= 0) my_close(info_fd, MYF(0)); - rli->info_fd=-1; + rli->info_fd= -1; pthread_mutex_unlock(&rli->data_lock); DBUG_RETURN(1); } rli->info_fd = info_fd; if (init_strvar_from_file(rli->relay_log_name, - sizeof(rli->relay_log_name), &rli->info_file, - (char*)"") || + sizeof(rli->relay_log_name), &rli->info_file, + "") || init_intvar_from_file((int*)&rli->relay_log_pos, - &rli->info_file, 4) || + &rli->info_file, BIN_LOG_HEADER_SIZE) || init_strvar_from_file(rli->master_log_name, sizeof(rli->master_log_name), &rli->info_file, - (char*)"") || + "") || init_intvar_from_file((int*)&rli->master_log_pos, &rli->info_file, 0)) { msg="Error reading slave log configuration"; goto err; } - if (init_relay_log_pos(rli,0 /*log already inited*/, - 0 /*pos already inited*/, + if (init_relay_log_pos(rli,0 /* log already inited */, + 0 /* pos already inited */, 0 /* no data lock*/, &msg)) goto err; } - DBUG_ASSERT(rli->relay_log_pos >= 4); + DBUG_ASSERT(rli->relay_log_pos >= BIN_LOG_HEADER_SIZE); DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos); rli->inited = 1; - // now change the cache from READ to WRITE - must do this - // before flush_relay_log_info + /* + Now change the cache from READ to WRITE - must do this + before flush_relay_log_info + */ reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1); error=test(flush_relay_log_info(rli)); if (count_relay_log_space(rli)) @@ -1091,7 +1124,7 @@ err: sql_print_error(msg); end_io_cache(&rli->info_file); my_close(info_fd, MYF(0)); - rli->info_fd=-1; + rli->info_fd= -1; pthread_mutex_unlock(&rli->data_lock); DBUG_RETURN(1); } @@ -1135,6 +1168,7 @@ static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli) DBUG_RETURN(slave_killed); } + static int count_relay_log_space(RELAY_LOG_INFO* rli) { LOG_INFO linfo; @@ -1145,31 +1179,32 @@ static int count_relay_log_space(RELAY_LOG_INFO* rli) sql_print_error("Could not find first log while counting relay log space"); DBUG_RETURN(1); } - if (add_relay_log(rli,&linfo)) - DBUG_RETURN(1); - while (!rli->relay_log.find_next_log(&linfo)) + do { if (add_relay_log(rli,&linfo)) DBUG_RETURN(1); - } + } while (!rli->relay_log.find_next_log(&linfo)); DBUG_RETURN(0); } + int init_master_info(MASTER_INFO* mi, const char* master_info_fname, const char* slave_info_fname) { + int fd,error; + MY_STAT stat_area; + char fname[FN_REFLEN+128]; + const char *msg; + DBUG_ENTER("init_master_info"); + if (mi->inited) - return 0; + DBUG_RETURN(0); if (init_relay_log_info(&mi->rli, slave_info_fname)) - return 1; + DBUG_RETURN(1); mi->rli.mi = mi; mi->mysql=0; mi->file_id=1; mi->ignore_stop_event=0; - int fd,error; - MY_STAT stat_area; - char fname[FN_REFLEN+128]; - const char *msg; fn_format(fname, master_info_fname, mysql_data_home, "", 4+32); /* @@ -1183,23 +1218,19 @@ int init_master_info(MASTER_INFO* mi, const char* master_info_fname, // we do not want any messages if the file does not exist if (!my_stat(fname, &stat_area, MYF(0))) { - // if someone removed the file from underneath our feet, just close - // the old descriptor and re-create the old file + /* + if someone removed the file from underneath our feet, just close + the old descriptor and re-create the old file + */ if (fd >= 0) my_close(fd, MYF(MY_WME)); - if ((fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0 - || init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,0, - MYF(MY_WME))) - { - if(fd >= 0) - my_close(fd, MYF(0)); - mi->fd=-1; - end_relay_log_info(&mi->rli); - pthread_mutex_unlock(&mi->data_lock); - return 1; - } + if ((fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0 || + init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,0, + MYF(MY_WME))) + goto err; + mi->master_log_name[0] = 0; - mi->master_log_pos = 4; // skip magic number + mi->master_log_pos = BIN_LOG_HEADER_SIZE; // skip magic number mi->fd = fd; if (master_host) @@ -1213,24 +1244,17 @@ int init_master_info(MASTER_INFO* mi, const char* master_info_fname, } else // file exists { - if(fd >= 0) + if (fd >= 0) reinit_io_cache(&mi->file, READ_CACHE, 0L,0,0); - else if((fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 - || init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L, - 0, MYF(MY_WME))) - { - if(fd >= 0) - my_close(fd, MYF(0)); - mi->fd=-1; - end_relay_log_info(&mi->rli); - pthread_mutex_unlock(&mi->data_lock); - return 1; - } + else if ((fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 || + init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L, + 0, MYF(MY_WME))) + goto err; mi->fd = fd; if (init_strvar_from_file(mi->master_log_name, sizeof(mi->master_log_name), &mi->file, - (char*)"") || + "") || init_intvar_from_file((int*)&mi->master_log_pos, &mi->file, 4) || init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file, master_host) || @@ -1242,7 +1266,7 @@ int init_master_info(MASTER_INFO* mi, const char* master_info_fname, init_intvar_from_file((int*)&mi->connect_retry, &mi->file, master_connect_retry)) { - msg="Error reading master configuration"; + sql_print_error("Error reading master configuration"); goto err; } } @@ -1252,17 +1276,18 @@ int init_master_info(MASTER_INFO* mi, const char* master_info_fname, reinit_io_cache(&mi->file, WRITE_CACHE,0L,0,1); error=test(flush_master_info(mi)); pthread_mutex_unlock(&mi->data_lock); - return error; + DBUG_RETURN(error); err: - sql_print_error(msg); - end_io_cache(&mi->file); end_relay_log_info(&mi->rli); - DBUG_ASSERT(fd>=0); - my_close(fd, MYF(0)); - mi->fd=-1; + if (fd >= 0) + { + my_close(fd, MYF(0)); + end_io_cache(&mi->file); + } + mi->fd= -1; pthread_mutex_unlock(&mi->data_lock); - return 1; + DBUG_RETURN(1); } int register_slave_on_master(MYSQL* mysql) @@ -1282,7 +1307,7 @@ int register_slave_on_master(MYSQL* mysql) else packet.append((char)0); - if(report_password) + if (report_password) net_store_data(&packet, report_user); else packet.append((char)0); @@ -1333,7 +1358,7 @@ int show_master_info(THD* thd, MASTER_INFO* mi) field_list.push_back(new Item_empty_string("Skip_counter", 12)); field_list.push_back(new Item_empty_string("Exec_master_log_pos", 12)); field_list.push_back(new Item_empty_string("Relay_log_space", 12)); - if(send_fields(thd, field_list, 1)) + if (send_fields(thd, field_list, 1)) DBUG_RETURN(-1); String* packet = &thd->packet; @@ -1502,8 +1527,10 @@ static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed, */ thr_alarm(&alarmed, 2 * nap_time,&alarm_buff); sleep(nap_time); - // if we wake up before the alarm goes off, hit the button - // so it will not wake up the wife and kids :-) + /* + If we wake up before the alarm goes off, hit the button + so it will not wake up the wife and kids :-) + */ if (thr_alarm_in_use(&alarmed)) thr_end_alarm(&alarmed); @@ -1528,9 +1555,11 @@ static int request_dump(MYSQL* mysql, MASTER_INFO* mi) memcpy(buf + 10, logname,len); if (mc_simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1)) { - // something went wrong, so we will just reconnect and retry later - // in the future, we should do a better error analysis, but for - // now we just fill up the error log :-) + /* + Something went wrong, so we will just reconnect and retry later + in the future, we should do a better error analysis, but for + now we just fill up the error log :-) + */ sql_print_error("Error on COM_BINLOG_DUMP: %s, will retry in %d secs", mc_mysql_error(mysql), master_connect_retry); return 1; @@ -1545,7 +1574,7 @@ static int request_table_dump(MYSQL* mysql, const char* db, const char* table) char * p = buf; uint table_len = (uint) strlen(table); uint db_len = (uint) strlen(db); - if(table_len + db_len > sizeof(buf) - 2) + if (table_len + db_len > sizeof(buf) - 2) { sql_print_error("request_table_dump: Buffer overrun"); return 1; @@ -1571,8 +1600,10 @@ static ulong read_event(MYSQL* mysql, MASTER_INFO *mi) { ulong len = packet_error; - // my_real_read() will time us out - // we check if we were told to die, and if not, try reading again + /* + my_real_read() will time us out + We check if we were told to die, and if not, try reading again + */ #ifndef DBUG_OFF if (disconnect_slave_event_count && !(events_till_disconnect--)) return packet_error; @@ -1643,12 +1674,14 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) type_code != STOP_EVENT ? ev->log_pos : LL(0), 1/* skip lock*/); flush_relay_log_info(rli); - if (rli->slave_skip_counter && /* protect against common user error of - setting the counter to 1 instead of 2 - while recovering from an failed - auto-increment insert */ - !((type_code == INTVAR_EVENT || type_code == STOP_EVENT) && - rli->slave_skip_counter == 1)) + + /* + Protect against common user error of setting the counter to 1 + instead of 2 while recovering from an failed auto-increment insert + */ + if (rli->slave_skip_counter && + !((type_code == INTVAR_EVENT || type_code == STOP_EVENT) && + rli->slave_skip_counter == 1)) --rli->slave_skip_counter; pthread_mutex_unlock(&rli->data_lock); delete ev; @@ -1718,7 +1751,7 @@ slave_begin: pthread_cond_broadcast(&mi->start_cond); pthread_mutex_unlock(&mi->run_lock); - DBUG_PRINT("info",("master info: log_file_name=%s, position=%s", + DBUG_PRINT("info",("master info: log_file_name='%s', position=%s", mi->master_log_name, llstr(mi->master_log_pos,llbuff))); if (!(mi->mysql = mysql = mc_mysql_init(NULL))) @@ -1768,7 +1801,7 @@ connected: if (request_dump(mysql, mi)) { sql_print_error("Failed on request_dump()"); - if(io_slave_killed(thd,mi)) + if (io_slave_killed(thd,mi)) { sql_print_error("Slave I/O thread killed while requesting master \ dump"); @@ -1855,7 +1888,7 @@ reconnect done to recover from failed read"); goto err; } goto connected; - } // if(event_len == packet_error) + } // if (event_len == packet_error) thd->proc_info = "Queueing event from master"; if (queue_event(mi,(const char*)mysql->net.read_pos + 1, @@ -1909,11 +1942,11 @@ err: THD_CHECK_SENTRY(thd); delete thd; pthread_mutex_unlock(&LOCK_thread_count); - my_thread_end(); // clean-up before broadcast - pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done + my_thread_end(); // clean-up before broadcast + pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done pthread_mutex_unlock(&mi->run_lock); #ifndef DBUG_OFF - if(abort_slave_event_count && !events_till_abort) + if (abort_slave_event_count && !events_till_abort) goto slave_begin; #endif pthread_exit(0); @@ -1970,22 +2003,22 @@ slave_begin: rli->abort_slave = 0; pthread_cond_broadcast(&rli->start_cond); pthread_mutex_unlock(&rli->run_lock); - rli->pending = 0; //this should always be set to 0 when the slave thread - // is started - if (init_relay_log_pos(rli,0,0,1/*need data lock*/,&errmsg)) + // This should always be set to 0 when the slave thread is started + rli->pending = 0; + if (init_relay_log_pos(rli,0,0,1 /*need data lock*/, &errmsg)) { sql_print_error("Error initializing relay log position: %s", errmsg); goto err; } - DBUG_ASSERT(rli->relay_log_pos >= 4); + DBUG_ASSERT(rli->relay_log_pos >= BIN_LOG_HEADER_SIZE); DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos); - DBUG_PRINT("info",("master info: log_file_name=%s, position=%s", + DBUG_PRINT("info",("master info: log_file_name: %s, position: %s", rli->master_log_name, llstr(rli->master_log_pos,llbuff))); DBUG_ASSERT(rli->sql_thd == thd); sql_print_error("Slave SQL thread initialized, starting replication in \ -log '%s' at position %s,relay log: name='%s',pos='%s'", RPL_LOG_NAME, +log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, llstr(rli->master_log_pos,llbuff),rli->relay_log_name, llstr(rli->relay_log_pos,llbuff1)); while (!sql_slave_killed(thd,rli)) @@ -2004,7 +2037,7 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \ RPL_LOG_NAME, llstr(rli->master_log_pos, llbuff)); goto err; } - } // while(!sql_slave_killed(thd,rli)) - read/exec loop + } // while (!sql_slave_killed(thd,rli)) - read/exec loop // error = 0; err: @@ -2053,16 +2086,17 @@ static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev) bool cev_not_written; THD* thd; NET* net = &mi->mysql->net; + DBUG_ENTER("process_io_create_file"); if (unlikely(!cev->is_valid())) - return 1; + DBUG_RETURN(1); /* TODO: fix to honor table rules, not only db rules */ if (!db_ok(cev->db, replicate_do_db, replicate_ignore_db)) { skip_load_data_infile(net); - return 0; + DBUG_RETURN(0); } DBUG_ASSERT(cev->inited_from_old); thd = mi->io_thd; @@ -2137,10 +2171,13 @@ relay log"); } error=0; err: - return error; + DBUG_RETURN(error); } -// We assume we already locked mi->data_lock +/* + We assume we already locked mi->data_lock +*/ + static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev) { if (unlikely(!rev->is_valid())) @@ -2175,6 +2212,8 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, bool inc_pos = 1; bool processed_stop_event = 0; char* tmp_buf = 0; + DBUG_ENTER("queue_old_event"); + /* if we get Load event, we need to pass a non-reusable buffer to read_log_event, so we do a trick */ @@ -2183,7 +2222,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME))))) { sql_print_error("Slave I/O: out of memory for Load event"); - return 1; + DBUG_RETURN(1); } memcpy(tmp_buf,buf,event_len); tmp_buf[event_len]=0; // Create_file constructor wants null-term buffer @@ -2196,8 +2235,8 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, sql_print_error("Read invalid event from master: '%s',\ master could be corrupt but a more likely cause of this is a bug", errmsg); - my_free((char*)tmp_buf, MYF(MY_ALLOW_ZERO_PTR)); - return 1; + my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR)); + DBUG_RETURN(1); } pthread_mutex_lock(&mi->data_lock); ev->log_pos = mi->master_log_pos; @@ -2208,7 +2247,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, delete ev; pthread_mutex_unlock(&mi->data_lock); DBUG_ASSERT(!tmp_buf); - return 1; + DBUG_RETURN(1); } mi->ignore_stop_event=1; inc_pos = 0; @@ -2224,7 +2263,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, pthread_mutex_unlock(&mi->data_lock); DBUG_ASSERT(tmp_buf); my_free((char*)tmp_buf, MYF(0)); - return error; + DBUG_RETURN(error); } default: mi->ignore_stop_event=0; @@ -2237,7 +2276,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, delete ev; pthread_mutex_unlock(&mi->data_lock); DBUG_ASSERT(!tmp_buf); - return 1; + DBUG_RETURN(1); } mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total); } @@ -2248,7 +2287,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, mi->ignore_stop_event=1; pthread_mutex_unlock(&mi->data_lock); DBUG_ASSERT(!tmp_buf); - return 0; + DBUG_RETURN(0); } /* @@ -2261,8 +2300,10 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len) int error=0; bool inc_pos = 1; bool processed_stop_event = 0; + DBUG_ENTER("queue_event"); + if (mi->old_format) - return queue_old_event(mi,buf,event_len); + DBUG_RETURN(queue_old_event(mi,buf,event_len)); pthread_mutex_lock(&mi->data_lock); @@ -2278,7 +2319,7 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len) { Rotate_log_event rev(buf,event_len,0); if (unlikely(process_io_rotate(mi,&rev))) - return 1; + DBUG_RETURN(1); inc_pos=0; mi->ignore_stop_event=1; break; @@ -2298,7 +2339,7 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len) if (unlikely(processed_stop_event)) mi->ignore_stop_event=1; pthread_mutex_unlock(&mi->data_lock); - return error; + DBUG_RETURN(error); } @@ -2425,18 +2466,27 @@ int flush_relay_log_info(RELAY_LOG_INFO* rli) return 0; } -IO_CACHE* reopen_relay_log(RELAY_LOG_INFO* rli, const char** errmsg) + +/* + This function is called when we notice that the current "hot" log + got rotated under our feet. +*/ + +static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg) { DBUG_ASSERT(rli->cur_log != &rli->cache_buf); - IO_CACHE* cur_log = rli->cur_log=&rli->cache_buf; DBUG_ASSERT(rli->cur_log_fd == -1); + DBUG_ENTER("reopen_relay_log"); + + IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf; if ((rli->cur_log_fd=open_binlog(cur_log,rli->relay_log_name, errmsg)) <0) - return 0; + DBUG_RETURN(0); my_b_seek(cur_log,rli->relay_log_pos); - return cur_log; + DBUG_RETURN(cur_log); } + Log_event* next_event(RELAY_LOG_INFO* rli) { Log_event* ev; @@ -2445,6 +2495,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli) const char* errmsg=0; THD* thd = rli->sql_thd; bool was_killed; + DBUG_ENTER("next_event"); DBUG_ASSERT(thd != 0); /* @@ -2456,16 +2507,18 @@ Log_event* next_event(RELAY_LOG_INFO* rli) */ pthread_mutex_lock(&rli->data_lock); - for (; !(was_killed=sql_slave_killed(thd,rli)) ;) + while (!(was_killed=sql_slave_killed(thd,rli))) { /* We can have two kinds of log reading: - hot_log - rli->cur_log points at the IO_CACHE of relay_log, which - is actively being updated by the I/O thread. We need to be careful - in this case and make sure that we are not looking at a stale log that - has already been rotated. If it has been, we reopen the log - the other case is much simpler - we just have a read only log that - nobody else will be updating. + hot_log: + rli->cur_log points at the IO_CACHE of relay_log, which + is actively being updated by the I/O thread. We need to be careful + in this case and make sure that we are not looking at a stale log that + has already been rotated. If it has been, we reopen the log. + + The other case is much simpler: + We just have a read only log that nobody else will be updating. */ bool hot_log; if ((hot_log = (cur_log != &rli->cache_buf))) @@ -2474,43 +2527,43 @@ Log_event* next_event(RELAY_LOG_INFO* rli) pthread_mutex_lock(log_lock); /* - Reading cur_log->init_count here is safe because the log will only + Reading xxx_file_id is safe because the log will only be rotated when we hold relay_log.LOCK_log */ - if (cur_log->init_count != rli->cur_log_init_count) + if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count) { - if (!(cur_log=reopen_relay_log(rli,&errmsg))) - { - pthread_mutex_unlock(log_lock); + // The master has switched to a new log file; Reopen the old log file + cur_log=reopen_relay_log(rli, &errmsg); + pthread_mutex_unlock(log_lock); + if (!cur_log) // No more log files goto err; - } - pthread_mutex_unlock(log_lock); - hot_log=0; + hot_log=0; // Using old binary log } } - DBUG_ASSERT(my_b_tell(cur_log) >= 4); + DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE); DBUG_ASSERT(my_b_tell(cur_log) == rli->relay_log_pos + rli->pending); - /* relay log is always in new format - if the master is 3.23, the - I/O thread will convert the format for us + /* + Relay log is always in new format - if the master is 3.23, the + I/O thread will convert the format for us */ - if ((ev=Log_event::read_log_event(cur_log,0,(bool)0/*new format*/))) + if ((ev=Log_event::read_log_event(cur_log,0,(bool)0 /* new format */))) { DBUG_ASSERT(thd==rli->sql_thd); if (hot_log) pthread_mutex_unlock(log_lock); pthread_mutex_unlock(&rli->data_lock); - return ev; + DBUG_RETURN(ev); } DBUG_ASSERT(thd==rli->sql_thd); - if (opt_reckless_slave) + if (opt_reckless_slave) // For mysql-test cur_log->error = 0; - if ( cur_log->error < 0) + if (cur_log->error < 0) { errmsg = "slave SQL thread aborted because of I/O error"; + if (hot_log) + pthread_mutex_unlock(log_lock); goto err; } - - if (!cur_log->error) /* EOF */ { /* @@ -2520,7 +2573,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli) */ if (hot_log) { - DBUG_ASSERT(cur_log->init_count == rli->cur_log_init_count); + DBUG_ASSERT(rli->relay_log.get_open_count() == rli->cur_log_old_open_count); /* We can, and should release data_lock while we are waiting for update. If we do not, show slave status will block @@ -2528,7 +2581,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli) pthread_mutex_unlock(&rli->data_lock); /* - IMPORTANT: note that wait_for_update will unlock LOCK_log, but + IMPORTANT: note that wait_for_update will unlock lock_log, but expects the caller to lock it */ rli->relay_log.wait_for_update(rli->sql_thd); @@ -2537,102 +2590,108 @@ Log_event* next_event(RELAY_LOG_INFO* rli) pthread_mutex_lock(&rli->data_lock); continue; } + /* + If the log was not hot, we need to move to the next log in + sequence. The next log could be hot or cold, we deal with both + cases separately after doing some common initialization + */ + end_io_cache(cur_log); + DBUG_ASSERT(rli->cur_log_fd >= 0); + my_close(rli->cur_log_fd, MYF(MY_WME)); + rli->cur_log_fd = -1; + + /* + TODO: make skip_log_purge a start-up option. At this point this + is not critical priority + */ + if (!rli->skip_log_purge) + { + // purge_first_log will properly set up relay log coordinates in rli + if (rli->relay_log.purge_first_log(rli)) + { + errmsg = "Error purging processed log"; + goto err; + } + } else { /* - If the log was not hot, we need to move to the next log in - sequence. The next log could be hot or cold, we deal with both - cases separately after doing some common initialization + TODO: verify that no lock is ok here. At this point, if we + get this wrong, this is actually no big deal - the only time + this code will ever be executed is if we are recovering from + a bug when a full reload of the slave is not feasible or + desirable. */ - end_io_cache(cur_log); - DBUG_ASSERT(rli->cur_log_fd >= 0); - my_close(rli->cur_log_fd, MYF(MY_WME)); - rli->cur_log_fd = -1; - - // TODO: make skip_log_purge a start-up option. At this point this - // is not critical priority - if (!rli->skip_log_purge) - { - // purge_first_log will properly set up relay log coordinates in rli - if (rli->relay_log.purge_first_log(rli)) - { - errmsg = "Error purging processed log"; - goto err; - } - } - else + if (rli->relay_log.find_next_log(&rli->linfo,0/*no lock*/)) { - // TODO: verify that no lock is ok here. At this point, if we - // get this wrong, this is actually no big deal - the only time - // this code will ever be executed is if we are recovering from - // a bug when a full reload of the slave is not feasible or - // desirable. - if (rli->relay_log.find_next_log(&rli->linfo,0/*no lock*/)) - { - errmsg = "error switching to the next log"; - goto err; - } - rli->relay_log_pos = 4; - rli->pending=0; - strnmov(rli->relay_log_name,rli->linfo.log_file_name, - sizeof(rli->relay_log_name)); - flush_relay_log_info(rli); + errmsg = "error switching to the next log"; + goto err; } + rli->relay_log_pos = BIN_LOG_HEADER_SIZE; + rli->pending=0; + strnmov(rli->relay_log_name,rli->linfo.log_file_name, + sizeof(rli->relay_log_name)); + flush_relay_log_info(rli); + } - // next log is hot - if (rli->relay_log.is_active(rli->linfo.log_file_name)) - { + // next log is hot + if (rli->relay_log.is_active(rli->linfo.log_file_name)) + { #ifdef EXTRA_DEBUG - sql_print_error("next log '%s' is currently active", - rli->linfo.log_file_name); + sql_print_error("next log '%s' is currently active", + rli->linfo.log_file_name); #endif - rli->cur_log = cur_log = rli->relay_log.get_log_file(); - rli->cur_log_init_count = cur_log->init_count; - DBUG_ASSERT(rli->cur_log_fd == -1); + rli->cur_log= cur_log= rli->relay_log.get_log_file(); + rli->cur_log_old_open_count= rli->relay_log.get_open_count(); + DBUG_ASSERT(rli->cur_log_fd == -1); - /* - Read pointer has to be at the start since we are the only - reader - */ - if (check_binlog_magic(cur_log,&errmsg)) - goto err; - continue; - } /* - if we get here, the log was not hot, so we will have to - open it ourselves + Read pointer has to be at the start since we are the only + reader */ -#ifdef EXTRA_DEBUG - sql_print_error("next log '%s' is not active", - rli->linfo.log_file_name); -#endif - // open_binlog() will check the magic header - if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name, - &errmsg)) <0) + if (check_binlog_magic(cur_log,&errmsg)) goto err; + continue; } + /* + if we get here, the log was not hot, so we will have to + open it ourselves + */ +#ifdef EXTRA_DEBUG + sql_print_error("next log '%s' is not active", + rli->linfo.log_file_name); +#endif + // open_binlog() will check the magic header + if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name, + &errmsg)) <0) + goto err; } - else // read failed with a non-EOF error + else { - // TODO: come up with something better to handle this error + /* + Read failed with a non-EOF error. + TODO: come up with something better to handle this error + */ + if (hot_log) + pthread_mutex_unlock(log_lock); sql_print_error("Slave SQL thread: I/O error reading \ -event(errno=%d,cur_log->error=%d)", +event(errno: %d cur_log->error: %d)", my_errno,cur_log->error); // set read position to the beginning of the event my_b_seek(cur_log,rli->relay_log_pos+rli->pending); /* otherwise, we have had a partial read */ - /* TODO; see if there is a way to do this without this goto */ errmsg = "Aborting slave SQL thread because of partial event read"; + /* TODO; see if there is a way to do this without this goto */ goto err; } - } if (!errmsg && was_killed) errmsg = "slave SQL thread was killed"; + err: pthread_mutex_unlock(&rli->data_lock); sql_print_error("Error reading relay log event: %s", errmsg); - return 0; + DBUG_RETURN(0); } diff --git a/sql/slave.h b/sql/slave.h index 16735891815..d6992a8b839 100644 --- a/sql/slave.h +++ b/sql/slave.h @@ -84,18 +84,9 @@ typedef struct st_relay_log_info volatile my_off_t master_log_pos; /* - current offset in the relay log. - pending - in some cases we do not increment offset immediately after - processing an event, because the following event needs to be processed - atomically together with this one ( so far, there is only one type of - such event - Intvar_event that sets auto_increment value). However, once - both events have been processed, we need to increment by the cumulative - offset. pending stored the extra offset to be added to the position. + Protected with internal locks. + Must get data_lock when resetting the logs. */ - ulonglong relay_log_pos, pending; - - // protected with internal locks - // must get data_lock when resetting the logs MYSQL_LOG relay_log; LOG_INFO linfo; IO_CACHE cache_buf,*cur_log; @@ -125,9 +116,6 @@ typedef struct st_relay_log_info */ pthread_cond_t start_cond, stop_cond, data_cond; - // if not set, the value of other members of the structure are undefined - bool inited; - // parent master info structure struct st_master_info *mi; @@ -135,9 +123,19 @@ typedef struct st_relay_log_info Needed to deal properly with cur_log getting closed and re-opened with a different log under our feet */ - int cur_log_init_count; + uint32 cur_log_old_open_count; - volatile bool abort_slave, slave_running; + /* + current offset in the relay log. + pending - in some cases we do not increment offset immediately after + processing an event, because the following event needs to be processed + atomically together with this one ( so far, there is only one type of + such event - Intvar_event that sets auto_increment value). However, once + both events have been processed, we need to increment by the cumulative + offset. pending stored the extra offset to be added to the position. + */ + ulonglong relay_log_pos, pending; + ulonglong log_space_limit,log_space_total; /* Needed for problems when slave stops and we want to restart it @@ -145,45 +143,47 @@ typedef struct st_relay_log_info errors, and have been manually applied by DBA already. */ volatile uint32 slave_skip_counter; + pthread_mutex_t log_space_lock; + pthread_cond_t log_space_cond; + THD * sql_thd; + int last_slave_errno; #ifndef DBUG_OFF int events_till_abort; #endif - int last_slave_errno; char last_slave_error[MAX_SLAVE_ERRMSG]; - THD* sql_thd; + + // if not set, the value of other members of the structure are undefined + bool inited; + volatile bool abort_slave, slave_running; bool log_pos_current; bool abort_pos_wait; bool skip_log_purge; - ulonglong log_space_limit,log_space_total; - pthread_mutex_t log_space_lock; - pthread_cond_t log_space_cond; - st_relay_log_info():info_fd(-1),cur_log_fd(-1),inited(0), - cur_log_init_count(0), - abort_slave(0),slave_running(0), - log_pos_current(0),abort_pos_wait(0), - skip_log_purge(0) - { - relay_log_name[0] = master_log_name[0] = 0; - bzero(&info_file,sizeof(info_file)); - pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST); - pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST); - pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST); - pthread_cond_init(&data_cond, NULL); - pthread_cond_init(&start_cond, NULL); - pthread_cond_init(&stop_cond, NULL); - pthread_cond_init(&log_space_cond, NULL); - } + st_relay_log_info() + :info_fd(-1),cur_log_fd(-1), cur_log_old_open_count(0), + inited(0), abort_slave(0), slave_running(0), log_pos_current(0), + abort_pos_wait(0), skip_log_purge(0) + { + relay_log_name[0] = master_log_name[0] = 0; + bzero(&info_file,sizeof(info_file)); + pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST); + pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST); + pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST); + pthread_cond_init(&data_cond, NULL); + pthread_cond_init(&start_cond, NULL); + pthread_cond_init(&stop_cond, NULL); + pthread_cond_init(&log_space_cond, NULL); + } ~st_relay_log_info() - { - pthread_mutex_destroy(&run_lock); - pthread_mutex_destroy(&data_lock); - pthread_mutex_destroy(&log_space_lock); - pthread_cond_destroy(&data_cond); - pthread_cond_destroy(&start_cond); - pthread_cond_destroy(&stop_cond); - pthread_cond_destroy(&log_space_cond); - } + { + pthread_mutex_destroy(&run_lock); + pthread_mutex_destroy(&data_lock); + pthread_mutex_destroy(&log_space_lock); + pthread_cond_destroy(&data_cond); + pthread_cond_destroy(&start_cond); + pthread_cond_destroy(&stop_cond); + pthread_cond_destroy(&log_space_cond); + } inline void inc_pending(ulonglong val) { pending += val; @@ -215,40 +215,33 @@ typedef struct st_relay_log_info int wait_for_pos(THD* thd, String* log_name, ulonglong log_pos); } RELAY_LOG_INFO; -/* - repopen_relay_log() is called when we notice that the current "hot" log - got rotated under our feet -*/ - -IO_CACHE* reopen_relay_log(RELAY_LOG_INFO* rli, const char** errmsg); Log_event* next_event(RELAY_LOG_INFO* rli); - /* st_master_info contains information about how to connect to a master, - current master log name, and current log offset, as well as misc - control variables + current master log name, and current log offset, as well as misc + control variables - st_master_info is initialized once from the master.info file if such - exists. Otherwise, data members corresponding to master.info fields are - initialized with defaults specified by master-* options. The initialization - is done through init_master_info() call. + st_master_info is initialized once from the master.info file if such + exists. Otherwise, data members corresponding to master.info fields + are initialized with defaults specified by master-* options. The + initialization is done through init_master_info() call. - The format of master.info file: + The format of master.info file: - log_name - log_pos - master_host - master_user - master_pass - master_port - master_connect_retry + log_name + log_pos + master_host + master_user + master_pass + master_port + master_connect_retry - To write out the contents of master.info file to disk ( needed every - time we read and queue data from the master ), a call to - flush_master_info() is required. + To write out the contents of master.info file to disk ( needed every + time we read and queue data from the master ), a call to + flush_master_info() is required. - To clean up, call end_master_info() + To clean up, call end_master_info() */ diff --git a/sql/sql_class.h b/sql/sql_class.h index fe6a7e2ed69..da0b2090c97 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -60,48 +60,49 @@ class Log_event; class MYSQL_LOG { private: pthread_mutex_t LOCK_log, LOCK_index; + pthread_cond_t update_cond; + ulonglong bytes_written; time_t last_time,query_start; IO_CACHE log_file; File index_file; char *name; - volatile enum_log_type log_type; char time_buff[20],db[NAME_LEN+1]; char log_file_name[FN_REFLEN],index_file_name[FN_REFLEN]; - bool write_error,inited; - uint file_id; // current file sequence number for load data infile - // binary logging - bool no_rotate; // for binlog - if log name can never change - // we should not try to rotate it or write any rotation events - // the user should use FLUSH MASTER instead of FLUSH LOGS for - // purging + // current file sequence number for load data infile binary logging + uint file_id; + uint open_count; // For replication + /* + For binlog - if log name can never change we should not try to rotate it + or write any rotation events. The user should use FLUSH MASTER instead + of FLUSH LOGS for purging. + */ + volatile enum_log_type log_type; enum cache_type io_cache_type; + bool write_error,inited; + bool no_rotate; bool need_start_event; - pthread_cond_t update_cond; bool no_auto_events; // for relay binlog - ulonglong bytes_written; friend class Log_event; public: MYSQL_LOG(); ~MYSQL_LOG(); - pthread_mutex_t* get_log_lock() { return &LOCK_log; } void reset_bytes_written() - { - bytes_written = 0; - } + { + bytes_written = 0; + } void harvest_bytes_written(ulonglong* counter) - { + { #ifndef DBUG_OFF - char buf1[22],buf2[22]; + char buf1[22],buf2[22]; #endif - DBUG_ENTER("harvest_bytes_written"); - (*counter)+=bytes_written; - DBUG_PRINT("info",("counter=%s,bytes_written=%s", llstr(*counter,buf1), - llstr(bytes_written,buf2))); - bytes_written=0; - DBUG_VOID_RETURN; - } - IO_CACHE* get_log_file() { return &log_file; } + DBUG_ENTER("harvest_bytes_written"); + (*counter)+=bytes_written; + DBUG_PRINT("info",("counter: %s bytes_written: %s", llstr(*counter,buf1), + llstr(bytes_written,buf2))); + bytes_written=0; + DBUG_VOID_RETURN; + } void signal_update() { pthread_cond_broadcast(&update_cond);} void wait_for_update(THD* thd); void set_need_start_event() { need_start_event = 1; } @@ -135,8 +136,8 @@ public: int purge_logs(THD* thd, const char* to_log); int purge_first_log(struct st_relay_log_info* rli); int reset_logs(THD* thd); - void close(bool exiting = 0); // if we are exiting, we also want to close the - // index file + // if we are exiting, we also want to close the index file + void close(bool exiting = 0); // iterating through the log index file int find_first_log(LOG_INFO* linfo, const char* log_name, @@ -146,11 +147,15 @@ public: uint next_file_id(); inline bool is_open() { return log_type != LOG_CLOSED; } - char* get_index_fname() { return index_file_name;} - char* get_log_fname() { return log_file_name; } - void lock_index() { pthread_mutex_lock(&LOCK_index);} - void unlock_index() { pthread_mutex_unlock(&LOCK_index);} - File get_index_file() { return index_file;} + inline char* get_index_fname() { return index_file_name;} + inline char* get_log_fname() { return log_file_name; } + inline pthread_mutex_t* get_log_lock() { return &LOCK_log; } + inline IO_CACHE* get_log_file() { return &log_file; } + + inline void lock_index() { pthread_mutex_lock(&LOCK_index);} + inline void unlock_index() { pthread_mutex_unlock(&LOCK_index);} + inline File get_index_file() { return index_file;} + inline uint32 get_open_count() { return open_count; } }; /* character conversion tables */ diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 6253058549e..a92622a59b1 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -2891,6 +2891,7 @@ bool add_field_to_list(char *field_name, enum_field_types type, case FIELD_TYPE_STRING: case FIELD_TYPE_VAR_STRING: case FIELD_TYPE_NULL: + case FIELD_TYPE_GEOMETRY: break; case FIELD_TYPE_DECIMAL: if (!length) diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index b6c7c98a4cf..415007b38fa 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -154,6 +154,7 @@ File open_binlog(IO_CACHE *log, const char *log_file_name, const char **errmsg) { File file; + DBUG_ENTER("open_binlog"); if ((file = my_open(log_file_name, O_RDONLY | O_BINARY, MYF(MY_WME))) < 0 || init_io_cache(log, file, IO_SIZE*2, READ_CACHE, 0, 0, @@ -164,7 +165,7 @@ File open_binlog(IO_CACHE *log, const char *log_file_name, } if (check_binlog_magic(log,errmsg)) goto err; - return file; + DBUG_RETURN(file); err: if (file >= 0) @@ -172,7 +173,7 @@ err: my_close(file,MYF(0)); end_io_cache(log); } - return -1; + DBUG_RETURN(-1); } @@ -628,7 +629,8 @@ int reset_slave(MASTER_INFO* mi) char fname[FN_REFLEN]; int restart_thread_mask = 0,error=0; const char* errmsg=0; - + DBUG_ENTER("reset_slave"); + lock_slave_threads(mi); init_thread_mask(&restart_thread_mask,mi,0 /* not inverse */); if ((error=terminate_slave_threads(mi,restart_thread_mask,1 /*skip lock*/)) @@ -649,14 +651,14 @@ int reset_slave(MASTER_INFO* mi) goto err; } if (restart_thread_mask) - error=start_slave_threads(0 /* mutex not needed*/, - 1 /* wait for start*/, - mi,master_info_file,relay_log_info_file, - restart_thread_mask); + error=start_slave_threads(0 /* mutex not needed */, + 1 /* wait for start*/, + mi,master_info_file,relay_log_info_file, + restart_thread_mask); // TODO: fix error messages so they get to the client err: unlock_slave_threads(mi); - return error; + DBUG_RETURN(error); } void kill_zombie_dump_threads(uint32 slave_server_id) diff --git a/sql/unireg.h b/sql/unireg.h index 5a61f4a6c12..e8fdd5dd5dd 100644 --- a/sql/unireg.h +++ b/sql/unireg.h @@ -129,6 +129,10 @@ bfill((A)->null_flags,(A)->null_bytes,255);\ */ #define MIN_TURBOBM_PATTERN_LEN 3 +/* Defines for binary logging */ + +#define BIN_LOG_HEADER_SIZE 4 + /* Include prototypes for unireg */ #include "mysqld_error.h" |