diff options
-rw-r--r-- | client/mysqltest.c | 57 | ||||
-rw-r--r-- | include/my_sys.h | 5 | ||||
-rw-r--r-- | mysql-test/r/rpl000016.result | 3 | ||||
-rw-r--r-- | mysql-test/t/rpl000016.test | 4 | ||||
-rw-r--r-- | mysys/mf_iocache.c | 22 | ||||
-rw-r--r-- | sql/log.cc | 25 | ||||
-rw-r--r-- | sql/log_event.cc | 64 | ||||
-rw-r--r-- | sql/slave.cc | 89 | ||||
-rw-r--r-- | sql/sql_class.h | 1 |
9 files changed, 212 insertions, 58 deletions
diff --git a/client/mysqltest.c b/client/mysqltest.c index 762589b0374..510fe6a3f4c 100644 --- a/client/mysqltest.c +++ b/client/mysqltest.c @@ -15,7 +15,8 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ /* mysqltest test tool - * See man page for more information. + * See the manual for more information + * TODO: document better how mysqltest works * * Written by: * Sasha Pachev <sasha@mysql.com> @@ -26,9 +27,6 @@ /********************************************************************** TODO: -- Print also the queries that returns a result to the log file; This makes - it much easier to find out what's wrong. - - Do comparison line by line, instead of doing a full comparison of the text file. This will save space as we don't need to keep many results in memory. It will also make it possible to do simple @@ -43,7 +41,7 @@ **********************************************************************/ -#define MTEST_VERSION "1.13" +#define MTEST_VERSION "1.14" #include <my_global.h> #include <mysql_embed.h> @@ -88,6 +86,12 @@ #define CON_RETRY_SLEEP 2 #define MAX_CON_TRIES 5 +#ifndef OS2 +#define SLAVE_POLL_INTERVAL 300000 /* 0.3 of a sec */ +#else +#defile SLAVE_POLL_INTERVAL 0.3 +#endif + enum {OPT_MANAGER_USER=256,OPT_MANAGER_HOST,OPT_MANAGER_PASSWD, OPT_MANAGER_PORT,OPT_MANAGER_WAIT_TIMEOUT}; @@ -187,6 +191,7 @@ Q_DISABLE_RPL_PARSE, Q_EVAL_RESULT, Q_ENABLE_QUERY_LOG, Q_DISABLE_QUERY_LOG, Q_ENABLE_RESULT_LOG, Q_DISABLE_RESULT_LOG, Q_SERVER_START, Q_SERVER_STOP,Q_REQUIRE_MANAGER, +Q_WAIT_FOR_SLAVE_TO_STOP, Q_UNKNOWN, /* Unknown command. */ Q_COMMENT, /* Comments, ignored. */ Q_COMMENT_WITH_COMMAND @@ -222,7 +227,7 @@ const char *command_names[] = { "enable_query_log", "disable_query_log", "enable_result_log", "disable_result_log", "server_start", "server_stop", - "require_manager", + "require_manager", "wait_for_slave_to_stop", 0 }; @@ -653,6 +658,45 @@ int open_file(const char* name) return 0; } +/* ugly long name, but we are following the convention */ +int do_wait_for_slave_to_stop(struct st_query* __attribute__((unused)) q) +{ + MYSQL* mysql = &cur_con->mysql; +#ifndef OS2 + struct timeval t; +#endif + for (;;) + { + MYSQL_RES* res; + MYSQL_ROW row; + int done; + LINT_INIT(res); + + if (mysql_query(mysql,"show status like 'Slave_running'") + || !(res=mysql_store_result(mysql))) + die("Query failed while probing slave for stop: %s", + mysql_error(mysql)); + if (!(row=mysql_fetch_row(res)) || !row[1]) + { + mysql_free_result(res); + die("Strange result from query while probing slave for stop"); + } + done = !strcmp(row[1],"OFF"); + mysql_free_result(res); + if (done) + break; +#ifndef OS2 + t.tv_sec=0; + t.tv_usec=SLAVE_POLL_INTERVAL; + select(0,0,0,0,&t); /* sleep */ +#else + DosSleep(OS2_SLAVE_POLL_INTERVAL); +#endif + } + + return 0; +} + int do_require_manager(struct st_query* __attribute__((unused)) q) { if (!manager) @@ -2335,6 +2379,7 @@ int main(int argc, char** argv) case Q_DISABLE_RESULT_LOG: disable_result_log=1; break; case Q_SOURCE: do_source(q); break; case Q_SLEEP: do_sleep(q); break; + case Q_WAIT_FOR_SLAVE_TO_STOP: do_wait_for_slave_to_stop(q); break; case Q_REQUIRE_MANAGER: do_require_manager(q); break; #ifndef EMBEDDED_LIBRARY case Q_SERVER_START: do_server_start(q); break; diff --git a/include/my_sys.h b/include/my_sys.h index f899e4f9324..1d7c0b7ddb1 100644 --- a/include/my_sys.h +++ b/include/my_sys.h @@ -643,7 +643,10 @@ extern int _my_b_write(IO_CACHE *info,const byte *Buffer,uint Count); extern int my_b_append(IO_CACHE *info,const byte *Buffer,uint Count); extern int my_block_write(IO_CACHE *info, const byte *Buffer, uint Count, my_off_t pos); -extern int flush_io_cache(IO_CACHE *info); +extern int _flush_io_cache(IO_CACHE *info, int need_append_buffer_lock); + +#define flush_io_cache(info) _flush_io_cache((info),1) + extern int end_io_cache(IO_CACHE *info); extern uint my_b_fill(IO_CACHE *info); extern void my_b_seek(IO_CACHE *info,my_off_t pos); diff --git a/mysql-test/r/rpl000016.result b/mysql-test/r/rpl000016.result index dae60021157..aab3635ea52 100644 --- a/mysql-test/r/rpl000016.result +++ b/mysql-test/r/rpl000016.result @@ -33,7 +33,6 @@ master-bin.003 insert into t2 values(1234); set insert_id=1234; insert into t2 values(NULL); -slave stop; set sql_slave_skip_counter=1; slave start; purge master logs to 'master-bin.003'; @@ -66,7 +65,7 @@ slave stop; slave start; show slave status; Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos -127.0.0.1 root MASTER_PORT 60 master-bin.006 445 mysql-relay-bin.004 1312 master-bin.006 Yes Yes 0 0 445 +127.0.0.1 root MASTER_PORT 60 master-bin.006 445 mysql-relay-bin.004 1376 master-bin.006 Yes Yes 0 0 445 lock tables t3 read; select count(*) from t3 where n >= 4; count(*) diff --git a/mysql-test/t/rpl000016.test b/mysql-test/t/rpl000016.test index 4e094650b2a..7559b2d3515 100644 --- a/mysql-test/t/rpl000016.test +++ b/mysql-test/t/rpl000016.test @@ -51,9 +51,7 @@ insert into t2 values(NULL); connection slave; sync_with_master; -#the slave may have already stopped, so we ignore the error ---error 0,1199 -!slave stop; +wait_for_slave_to_stop; #restart slave skipping one event set sql_slave_skip_counter=1; diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c index 6b0d83212c4..34de5dfd7f3 100644 --- a/mysys/mf_iocache.c +++ b/mysys/mf_iocache.c @@ -808,13 +808,19 @@ int my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count) Buffer+=rest_length; Count-=rest_length; info->write_pos+=rest_length; - if (flush_io_cache(info)) + if (_flush_io_cache(info,0)) + { + unlock_append_buffer(info); return 1; + } if (Count >= IO_SIZE) { /* Fill first intern buffer */ length=Count & (uint) ~(IO_SIZE-1); if (my_write(info->file,Buffer,(uint) length,info->myflags | MY_NABP)) + { + unlock_append_buffer(info); return info->error= -1; + } Count-=length; Buffer+=length; } @@ -883,14 +889,16 @@ int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count, /* Flush write cache */ -int flush_io_cache(IO_CACHE *info) +int _flush_io_cache(IO_CACHE *info, int need_append_buffer_lock) { uint length; my_bool append_cache; my_off_t pos_in_file; DBUG_ENTER("flush_io_cache"); - append_cache = (info->type == SEQ_READ_APPEND); + if (!(append_cache = (info->type == SEQ_READ_APPEND))) + need_append_buffer_lock=0; + if (info->type == WRITE_CACHE || append_cache) { if (info->file == -1) @@ -898,6 +906,8 @@ int flush_io_cache(IO_CACHE *info) if (real_open_cached_file(info)) DBUG_RETURN((info->error= -1)); } + if (need_append_buffer_lock) + lock_append_buffer(info); if ((length=(uint) (info->write_pos - info->write_buffer))) { pos_in_file=info->pos_in_file; @@ -909,6 +919,8 @@ int flush_io_cache(IO_CACHE *info) if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) == MY_FILEPOS_ERROR) { + if (need_append_buffer_lock) + unlock_append_buffer(info); DBUG_RETURN((info->error= -1)); } if (!append_cache) @@ -932,6 +944,8 @@ int flush_io_cache(IO_CACHE *info) info->end_of_file+=(info->write_pos-info->append_read_pos); info->append_read_pos=info->write_pos=info->write_buffer; + if (need_append_buffer_lock) + unlock_append_buffer(info); DBUG_RETURN(info->error); } } @@ -942,6 +956,8 @@ int flush_io_cache(IO_CACHE *info) info->inited=0; } #endif + if (need_append_buffer_lock) + unlock_append_buffer(info); DBUG_RETURN(0); } diff --git a/sql/log.cc b/sql/log.cc index ec7396bda3c..d3ad4564a73 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -703,12 +703,37 @@ void MYSQL_LOG::new_file(bool inside_mutex) } } +bool MYSQL_LOG::append(Log_event* ev) +{ + bool error = 0; + pthread_mutex_lock(&LOCK_log); + + DBUG_ASSERT(log_file.type == SEQ_READ_APPEND); + // Log_event::write() is smart enough to use my_b_write() or + // my_b_append() depending on the kind of cache we have + if (ev->write(&log_file)) + { + error=1; + goto err; + } + if ((uint)my_b_append_tell(&log_file) > max_binlog_size) + { + new_file(1); + } + signal_update(); +err: + pthread_mutex_unlock(&LOCK_log); + return error; +} + bool MYSQL_LOG::appendv(const char* buf, uint len,...) { bool error = 0; va_list(args); va_start(args,len); + DBUG_ASSERT(log_file.type == SEQ_READ_APPEND); + pthread_mutex_lock(&LOCK_log); do { diff --git a/sql/log_event.cc b/sql/log_event.cc index e10352e6c48..4d6d5341312 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -26,6 +26,18 @@ #include <assert.h> +inline int my_b_safe_write(IO_CACHE* file, const char* buf, + int len) +{ + // Sasha: We are not writing this with the ? operator to avoid hitting + // a possible compiler bug. At least gcc 2.95 cannot deal with + // several layers of ternary operators that evaluated comma(,) operator + // expressions inside - I do have a test case if somebody wants it + if (file->type == SEQ_READ_APPEND) + return my_b_append(file,buf,len); + return my_b_write(file,buf,len); + } + #ifdef MYSQL_CLIENT static void pretty_print_str(FILE* file, char* str, int len) { @@ -403,7 +415,7 @@ int Log_event::write_header(IO_CACHE* file) pos += 4; int2store(pos, flags); pos += 2; - return (my_b_write(file, (byte*) buf, (uint) (pos - buf))); + return (my_b_safe_write(file, (byte*) buf, (uint) (pos - buf))); } #ifndef MYSQL_CLIENT @@ -677,7 +689,7 @@ int Start_log_event::write_data(IO_CACHE* file) int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version); memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN); int4store(buff + ST_CREATED_OFFSET,created); - return (my_b_write(file, (byte*) buff, sizeof(buff)) ? -1 : 0); + return (my_b_safe_write(file, (byte*) buff, sizeof(buff)) ? -1 : 0); } Rotate_log_event::Rotate_log_event(const char* buf, int event_len, @@ -714,8 +726,8 @@ int Rotate_log_event::write_data(IO_CACHE* file) { char buf[ROTATE_HEADER_LEN]; int8store(buf, pos + R_POS_OFFSET); - return my_b_write(file, (byte*)buf, ROTATE_HEADER_LEN) || - my_b_write(file, (byte*)new_log_ident, (uint) ident_len); + return my_b_safe_write(file, (byte*)buf, ROTATE_HEADER_LEN) || + my_b_safe_write(file, (byte*)new_log_ident, (uint) ident_len); } #ifndef MYSQL_CLIENT @@ -812,9 +824,9 @@ int Query_log_event::write_data(IO_CACHE* file) buf[Q_DB_LEN_OFFSET] = (char)db_len; int2store(buf + Q_ERR_CODE_OFFSET, error_code); - return (my_b_write(file, (byte*) buf, QUERY_HEADER_LEN) || - my_b_write(file, (db) ? (byte*) db : (byte*)"", db_len + 1) || - my_b_write(file, (byte*) query, q_len)) ? -1 : 0; + return (my_b_safe_write(file, (byte*) buf, QUERY_HEADER_LEN) || + my_b_safe_write(file, (db) ? (byte*) db : (byte*)"", db_len + 1) || + my_b_safe_write(file, (byte*) query, q_len)) ? -1 : 0; } Intvar_log_event::Intvar_log_event(const char* buf, bool old_format): @@ -840,7 +852,7 @@ int Intvar_log_event::write_data(IO_CACHE* file) char buf[9]; buf[I_TYPE_OFFSET] = type; int8store(buf + I_VAL_OFFSET, val); - return my_b_write(file, (byte*) buf, sizeof(buf)); + return my_b_safe_write(file, (byte*) buf, sizeof(buf)); } #ifdef MYSQL_CLIENT @@ -878,7 +890,7 @@ int Load_log_event::write_data_header(IO_CACHE* file) buf[L_TBL_LEN_OFFSET] = (char)table_name_len; buf[L_DB_LEN_OFFSET] = (char)db_len; int4store(buf + L_NUM_FIELDS_OFFSET, num_fields); - return my_b_write(file, (byte*)buf, LOAD_HEADER_LEN); + return my_b_safe_write(file, (byte*)buf, LOAD_HEADER_LEN); } int Load_log_event::write_data_body(IO_CACHE* file) @@ -886,20 +898,20 @@ int Load_log_event::write_data_body(IO_CACHE* file) if (sql_ex.write_data(file)) return 1; if (num_fields && fields && field_lens) { - if (my_b_write(file, (byte*)field_lens, num_fields) || - my_b_write(file, (byte*)fields, field_block_len)) + if (my_b_safe_write(file, (byte*)field_lens, num_fields) || + my_b_safe_write(file, (byte*)fields, field_block_len)) return 1; } - return (my_b_write(file, (byte*)table_name, table_name_len + 1) || - my_b_write(file, (byte*)db, db_len + 1) || - my_b_write(file, (byte*)fname, fname_len)); + return (my_b_safe_write(file, (byte*)table_name, table_name_len + 1) || + my_b_safe_write(file, (byte*)db, db_len + 1) || + my_b_safe_write(file, (byte*)fname, fname_len)); } static bool write_str(IO_CACHE *file, char *str, byte length) { - return (my_b_write(file, &length, 1) || - my_b_write(file, (byte*) str, (int) length)); + return (my_b_safe_write(file, &length, 1) || + my_b_safe_write(file, (byte*) str, (int) length)); } int sql_ex_info::write_data(IO_CACHE* file) @@ -911,7 +923,7 @@ int sql_ex_info::write_data(IO_CACHE* file) write_str(file, line_term, line_term_len) || write_str(file, line_start, line_start_len) || write_str(file, escaped, escaped_len) || - my_b_write(file,(byte*) &opt_flags,1)); + my_b_safe_write(file,(byte*) &opt_flags,1)); } else { @@ -923,7 +935,7 @@ int sql_ex_info::write_data(IO_CACHE* file) old_ex.escaped= *escaped; old_ex.opt_flags= opt_flags; old_ex.empty_flags=empty_flags; - return my_b_write(file, (byte*) &old_ex, sizeof(old_ex)); + return my_b_safe_write(file, (byte*) &old_ex, sizeof(old_ex)); } } @@ -1280,7 +1292,7 @@ int Slave_log_event::write_data(IO_CACHE* file) int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos); int2store(mem_pool + SL_MASTER_PORT_OFFSET, master_port); // log and host are already there - return my_b_write(file, (byte*)mem_pool, get_data_size()); + return my_b_safe_write(file, (byte*)mem_pool, get_data_size()); } void Slave_log_event::init_from_mem_pool(int data_size) @@ -1330,8 +1342,8 @@ int Create_file_log_event::write_data_body(IO_CACHE* file) int res; if ((res = Load_log_event::write_data_body(file)) || fake_base) return res; - return (my_b_write(file, (byte*) "", 1) || - my_b_write(file, (byte*) block, block_len)); + return (my_b_safe_write(file, (byte*) "", 1) || + my_b_safe_write(file, (byte*) block, block_len)); } int Create_file_log_event::write_data_header(IO_CACHE* file) @@ -1341,7 +1353,7 @@ int Create_file_log_event::write_data_header(IO_CACHE* file) return res; byte buf[CREATE_FILE_HEADER_LEN]; int4store(buf + CF_FILE_ID_OFFSET, file_id); - return my_b_write(file, buf, CREATE_FILE_HEADER_LEN); + return my_b_safe_write(file, buf, CREATE_FILE_HEADER_LEN); } int Create_file_log_event::write_base(IO_CACHE* file) @@ -1423,8 +1435,8 @@ int Append_block_log_event::write_data(IO_CACHE* file) { byte buf[APPEND_BLOCK_HEADER_LEN]; int4store(buf + AB_FILE_ID_OFFSET, file_id); - return (my_b_write(file, buf, APPEND_BLOCK_HEADER_LEN) || - my_b_write(file, (byte*) block, block_len)); + return (my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) || + my_b_safe_write(file, (byte*) block, block_len)); } #ifdef MYSQL_CLIENT @@ -1473,7 +1485,7 @@ int Delete_file_log_event::write_data(IO_CACHE* file) { byte buf[DELETE_FILE_HEADER_LEN]; int4store(buf + DF_FILE_ID_OFFSET, file_id); - return my_b_write(file, buf, DELETE_FILE_HEADER_LEN); + return my_b_safe_write(file, buf, DELETE_FILE_HEADER_LEN); } #ifdef MYSQL_CLIENT @@ -1520,7 +1532,7 @@ int Execute_load_log_event::write_data(IO_CACHE* file) { byte buf[EXEC_LOAD_HEADER_LEN]; int4store(buf + EL_FILE_ID_OFFSET, file_id); - return my_b_write(file, buf, EXEC_LOAD_HEADER_LEN); + return my_b_safe_write(file, buf, EXEC_LOAD_HEADER_LEN); } #ifdef MYSQL_CLIENT diff --git a/sql/slave.cc b/sql/slave.cc index 3d38a7e6f4b..97efbf6036c 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -54,6 +54,9 @@ static int stuck_count = 0; typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE; void skip_load_data_infile(NET* net); +static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev); +static int queue_old_event(MASTER_INFO* mi, const char* buf, + uint event_len); static inline bool slave_killed(THD* thd,MASTER_INFO* mi); static inline bool slave_killed(THD* thd,RELAY_LOG_INFO* rli); static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type); @@ -1918,34 +1921,86 @@ the slave SQL thread with \"mysqladmin start-slave\". We stopped at log \ DBUG_RETURN(0); // Can't return anything here } +static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev) +{ + if (!rev->is_valid()) + return 1; + DBUG_ASSERT(rev->ident_len<sizeof(mi->master_log_name)); + memcpy(mi->master_log_name,rev->new_log_ident, + rev->ident_len); + mi->master_log_name[rev->ident_len] = 0; + mi->master_log_pos = rev->pos; +#ifndef DBUG_OFF + /* if we do not do this, we will be getting the first + rotate event forever, so + we need to not disconnect after one + */ + if (disconnect_slave_event_count) + events_till_disconnect++; +#endif + return 0; +} + +static int queue_old_event(MASTER_INFO* mi, const char* buf, + uint event_len) +{ + const char* errmsg = 0; + bool inc_pos = 1; + Log_event* ev = Log_event::read_log_event(buf,event_len, &errmsg, + 1/*old format*/); + if (!ev) + { + sql_print_error("Read invalid event from master: '%s',\ + master could be corrupt but a more likely cause of this is a bug", + errmsg); + return 1; + } + ev->log_pos = mi->master_log_pos; + switch (ev->get_type_code()) + { + case ROTATE_EVENT: + if (process_io_rotate(mi,(Rotate_log_event*)ev)) + { + delete ev; + return 1; + } + inc_pos = 0; + break; + case LOAD_EVENT: + // TODO: actually process it + mi->master_log_pos += event_len; + return 0; + break; + default: + break; + } + if (mi->rli.relay_log.append(ev)) + { + delete ev; + return 1; + } + delete ev; + if (inc_pos) + mi->master_log_pos += event_len; + return 0; +} + int queue_event(MASTER_INFO* mi,const char* buf,uint event_len) { int error; bool inc_pos = 1; if (mi->old_format) - return 1; // TODO: deal with old format - + return queue_old_event(mi,buf,event_len); + // TODO: figure out if other events in addition to Rotate + // require special processing switch (buf[EVENT_TYPE_OFFSET]) { case ROTATE_EVENT: { Rotate_log_event rev(buf,event_len,0); - if (!rev.is_valid()) + if (process_io_rotate(mi,&rev)) return 1; - DBUG_ASSERT(rev.ident_len<sizeof(mi->master_log_name)); - memcpy(mi->master_log_name,rev.new_log_ident, - rev.ident_len); - mi->master_log_name[rev.ident_len] = 0; - mi->master_log_pos = rev.pos; - inc_pos = 0; -#ifndef DBUG_OFF - /* if we do not do this, we will be getting the first - rotate event forever, so - we need to not disconnect after one - */ - if (disconnect_slave_event_count) - events_till_disconnect++; -#endif + inc_pos=0; break; } default: diff --git a/sql/sql_class.h b/sql/sql_class.h index a34975bc77e..1d8fa1554d3 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -108,6 +108,7 @@ public: //v stands for vector //invoked as appendv(buf1,len1,buf2,len2,...,bufn,lenn,0) bool appendv(const char* buf,uint len,...); + bool append(Log_event* ev); int generate_new_name(char *new_name,const char *old_name); void make_log_name(char* buf, const char* log_ident); |