diff options
-rw-r--r-- | .bzrignore | 2 | ||||
-rw-r--r-- | include/my_sys.h | 37 | ||||
-rw-r--r-- | mysql-test/mysql-test-run.sh | 27 | ||||
-rw-r--r-- | mysys/Makefile.am | 4 | ||||
-rw-r--r-- | mysys/mf_iocache.c | 283 | ||||
-rw-r--r-- | sql/mysqld.cc | 77 | ||||
-rw-r--r-- | sql/sql_repl.cc | 18 | ||||
-rw-r--r-- | tools/mysqlmanager.c | 33 |
8 files changed, 409 insertions, 72 deletions
diff --git a/.bzrignore b/.bzrignore index 6113e896ea7..56094cd2e78 100644 --- a/.bzrignore +++ b/.bzrignore @@ -348,8 +348,10 @@ mysql.kdevprj mysql.proj mysqld.S mysqld.sym +mysys/#mf_iocache.c# mysys/test_charset mysys/test_dir +mysys/test_io_cache mysys/test_thr_alarm mysys/test_thr_lock mysys/testhash diff --git a/include/my_sys.h b/include/my_sys.h index f6d303a6ccb..f9df265df0f 100644 --- a/include/my_sys.h +++ b/include/my_sys.h @@ -293,6 +293,16 @@ typedef struct st_dynamic_string { struct st_io_cache; typedef int (*IO_CACHE_CALLBACK)(struct st_io_cache*); +#ifdef THREAD +#define lock_append_buffer(info) \ + pthread_mutex_lock(&(info)->append_buffer_lock) +#define unlock_append_buffer(info) \ + pthread_mutex_unlock(&(info)->append_buffer_lock) +#else +#define lock_append_buffer(info) +#define unlock_append_buffer(info) +#endif + typedef struct st_io_cache /* Used when cacheing files */ { my_off_t pos_in_file,end_of_file; @@ -301,13 +311,15 @@ typedef struct st_io_cache /* Used when cacheing files */ that will use a buffer allocated somewhere else */ - byte *append_buffer, *append_pos, *append_end; + byte *append_buffer, *append_read_pos, *write_pos, *append_end, + *write_end; /* for append buffer used in READ_APPEND cache */ #ifdef THREAD pthread_mutex_t append_buffer_lock; /* need mutex copying from append buffer to read buffer */ #endif int (*read_function)(struct st_io_cache *,byte *,uint); + int (*write_function)(struct st_io_cache *,const byte *,uint); /* callbacks when the actual read I/O happens */ IO_CACHE_CALLBACK pre_read; IO_CACHE_CALLBACK post_read; @@ -342,16 +354,19 @@ typedef int (*qsort2_cmp)(const void *, const void *, const void *); ((info)->rc_pos+=(Count)),0) :\ (*(info)->read_function)((info),Buffer,Count)) +#define my_b_write(info,Buffer,Count) \ + ((info)->write_pos + (Count) <=(info)->write_end ?\ + (memcpy((info)->write_pos, (Buffer), (size_t)(Count)),\ + ((info)->write_pos+=(Count)),0) : \ + (*(info)->write_function)((info),(Buffer),(Count))) + + + #define my_b_get(info) \ ((info)->rc_pos != (info)->rc_end ?\ ((info)->rc_pos++, (int) (uchar) (info)->rc_pos[-1]) :\ _my_b_get(info)) -#define my_b_write(info,Buffer,Count) \ - ((info)->rc_pos + (Count) <= (info)->rc_end ?\ - (memcpy((info)->rc_pos,Buffer,(size_t) (Count)), \ - ((info)->rc_pos+=(Count)),0) :\ - _my_b_write(info,Buffer,Count)) /* my_b_write_byte dosn't have any err-check */ #define my_b_write_byte(info,chr) \ @@ -564,6 +579,7 @@ extern int _my_b_net_read(IO_CACHE *info,byte *Buffer,uint Count); extern int _my_b_get(IO_CACHE *info); extern int _my_b_async_read(IO_CACHE *info,byte *Buffer,uint Count); extern int _my_b_write(IO_CACHE *info,const byte *Buffer,uint Count); +extern int _my_b_append(IO_CACHE *info,const byte *Buffer,uint Count); extern int my_block_write(IO_CACHE *info, const byte *Buffer, uint Count, my_off_t pos); extern int flush_io_cache(IO_CACHE *info); @@ -634,6 +650,7 @@ byte *my_compress_alloc(const byte *packet, ulong *len, ulong *complen); ulong checksum(const byte *mem, uint count); uint my_bit_log2(ulong value); + #if defined(_MSC_VER) && !defined(__WIN__) extern void sleep(int sec); #endif @@ -646,3 +663,11 @@ extern my_bool have_tcpip; /* Is set if tcpip is used */ #endif #include "raid.h" #endif /* _my_sys_h */ + + + + + + + + diff --git a/mysql-test/mysql-test-run.sh b/mysql-test/mysql-test-run.sh index 57e5b5763f3..59d41605aa2 100644 --- a/mysql-test/mysql-test-run.sh +++ b/mysql-test/mysql-test-run.sh @@ -164,7 +164,10 @@ while test $# -gt 0; do --ssl-cert=$BASEDIR/SSL/server-cert.pem \ --ssl-key=$BASEDIR/SSL/server-key.pem" ;; --no-manager | --skip-manager) USE_MANAGER=0 ;; - --manager) USE_MANAGER=1 ;; + --manager) + USE_MANAGER=1 + USE_RUNNING_SERVER= + ;; --skip-innobase) EXTRA_MASTER_MYSQLD_OPT="$EXTRA_MASTER_MYSQLD_OPT --skip-innobase" EXTRA_SLAVE_MYSQLD_OPT="$EXTRA_SLAVE_MYSQLD_OPT --skip-innobase" ;; @@ -210,6 +213,7 @@ while test $# -gt 0; do --gdb ) START_WAIT_TIMEOUT=300 STOP_WAIT_TIMEOUT=300 + USE_MANAGER=1 if [ x$BINARY_DIST = x1 ] ; then $ECHO "Note: you will get more meaningful output on a source distribution compiled with debugging option when running tests with --gdb option" fi @@ -255,6 +259,8 @@ done #-- MYRUN_DIR=$MYSQL_TEST_DIR/var/run +MANAGER_PID_FILE="$MYRUN_DIR/manager.pid" + MASTER_MYDDIR="$MYSQL_TEST_DIR/var/master-data" MASTER_MYSOCK="$MYSQL_TMP_DIR/master.sock" MASTER_MYPID="$MYRUN_DIR/mysqld.pid" @@ -549,10 +555,20 @@ start_manager() return fi $ECHO "Starting MySQL Manager" + if [ -f "$MANAGER_PID_FILE" ] ; then + kill `cat $MANAGER_PID_FILE` + sleep 1 + if [ -f "$MANAGER_PID_FILE" ] ; then + kill -9 `cat $MANAGER_PID_FILE` + sleep 1 + fi + fi + + rm -f $MANAGER_PID_FILE MYSQL_MANAGER_PW=`$MYSQL_MANAGER_PWGEN -u $MYSQL_MANAGER_USER \ -o $MYSQL_MANAGER_PW_FILE` $MYSQL_MANAGER --log=$MYSQL_MANAGER_LOG --port=$MYSQL_MANAGER_PORT \ - --password-file=$MYSQL_MANAGER_PW_FILE + --password-file=$MYSQL_MANAGER_PW_FILE --pid-file=$MANAGER_PID_FILE abort_if_failed "Could not start MySQL manager" mysqltest_manager_args="--manager-host=localhost \ --manager-user=$MYSQL_MANAGER_USER \ @@ -561,7 +577,10 @@ start_manager() --manager-wait-timeout=$START_WAIT_TIMEOUT" MYSQL_TEST="$MYSQL_TEST $mysqltest_manager_args" MYSQL_TEST_ARGS="$MYSQL_TEST_ARGS $mysqltest_manager_args" - + while [ ! -f $MANAGER_PID_FILE ] ; do + sleep 1 + done + echo "Manager started" } stop_manager() @@ -573,6 +592,8 @@ stop_manager() -p$MYSQL_MANAGER_PW -P $MYSQL_MANAGER_PORT <<EOF shutdown EOF + echo "Manager terminated" + } manager_launch() diff --git a/mysys/Makefile.am b/mysys/Makefile.am index 1797d306254..d28ef1364c6 100644 --- a/mysys/Makefile.am +++ b/mysys/Makefile.am @@ -95,6 +95,10 @@ test_vsnprintf: my_vsnprintf.c $(LIBRARIES) $(CP) $(srcdir)/my_vsnprintf.c test_vsnprintf.c $(LINK) $(FLAGS) -DMAIN ./test_vsnprintf.c $(LDADD) $(LIBS) $(RM) -f test_vsnprintf.* +test_io_cache: mf_iocache.c $(LIBRARIES) + $(CP) $(srcdir)/mf_iocache.c test_io_cache.c + $(LINK) $(FLAGS) -DMAIN ./test_io_cache.c $(LDADD) $(LIBS) + $(RM) -f test_io_cache.* test_dir: test_dir.c $(LIBRARIES) $(LINK) $(FLAGS) -DMAIN $(srcdir)/test_dir.c $(LDADD) $(LIBS) diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c index da040a11514..f9b668ce09f 100644 --- a/mysys/mf_iocache.c +++ b/mysys/mf_iocache.c @@ -41,7 +41,12 @@ static void my_aiowait(my_aio_result *result); #include <assert.h> #include <errno.h> +#ifdef MAIN +#include <my_dir.h> +#endif + static void init_read_function(IO_CACHE* info, enum cache_type type); +static void init_write_function(IO_CACHE* info, enum cache_type type); static void init_read_function(IO_CACHE* info, enum cache_type type) { @@ -65,6 +70,18 @@ static void init_read_function(IO_CACHE* info, enum cache_type type) } } +static void init_write_function(IO_CACHE* info, enum cache_type type) +{ + switch (type) + { + case SEQ_READ_APPEND: + info->write_function = _my_b_append; + break; + default: + info->write_function = _my_b_write; + } +} + /* ** if cachesize == 0 then use default cachesize (from s-file) ** if file == -1 then real_open_cached_file() will be called. @@ -87,6 +104,7 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, if (! (cachesize= my_default_record_cache_size)) DBUG_RETURN(1); /* No cache requested */ min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2; + info->alloced_buffer = 0; if (type == READ_CACHE || type == SEQ_READ_APPEND) { /* Assume file isn't growing */ if (cache_myflags & MY_DONT_CHECK_FILESIZE) @@ -113,7 +131,6 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, } } } - info->alloced_buffer = 0; if ((int) type < (int) READ_NET) { uint buffer_block; @@ -152,8 +169,9 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, info->rc_request_pos=info->rc_pos=info->buffer; if (type == SEQ_READ_APPEND) { - info->append_pos = info->append_buffer; - info->append_end = info->append_buffer + info->buffer_length; + info->append_read_pos = info->write_pos = info->append_buffer; + info->write_end = info->append_end = + info->append_buffer + info->buffer_length; #ifdef THREAD pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST); #endif @@ -166,7 +184,9 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, } else /* type == WRITE_CACHE */ { - info->rc_end=info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1)); + info->write_end= + info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1)); + info->write_pos = info->buffer; } /* end_of_file may be changed by user later */ info->end_of_file= ((type == READ_NET || type == READ_FIFO ) ? 0 @@ -174,6 +194,7 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, info->type=type; info->error=0; init_read_function(info,type); + init_write_function(info,type); #ifdef HAVE_AIOWAIT if (use_async_io && ! my_disable_async_io) { @@ -234,16 +255,22 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, { /* use current buffer */ if (info->type == WRITE_CACHE && type == READ_CACHE) { - info->rc_end=info->rc_pos; + info->rc_end=info->write_pos; info->end_of_file=my_b_tell(info); } else if (type == WRITE_CACHE) { if (info->type == READ_CACHE) - info->rc_end=info->buffer+info->buffer_length; + { + info->write_end=info->buffer+info->buffer_length; + info->write_pos=info->rc_pos; + } info->end_of_file = ~(my_off_t) 0; } - info->rc_pos=info->rc_request_pos+(seek_offset-info->pos_in_file); + if (type == WRITE_CACHE) + info->write_pos=info->rc_request_pos+(seek_offset-info->pos_in_file); + else + info->rc_pos=info->rc_request_pos+(seek_offset-info->pos_in_file); #ifdef HAVE_AIOWAIT my_aiowait(&info->aio_result); /* Wait for outstanding req */ #endif @@ -277,9 +304,14 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, ~(my_off_t) 0); } } + if (info->type == SEQ_READ_APPEND) + { + info->append_read_pos = info->write_pos = info->append_buffer; + } info->type=type; info->error=0; init_read_function(info,type); + init_write_function(info,type); #ifdef HAVE_AIOWAIT if (type != READ_NET) { @@ -294,7 +326,7 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, info->inited=0; #endif DBUG_RETURN(0); -} /* init_io_cache */ +} /* reinit_io_cache */ @@ -377,11 +409,19 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count) return 0; } +/* Do sequential read from the SEQ_READ_APPEND cache + we do this in three stages: + - first read from info->buffer + - then if there are still data to read, try the file descriptor + - afterwards, if there are still data to read, try append buffer +*/ + int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) { - uint length,diff_length,left_length; + uint length,diff_length,left_length,save_count; my_off_t max_length, pos_in_file; - + save_count=Count; + /* first, read the regular buffer */ if ((left_length=(uint) (info->rc_end-info->rc_pos))) { dbug_assert(Count >= left_length); /* User is not using my_b_read() */ @@ -390,30 +430,33 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) Count-=left_length; } /* pos_in_file always point on where info->buffer was read */ - pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer); + if ((pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer)) >= + info->end_of_file) + { + info->pos_in_file=pos_in_file; + goto read_append_buffer; + } /* no need to seek since the read is guaranteed to be sequential */ diff_length=(uint) (pos_in_file & (IO_SIZE-1)); -#ifdef THREAD - pthread_mutex_lock(&info->append_buffer_lock); -#endif -#ifdef THREAD - pthread_mutex_unlock(&info->append_buffer_lock); -#endif + + /* now the second stage begins - read from file descriptor */ if (Count >= (uint) (IO_SIZE+(IO_SIZE-diff_length))) { /* Fill first intern buffer */ uint read_length; if (info->end_of_file == pos_in_file) { /* End of file */ - info->error=(int) left_length; - return 1; + goto read_append_buffer; } length=(Count & (uint) ~(IO_SIZE-1))-diff_length; if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags)) != (uint) length) { - info->error= read_length == (uint) -1 ? -1 : - (int) (read_length+left_length); - return 1; + if (read_length != (uint)-1) + { + Count -= read_length; + Buffer += read_length; + } + goto read_append_buffer; } Count-=length; Buffer+=length; @@ -422,15 +465,13 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) diff_length=0; } max_length=info->read_length-diff_length; - if (info->type != READ_FIFO && - (info->end_of_file - pos_in_file) < max_length) + if ((info->end_of_file - pos_in_file) < max_length) max_length = info->end_of_file - pos_in_file; if (!max_length) { if (Count) { - info->error= left_length; /* We only got this many char */ - return 1; + goto read_append_buffer; } length=0; /* Didn't read any chars */ } @@ -439,15 +480,36 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) length == (uint) -1) { if (length != (uint) -1) + { memcpy(Buffer,info->buffer,(size_t) length); - info->error= length == (uint) -1 ? -1 : (int) (length+left_length); - return 1; + Count -= length; + Buffer += length; + } + goto read_append_buffer; } info->rc_pos=info->buffer+Count; info->rc_end=info->buffer+length; info->pos_in_file=pos_in_file; memcpy(Buffer,info->buffer,(size_t) Count); return 0; +read_append_buffer: + lock_append_buffer(info); + if (!Count) return 0; + { + uint copy_len = (uint)(info->append_read_pos - + info->write_pos); + dbug_assert(info->append_read_pos <= info->write_pos); + if (copy_len > Count) + copy_len = Count; + memcpy(Buffer, info->append_read_pos, + copy_len); + info->append_read_pos += copy_len; + Count -= copy_len; + if (Count) + info->error = save_count - Count; + } + unlock_append_buffer(info); + return Count ? 1 : 0; } #ifdef HAVE_AIOWAIT @@ -641,11 +703,11 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count) { uint rest_length,length; - rest_length=(uint) (info->rc_end - info->rc_pos); - memcpy(info->rc_pos,Buffer,(size_t) rest_length); + rest_length=(uint) (info->write_end - info->write_pos); + memcpy(info->write_pos,Buffer,(size_t) rest_length); Buffer+=rest_length; Count-=rest_length; - info->rc_pos+=rest_length; + info->write_pos+=rest_length; if (info->pos_in_file+info->buffer_length > info->end_of_file) { my_errno=errno=EFBIG; @@ -667,8 +729,33 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count) Buffer+=length; info->pos_in_file+=length; } - memcpy(info->rc_pos,Buffer,(size_t) Count); - info->rc_pos+=Count; + memcpy(info->write_pos,Buffer,(size_t) Count); + info->write_pos+=Count; + return 0; +} + +int _my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count) +{ + uint rest_length,length; + + rest_length=(uint) (info->append_end - + info->write_pos); + memcpy(info->write_pos,Buffer,(size_t) rest_length); + Buffer+=rest_length; + Count-=rest_length; + info->write_pos+=rest_length; + if (flush_io_cache(info)) + return 1; + if (Count >= IO_SIZE) + { /* Fill first intern buffer */ + length=Count & (uint) ~(IO_SIZE-1); + if (my_write(info->file,Buffer,(uint) length,info->myflags | MY_NABP)) + return info->error= -1; + Count-=length; + Buffer+=length; + } + memcpy(info->write_pos,Buffer,(size_t) Count); + info->write_pos+=Count; return 0; } @@ -712,8 +799,8 @@ int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count, Buffer+=length; Count-= length; /* Fix length of buffer if the new data was larger */ - if (info->buffer+length > info->rc_pos) - info->rc_pos=info->buffer+length; + if (info->buffer+length > info->write_pos) + info->write_pos=info->buffer+length; if (!Count) return (error); } @@ -723,37 +810,60 @@ int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count, return error; } +/* avoid warning about empty if body */ +#ifdef THREAD +#define IF_APPEND_CACHE if (append_cache) +#else +#define IF_APPEND_CACHE +#endif + /* Flush write cache */ int flush_io_cache(IO_CACHE *info) { uint length; + int append_cache; DBUG_ENTER("flush_io_cache"); - - if (info->type == WRITE_CACHE) + append_cache = (info->type == SEQ_READ_APPEND); + if (info->type == WRITE_CACHE || append_cache) { if (info->file == -1) { if (real_open_cached_file(info)) DBUG_RETURN((info->error= -1)); } - if (info->rc_pos != info->buffer) + IF_APPEND_CACHE + lock_append_buffer(info); + if (info->write_pos != info->buffer) { - length=(uint) (info->rc_pos - info->buffer); + length=(uint) (info->write_pos - info->buffer); if (info->seek_not_done) { /* File touched, do seek */ if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)) == MY_FILEPOS_ERROR) + { + IF_APPEND_CACHE + unlock_append_buffer(info); DBUG_RETURN((info->error= -1)); + } info->seek_not_done=0; } - info->rc_pos=info->buffer; + info->write_pos=info->buffer; info->pos_in_file+=length; - info->rc_end=(info->buffer+info->buffer_length- + info->write_end=(info->buffer+info->buffer_length- (info->pos_in_file & (IO_SIZE-1))); + if (append_cache) + { + info->append_read_pos = info->buffer; + info->append_end = info->write_end; + } if (my_write(info->file,info->buffer,length,info->myflags | MY_NABP)) - DBUG_RETURN((info->error= -1)); - DBUG_RETURN(0); + info->error= -1; + else + info->error= 0; + IF_APPEND_CACHE + unlock_append_buffer(info); + DBUG_RETURN(info->error); } } #ifdef HAVE_AIOWAIT @@ -781,7 +891,94 @@ int end_io_cache(IO_CACHE *info) error=flush_io_cache(info); my_free((gptr) info->buffer,MYF(MY_WME)); info->buffer=info->rc_pos=(byte*) 0; + info->alloced_buffer = 0; } DBUG_RETURN(error); } /* end_io_cache */ +#ifdef MAIN +void die(const char* fmt, ...) +{ + va_list va_args; + va_start(va_args,fmt); + fprintf(stderr,"Error:"); + vfprintf(stderr, fmt,va_args); + fprintf(stderr,", errno=%d\n", errno); + exit(1); +} + +int open_file(const char* fname, IO_CACHE* info, int cache_size) +{ + int fd; + if ((fd=my_open(fname,O_CREAT|O_APPEND|O_RDWR,MYF(MY_WME))) < 0) + die("Could not open %s", fname); + if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME))) + die("failed in init_io_cache()"); + return fd; +} + +void close_file(IO_CACHE* info) +{ + end_io_cache(info); + my_close(info->file, MYF(MY_WME)); +} + +int main(int argc, char** argv) +{ + IO_CACHE sra_cache; /* SEQ_READ_APPEND */ + MY_STAT status; + const char* fname="/tmp/iocache.test"; + int cache_size=16384; + char llstr_buf[22]; + int max_block,total_bytes=0; + int i,num_loops=100,error=0; + char *p; + char* block, *block_end; + MY_INIT(argv[0]); + max_block = cache_size*3; + if (!(block=(char*)my_malloc(max_block,MYF(MY_WME)))) + die("Not enough memory to allocate test block"); + block_end = block + max_block; + for (p = block,i=0; p < block_end;i++) + { + *p++ = (char)i; + } + if (my_stat(fname,&status, MYF(0)) && + my_delete(fname,MYF(MY_WME))) + { + die("Delete of %s failed, aborting", fname); + } + open_file(fname,&sra_cache, cache_size); + for (i = 0; i < num_loops; i++) + { + char buf[4]; + int block_size = abs(rand() % max_block); + int4store(buf, block_size); + if (my_b_write(&sra_cache,buf,4) || + my_b_write(&sra_cache, block, block_size)) + die("write failed"); + total_bytes += 4+block_size; + } + close_file(&sra_cache); + my_free(block,MYF(MY_WME)); + if (!my_stat(fname,&status,MYF(MY_WME))) + die("%s failed to stat, but I had just closed it,\ + wonder how that happened"); + printf("Final size of %s is %s, wrote %d bytes\n",fname, + llstr(status.st_size,llstr_buf), + total_bytes); + my_delete(fname, MYF(MY_WME)); + /* check correctness of tests */ + if (total_bytes != status.st_size) + { + fprintf(stderr,"Not the same number of bytes acutally in file as bytes \ +supposedly written\n"); + error=1; + } + exit(error); + return 0; +} +#endif + + + diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 45812754cad..c1724a4ad89 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -45,6 +45,11 @@ char pstack_file_name[80]; #endif /* __linux__ */ +#if defined(HAVE_DEC_3_2_THREADS) || defined(SIGNALS_DONT_BREAK_READ) +#define HAVE_CLOSE_SERVER_SOCK 1 +void close_server_sock(); +#endif + extern "C" { // Because of SCO 3.2V4.2 #include <errno.h> #include <sys/stat.h> @@ -453,16 +458,7 @@ static void close_connections(void) sql_print_error("Got error %d from pthread_cond_timedwait",error); #endif #if defined(HAVE_DEC_3_2_THREADS) || defined(SIGNALS_DONT_BREAK_READ) - if (ip_sock != INVALID_SOCKET) - { - DBUG_PRINT("error",("closing TCP/IP and socket files")); - VOID(shutdown(ip_sock,2)); - VOID(closesocket(ip_sock)); - VOID(shutdown(unix_sock,2)); - VOID(closesocket(unix_sock)); - VOID(unlink(mysql_unix_port)); - ip_sock=unix_sock= INVALID_SOCKET; - } + close_server_sock(); #endif } (void) pthread_mutex_unlock(&LOCK_thread_count); @@ -577,10 +573,37 @@ static void close_connections(void) DBUG_VOID_RETURN; } +#ifdef HAVE_CLOSE_SERVER_SOCK +void close_server_sock() +{ + DBUG_ENTER("close_server_sock"); + if (ip_sock != INVALID_SOCKET) + { + DBUG_PRINT("info",("closing TCP/IP socket")); + VOID(shutdown(ip_sock,2)); + VOID(closesocket(ip_sock)); + ip_sock=INVALID_SOCKET; + } + if (unix_sock != INVALID_SOCKET) + { + DBUG_PRINT("info",("closing Unix socket")); + VOID(shutdown(unix_sock,2)); + VOID(closesocket(unix_sock)); + VOID(unlink(mysql_unix_port)); + unix_sock=INVALID_SOCKET; + } + DBUG_VOID_RETURN; +} +#endif + void kill_mysql(void) { DBUG_ENTER("kill_mysql"); +#ifdef SIGNALS_DONT_BREAK_READ + close_server_sock(); /* force accept to wake up */ +#endif + #if defined(__WIN__) { if (!SetEvent(hEventShutdown)) @@ -604,6 +627,7 @@ void kill_mysql(void) #endif DBUG_PRINT("quit",("After pthread_kill")); shutdown_in_progress=1; // Safety if kill didn't work + abort_loop=1; DBUG_VOID_RETURN; } @@ -683,9 +707,11 @@ void unireg_end(int signal_number __attribute__((unused))) void unireg_abort(int exit_code) { + DBUG_ENTER("unireg_abort"); if (exit_code) sql_print_error("Aborting\n"); clean_up(); /* purecov: inspected */ + DBUG_PRINT("quit",("done with cleanup in unireg_abort")); my_thread_end(); exit(exit_code); /* purecov: inspected */ } @@ -736,13 +762,15 @@ void clean_up(bool print_message) if (print_message && errmesg) sql_print_error(ER(ER_SHUTDOWN_COMPLETE),my_progname); x_free((gptr) my_errmsg[ERRMAPP]); /* Free messages */ - + DBUG_PRINT("quit", ("Error messages freed")); /* Tell main we are ready */ (void) pthread_mutex_lock(&LOCK_thread_count); + DBUG_PRINT("quit", ("got thread count lock")); ready_to_exit=1; /* do the broadcast inside the lock to ensure that my_end() is not called */ (void) pthread_cond_broadcast(&COND_thread_count); (void) pthread_mutex_unlock(&LOCK_thread_count); + DBUG_PRINT("quit", ("done with cleanup")); } /* clean_up */ @@ -2023,6 +2051,7 @@ The server will not act as a slave."); sql_print_error("Before Lock_thread_count"); #endif (void) pthread_mutex_lock(&LOCK_thread_count); + DBUG_PRINT("quit", ("Got thread_count mutex")); select_thread_in_use=0; // For close_connections (void) pthread_cond_broadcast(&COND_thread_count); (void) pthread_mutex_unlock(&LOCK_thread_count); @@ -2054,10 +2083,14 @@ The server will not act as a slave."); #endif /* HAVE_OPENSSL */ /* Wait until cleanup is done */ (void) pthread_mutex_lock(&LOCK_thread_count); + DBUG_PRINT("quit", ("Got thread_count mutex for clean up wait")); + while (!ready_to_exit) { + DBUG_PRINT("quit", ("not yet ready to exit")); pthread_cond_wait(&COND_thread_count,&LOCK_thread_count); } + DBUG_PRINT("quit", ("ready to exit")); (void) pthread_mutex_unlock(&LOCK_thread_count); my_end(opt_endinfo ? MY_CHECK_ERROR | MY_GIVE_INFO : 0); exit(0); @@ -2257,6 +2290,20 @@ static void create_new_thread(THD *thd) DBUG_VOID_RETURN; } +#ifdef SIGNALS_DONT_BREAK_READ +inline void kill_broken_server() +{ + /* hack to get around signals ignored in syscalls for problem OS's */ + if (unix_sock == INVALID_SOCKET || ip_sock ==INVALID_SOCKET) + { + select_thread_in_use = 0; + kill_server((void*)MYSQL_KILL_SIGNAL); /* never returns */ + } +} +#define MAYBE_BROKEN_SYSCALL kill_broken_server(); +#else +#define MAYBE_BROKEN_SYSCALL +#endif /* Handle new connections and spawn new process to handle them */ @@ -2292,6 +2339,7 @@ pthread_handler_decl(handle_connections_sockets,arg __attribute__((unused))) #endif DBUG_PRINT("general",("Waiting for connections.")); + MAYBE_BROKEN_SYSCALL; while (!abort_loop) { readFDs=clientFDs; @@ -2306,12 +2354,15 @@ pthread_handler_decl(handle_connections_sockets,arg __attribute__((unused))) if (!select_errors++ && !abort_loop) /* purecov: inspected */ sql_print_error("mysqld: Got error %d from select",socket_errno); /* purecov: inspected */ } + MAYBE_BROKEN_SYSCALL continue; } #endif /* HPUX */ if (abort_loop) + { + MAYBE_BROKEN_SYSCALL; break; - + } /* ** Is this a new connection request */ @@ -2347,6 +2398,7 @@ pthread_handler_decl(handle_connections_sockets,arg __attribute__((unused))) if (new_sock != INVALID_SOCKET || (socket_errno != SOCKET_EINTR && socket_errno != SOCKET_EAGAIN)) break; + MAYBE_BROKEN_SYSCALL; #if !defined(NO_FCNTL_NONBLOCK) if (!(test_flags & TEST_BLOCKING)) { @@ -2363,6 +2415,7 @@ pthread_handler_decl(handle_connections_sockets,arg __attribute__((unused))) { if ((error_count++ & 255) == 0) // This can happen often sql_perror("Error in accept"); + MAYBE_BROKEN_SYSCALL; if (socket_errno == SOCKET_ENFILE || socket_errno == SOCKET_EMFILE) sleep(1); // Give other threads some time continue; diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index b5300813410..684c084ece3 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -38,6 +38,13 @@ bool opt_sporadic_binlog_dump_fail = 0; static int binlog_dump_count = 0; #endif +#ifdef SIGNAL_WITH_VIO_CLOSE +#define KICK_SLAVE { slave_thd->close_active_vio(); \ + thr_alarm_kill(slave_real_id); } +#else +#define KICK_SLAVE thr_alarm_kill(slave_real_id); +#endif + static Slave_log_event* find_slave_event(IO_CACHE* log, const char* log_file_name, char* errmsg); @@ -700,10 +707,7 @@ int stop_slave(THD* thd, bool net_report ) if (slave_running) { abort_slave = 1; - thr_alarm_kill(slave_real_id); -#ifdef SIGNAL_WITH_VIO_CLOSE - slave_thd->close_active_vio(); -#endif + KICK_SLAVE; // do not abort the slave in the middle of a query, so we do not set // thd->killed for the slave thread thd->proc_info = "waiting for slave to die"; @@ -728,7 +732,7 @@ int stop_slave(THD* thd, bool net_report ) #endif pthread_cond_timedwait(&COND_slave_stopped, &LOCK_slave, &abstime); if (slave_running) - thr_alarm_kill(slave_real_id); + KICK_SLAVE; } } else @@ -818,7 +822,7 @@ int change_master(THD* thd) if ((slave_was_running = slave_running)) { abort_slave = 1; - thr_alarm_kill(slave_real_id); + KICK_SLAVE; thd->proc_info = "waiting for slave to die"; while (slave_running) pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done @@ -1470,7 +1474,7 @@ int load_master_data(THD* thd) if ((slave_was_running = slave_running)) { abort_slave = 1; - thr_alarm_kill(slave_real_id); + KICK_SLAVE; thd->proc_info = "waiting for slave to die"; while (slave_running) pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done diff --git a/tools/mysqlmanager.c b/tools/mysqlmanager.c index 5783d151107..0795b468033 100644 --- a/tools/mysqlmanager.c +++ b/tools/mysqlmanager.c @@ -189,6 +189,8 @@ static void run_launcher_loop(); int to_launcher_pipe[2],from_launcher_pipe[2]; pid_t launcher_pid; int in_segfault=0; +const char* pid_file = "/var/run/mysqlmanager.pid"; +int created_pid_file = 0; struct manager_cmd { @@ -283,6 +285,7 @@ struct option long_options[] = {"one-thread",no_argument,0,'d'}, {"connect-retries",required_argument,0,'C'}, {"password-file",required_argument,0,'p'}, + {"pid-file",required_argument,0,'f'}, {"version", no_argument, 0, 'V'}, {0, 0, 0, 0} }; @@ -327,6 +330,17 @@ LOG_MSG_FUNC(log_debug,LOG_DEBUG) void log_debug(const char* __attribute__((unused)) fmt,...) {} #endif +static void handle_sigterm(int sig) +{ + log_info("Got SIGTERM"); + if (!one_thread) + { + kill(launcher_pid,SIGTERM); + pthread_kill(loop_th,SIGTERM); + } + clean_up(); + exit(0); +} static void handle_segfault(int sig) { @@ -1250,6 +1264,8 @@ static void clean_up() if (errfp != stderr) fclose(errfp); hash_free(&exec_hash); + if (created_pid_file) + my_delete(pid_file, MYF(0)); } static void print_version(void) @@ -1287,7 +1303,7 @@ static void usage() static int parse_args(int argc, char **argv) { int c, option_index = 0; - while ((c=getopt_long(argc,argv,"P:?#:Vl:b:B:g:m:dC:p:", + while ((c=getopt_long(argc,argv,"P:?#:Vl:b:B:g:m:dC:p:f:", long_options,&option_index)) != EOF) { switch (c) @@ -1301,6 +1317,9 @@ static int parse_args(int argc, char **argv) case 'p': manager_pw_file=optarg; break; + case 'f': + pid_file=optarg; + break; case 'C': manager_connect_retries=atoi(optarg); break; @@ -1662,6 +1681,16 @@ static void init_user_hash() fclose(f); } +static void init_pid_file() +{ + FILE* fp = fopen(pid_file, "w"); + if (!fp) + die("Could not open pid file %s", pid_file); + created_pid_file=1; + fprintf(fp, "%d\n", getpid()); + fclose(fp); +} + static void init_globals() { pthread_attr_t thr_attr; @@ -1680,8 +1709,10 @@ static void init_globals() /* (void) pthread_attr_destroy(&thr_attr); */ } init_user_hash(); + init_pid_file(); loop_th=pthread_self(); signal(SIGPIPE,handle_sigpipe); + signal(SIGTERM,handle_sigterm); } static int open_and_dup(int fd,char* path) |