diff options
author | unknown <sasha@mysql.sashanet.com> | 2001-11-03 16:54:00 -0700 |
---|---|---|
committer | unknown <sasha@mysql.sashanet.com> | 2001-11-03 16:54:00 -0700 |
commit | cd825a19936d26735db0c1b4b251bd78617d0c2b (patch) | |
tree | 8abf01e56458b77f0e836d8f72481d71cc846b68 | |
parent | 368c6c1f30f44d1d028022a06dbcb9754815ee92 (diff) | |
download | mariadb-git-cd825a19936d26735db0c1b4b251bd78617d0c2b.tar.gz |
more work on IO_CACHE
portability fixes for systems with broken syscalls that do not interrupt on
a signal
temporary commit - will not be pushed, need to sync up
include/my_sys.h:
work on READ_APPEND cache
mysys/Makefile.am:
change to test IO_CACHE
mysys/mf_iocache.c:
work on READ_APPEND cache
BitKeeper/etc/ignore:
Added mysys/#mf_iocache.c# mysys/test_io_cache to the ignore list
sql/mysqld.cc:
make shutdown work on broken systems
sql/sql_repl.cc:
make sure slave can be stopped on broken systems in all cases, clean-up
-rw-r--r-- | .bzrignore | 2 | ||||
-rw-r--r-- | include/my_sys.h | 20 | ||||
-rw-r--r-- | mysys/Makefile.am | 4 | ||||
-rw-r--r-- | mysys/mf_iocache.c | 217 | ||||
-rw-r--r-- | sql/mysqld.cc | 71 | ||||
-rw-r--r-- | sql/sql_repl.cc | 18 |
6 files changed, 290 insertions, 42 deletions
diff --git a/.bzrignore b/.bzrignore index 0703a1d2f25..902e691db14 100644 --- a/.bzrignore +++ b/.bzrignore @@ -424,3 +424,5 @@ vio/test-ssl vio/test-sslclient vio/test-sslserver vio/viotest-ssl +mysys/#mf_iocache.c# +mysys/test_io_cache diff --git a/include/my_sys.h b/include/my_sys.h index f6d303a6ccb..5174425006d 100644 --- a/include/my_sys.h +++ b/include/my_sys.h @@ -293,6 +293,16 @@ typedef struct st_dynamic_string { struct st_io_cache; typedef int (*IO_CACHE_CALLBACK)(struct st_io_cache*); +#ifdef THREAD +#define lock_append_buffer(info) \ + pthread_mutex_lock(&(info)->append_buffer_lock) +#define unlock_append_buffer(info) \ + pthread_mutex_unlock(&(info)->append_buffer_lock) +#else +#define lock_append_buffer(info) +#define unlock_append_buffer(info) +#endif + typedef struct st_io_cache /* Used when cacheing files */ { my_off_t pos_in_file,end_of_file; @@ -301,7 +311,7 @@ typedef struct st_io_cache /* Used when cacheing files */ that will use a buffer allocated somewhere else */ - byte *append_buffer, *append_pos, *append_end; + byte *append_buffer, *append_read_pos, *append_write_pos, *append_end; /* for append buffer used in READ_APPEND cache */ #ifdef THREAD pthread_mutex_t append_buffer_lock; @@ -348,10 +358,15 @@ typedef int (*qsort2_cmp)(const void *, const void *, const void *); _my_b_get(info)) #define my_b_write(info,Buffer,Count) \ + ((info)->type != SEQ_READ_APPEND) ? (\ ((info)->rc_pos + (Count) <= (info)->rc_end ?\ (memcpy((info)->rc_pos,Buffer,(size_t) (Count)), \ ((info)->rc_pos+=(Count)),0) :\ - _my_b_write(info,Buffer,Count)) + _my_b_write(info,Buffer,Count))) : \ + ((info)->append_write_pos + (Count) <= (info)->append_end ?\ + (memcpy((info)->append_write_pos,Buffer,(size_t)Count), \ + ((info)->append_write_pos+=(Count),0)) : \ + _my_b_append(info,Buffer,Count)) /* my_b_write_byte dosn't have any err-check */ #define my_b_write_byte(info,chr) \ @@ -564,6 +579,7 @@ extern int _my_b_net_read(IO_CACHE *info,byte *Buffer,uint Count); extern int _my_b_get(IO_CACHE *info); extern int _my_b_async_read(IO_CACHE *info,byte *Buffer,uint Count); extern int _my_b_write(IO_CACHE *info,const byte *Buffer,uint Count); +extern int _my_b_append(IO_CACHE *info,const byte *Buffer,uint Count); extern int my_block_write(IO_CACHE *info, const byte *Buffer, uint Count, my_off_t pos); extern int flush_io_cache(IO_CACHE *info); diff --git a/mysys/Makefile.am b/mysys/Makefile.am index 1797d306254..d28ef1364c6 100644 --- a/mysys/Makefile.am +++ b/mysys/Makefile.am @@ -95,6 +95,10 @@ test_vsnprintf: my_vsnprintf.c $(LIBRARIES) $(CP) $(srcdir)/my_vsnprintf.c test_vsnprintf.c $(LINK) $(FLAGS) -DMAIN ./test_vsnprintf.c $(LDADD) $(LIBS) $(RM) -f test_vsnprintf.* +test_io_cache: mf_iocache.c $(LIBRARIES) + $(CP) $(srcdir)/mf_iocache.c test_io_cache.c + $(LINK) $(FLAGS) -DMAIN ./test_io_cache.c $(LDADD) $(LIBS) + $(RM) -f test_io_cache.* test_dir: test_dir.c $(LIBRARIES) $(LINK) $(FLAGS) -DMAIN $(srcdir)/test_dir.c $(LDADD) $(LIBS) diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c index f0100b5122b..d18c3a51c5d 100644 --- a/mysys/mf_iocache.c +++ b/mysys/mf_iocache.c @@ -41,6 +41,10 @@ static void my_aiowait(my_aio_result *result); #include <assert.h> #include <errno.h> +#ifdef MAIN +#include <my_dir.h> +#endif + static void init_read_function(IO_CACHE* info, enum cache_type type); static void init_read_function(IO_CACHE* info, enum cache_type type) @@ -152,7 +156,7 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, info->rc_request_pos=info->rc_pos=info->buffer; if (type == SEQ_READ_APPEND) { - info->append_pos = info->append_buffer; + info->append_read_pos = info->append_write_pos = info->append_buffer; info->append_end = info->append_buffer + info->buffer_length; #ifdef THREAD pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST); @@ -277,6 +281,10 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, ~(my_off_t) 0); } } + if (info->type == SEQ_READ_APPEND) + { + info->append_read_pos = info->append_write_pos = info->append_buffer; + } info->type=type; info->error=0; init_read_function(info,type); @@ -294,7 +302,7 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, info->inited=0; #endif DBUG_RETURN(0); -} /* init_io_cache */ +} /* reinit_io_cache */ @@ -377,11 +385,19 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count) return 0; } +/* Do sequential read from the SEQ_READ_APPEND cache + we do this in three stages: + - first read from info->buffer + - then if there are still data to read, try the file descriptor + - afterwards, if there are still data to read, try append buffer +*/ + int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) { - uint length,diff_length,left_length; + uint length,diff_length,left_length,save_count; my_off_t max_length, pos_in_file; - + save_count=Count; + /* first, read the regular buffer */ if ((left_length=(uint) (info->rc_end-info->rc_pos))) { dbug_assert(Count >= left_length); /* User is not using my_b_read() */ @@ -390,30 +406,33 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) Count-=left_length; } /* pos_in_file always point on where info->buffer was read */ - pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer); + if ((pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer)) >= + info->end_of_file) + { + info->pos_in_file=pos_in_file; + goto read_append_buffer; + } /* no need to seek since the read is guaranteed to be sequential */ diff_length=(uint) (pos_in_file & (IO_SIZE-1)); -#ifdef THREAD - pthread_mutex_lock(&info->append_buffer_lock); -#endif -#ifdef THREAD - pthread_mutex_unlock(&info->append_buffer_lock); -#endif + + /* now the second stage begins - read from file descriptor */ if (Count >= (uint) (IO_SIZE+(IO_SIZE-diff_length))) { /* Fill first intern buffer */ uint read_length; if (info->end_of_file == pos_in_file) { /* End of file */ - info->error=(int) left_length; - return 1; + goto read_append_buffer; } length=(Count & (uint) ~(IO_SIZE-1))-diff_length; if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags)) != (uint) length) { - info->error= read_length == (uint) -1 ? -1 : - (int) (read_length+left_length); - return 1; + if (read_length != (uint)-1) + { + Count -= read_length; + Buffer += read_length; + } + goto read_append_buffer; } Count-=length; Buffer+=length; @@ -422,15 +441,13 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) diff_length=0; } max_length=info->read_length-diff_length; - if (info->type != READ_FIFO && - (info->end_of_file - pos_in_file) < max_length) + if ((info->end_of_file - pos_in_file) < max_length) max_length = info->end_of_file - pos_in_file; if (!max_length) { if (Count) { - info->error= left_length; /* We only got this many char */ - return 1; + goto read_append_buffer; } length=0; /* Didn't read any chars */ } @@ -439,15 +456,36 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) length == (uint) -1) { if (length != (uint) -1) + { memcpy(Buffer,info->buffer,(size_t) length); - info->error= length == (uint) -1 ? -1 : (int) (length+left_length); - return 1; + Count -= length; + Buffer += length; + } + goto read_append_buffer; } info->rc_pos=info->buffer+Count; info->rc_end=info->buffer+length; info->pos_in_file=pos_in_file; memcpy(Buffer,info->buffer,(size_t) Count); return 0; +read_append_buffer: + lock_append_buffer(info); + if (!Count) return 0; + { + uint copy_len = (uint)(info->append_read_pos - + info->append_write_pos); + dbug_assert(info->append_read_pos <= info->append_write_pos); + if (copy_len > Count) + copy_len = Count; + memcpy(Buffer, info->append_read_pos, + copy_len); + info->append_read_pos += copy_len; + Count -= copy_len; + if (Count) + info->error = save_count - Count; + } + unlock_append_buffer(info); + return Count ? 1 : 0; } #ifdef HAVE_AIOWAIT @@ -672,6 +710,31 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count) return 0; } +int _my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count) +{ + uint rest_length,length; + + rest_length=(uint) (info->append_end - + info->append_write_pos); + memcpy(info->append_write_pos,Buffer,(size_t) rest_length); + Buffer+=rest_length; + Count-=rest_length; + info->append_write_pos+=rest_length; + if (flush_io_cache(info)) + return 1; + if (Count >= IO_SIZE) + { /* Fill first intern buffer */ + length=Count & (uint) ~(IO_SIZE-1); + if (my_write(info->file,Buffer,(uint) length,info->myflags | MY_NABP)) + return info->error= -1; + Count-=length; + Buffer+=length; + } + memcpy(info->append_write_pos,Buffer,(size_t) Count); + info->append_write_pos+=Count; + return 0; +} + /* Write a block to disk where part of the data may be inside the record @@ -756,6 +819,30 @@ int flush_io_cache(IO_CACHE *info) DBUG_RETURN(0); } } + else if (info->type == SEQ_READ_APPEND) + { + if (info->file == -1) + { + if (real_open_cached_file(info)) + DBUG_RETURN((info->error= -1)); + } + lock_append_buffer(info); + if (info->append_write_pos != info->append_buffer) + { + length=(uint) (info->append_write_pos - info->append_buffer); + info->append_read_pos=info->append_write_pos=info->append_buffer; + info->append_end=(info->append_buffer+info->buffer_length- + (info->pos_in_file & (IO_SIZE-1))); + if (my_write(info->file,info->buffer,length,info->myflags | MY_NABP)) + { + unlock_append_buffer(info); + DBUG_RETURN((info->error= -1)); + } + unlock_append_buffer(info); + DBUG_RETURN(0); + } + unlock_append_buffer(info); + } #ifdef HAVE_AIOWAIT else if (info->type != READ_NET) { @@ -784,3 +871,89 @@ int end_io_cache(IO_CACHE *info) DBUG_RETURN(error); } /* end_io_cache */ +#ifdef MAIN +void die(const char* fmt, ...) +{ + va_list va_args; + va_start(va_args,fmt); + fprintf(stderr,"Error:"); + vfprintf(stderr, fmt,va_args); + fprintf(stderr,", errno=%d\n", errno); + exit(1); +} + +int open_file(const char* fname, IO_CACHE* info, int cache_size) +{ + int fd; + if ((fd=my_open(fname,O_CREAT|O_APPEND|O_RDWR,MYF(MY_WME))) < 0) + die("Could not open %s", fname); + if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME))) + die("failed in init_io_cache()"); + return fd; +} + +void close_file(IO_CACHE* info) +{ + end_io_cache(info); + my_close(info->file, MYF(MY_WME)); +} + +int main(int argc, char** argv) +{ + IO_CACHE sra_cache; /* SEQ_READ_APPEND */ + MY_STAT status; + const char* fname="/tmp/iocache.test"; + int cache_size=16384; + char llstr_buf[22]; + int max_block,total_bytes=0; + int i,num_loops=100,error=0; + char *p; + char* block, *block_end; + MY_INIT(argv[0]); + max_block = cache_size*3; + if (!(block=(char*)my_malloc(max_block,MYF(MY_WME)))) + die("Not enough memory to allocate test block"); + block_end = block + max_block; + for (p = block,i=0; p < block_end;i++) + { + *p++ = (char)i; + } + if (my_stat(fname,&status, MYF(0)) && + my_delete(fname,MYF(MY_WME))) + { + die("Delete of %s failed, aborting", fname); + } + open_file(fname,&sra_cache, cache_size); + for (i = 0; i < num_loops; i++) + { + char buf[4]; + int block_size = abs(rand() % max_block); + int4store(buf, block_size); + if (my_b_write(&sra_cache,buf,4) || + my_b_write(&sra_cache, block, block_size)) + die("write failed"); + total_bytes += 4+block_size; + } + close_file(&sra_cache); + my_free(block,MYF(MY_WME)); + if (!my_stat(fname,&status,MYF(MY_WME))) + die("%s failed to stat, but I had just closed it,\ + wonder how that happened"); + printf("Final size of %s is %s, wrote %d bytes\n",fname, + llstr(status.st_size,llstr_buf), + total_bytes); + my_delete(fname, MYF(MY_WME)); + /* check correctness of tests */ + if (total_bytes != status.st_size) + { + fprintf(stderr,"Not the same number of bytes acutally in file as bytes \ +supposedly written\n"); + error=1; + } + exit(error); + return 0; +} +#endif + + + diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 9a87acb5596..f92d25e3615 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -45,6 +45,11 @@ char pstack_file_name[80]; #endif /* __linux__ */ +#if defined(HAVE_DEC_3_2_THREADS) || defined(SIGNALS_DONT_BREAK_READ) +#define HAVE_CLOSE_SERVER_SOCK 1 +void close_server_sock(); +#endif + extern "C" { // Because of SCO 3.2V4.2 #include <errno.h> #include <sys/stat.h> @@ -453,16 +458,7 @@ static void close_connections(void) sql_print_error("Got error %d from pthread_cond_timedwait",error); #endif #if defined(HAVE_DEC_3_2_THREADS) || defined(SIGNALS_DONT_BREAK_READ) - if (ip_sock != INVALID_SOCKET) - { - DBUG_PRINT("error",("closing TCP/IP and socket files")); - VOID(shutdown(ip_sock,2)); - VOID(closesocket(ip_sock)); - VOID(shutdown(unix_sock,2)); - VOID(closesocket(unix_sock)); - VOID(unlink(mysql_unix_port)); - ip_sock=unix_sock= INVALID_SOCKET; - } + close_server_sock(); #endif } (void) pthread_mutex_unlock(&LOCK_thread_count); @@ -577,10 +573,37 @@ static void close_connections(void) DBUG_VOID_RETURN; } +#ifdef HAVE_CLOSE_SERVER_SOCK +void close_server_sock() +{ + DBUG_ENTER("close_server_sock"); + if (ip_sock != INVALID_SOCKET) + { + DBUG_PRINT("info",("closing TCP/IP socket")); + VOID(shutdown(ip_sock,2)); + VOID(closesocket(ip_sock)); + ip_sock=INVALID_SOCKET; + } + if (unix_sock != INVALID_SOCKET) + { + DBUG_PRINT("info",("closing Unix socket")); + VOID(shutdown(unix_sock,2)); + VOID(closesocket(unix_sock)); + VOID(unlink(mysql_unix_port)); + unix_sock=INVALID_SOCKET; + } + DBUG_VOID_RETURN; +} +#endif + void kill_mysql(void) { DBUG_ENTER("kill_mysql"); +#ifdef SIGNALS_DONT_BREAK_READ + close_server_sock(); /* force accept to wake up */ +#endif + #if defined(__WIN__) { if (!SetEvent(hEventShutdown)) @@ -604,6 +627,7 @@ void kill_mysql(void) #endif DBUG_PRINT("quit",("After pthread_kill")); shutdown_in_progress=1; // Safety if kill didn't work + abort_loop=1; DBUG_VOID_RETURN; } @@ -2023,6 +2047,7 @@ The server will not act as a slave."); sql_print_error("Before Lock_thread_count"); #endif (void) pthread_mutex_lock(&LOCK_thread_count); + DBUG_PRINT("quit", ("Got thread_count mutex")); select_thread_in_use=0; // For close_connections (void) pthread_cond_broadcast(&COND_thread_count); (void) pthread_mutex_unlock(&LOCK_thread_count); @@ -2054,10 +2079,14 @@ The server will not act as a slave."); #endif /* HAVE_OPENSSL */ /* Wait until cleanup is done */ (void) pthread_mutex_lock(&LOCK_thread_count); + DBUG_PRINT("quit", ("Got thread_count mutex for clean up wait")); + while (!ready_to_exit) { + DBUG_PRINT("quit", ("not yet ready to exit")); pthread_cond_wait(&COND_thread_count,&LOCK_thread_count); } + DBUG_PRINT("quit", ("ready to exit")); (void) pthread_mutex_unlock(&LOCK_thread_count); my_end(opt_endinfo ? MY_CHECK_ERROR | MY_GIVE_INFO : 0); exit(0); @@ -2253,6 +2282,20 @@ static void create_new_thread(THD *thd) DBUG_VOID_RETURN; } +#ifdef SIGNALS_DONT_BREAK_READ +inline void kill_broken_server() +{ + /* hack to get around signals ignored in syscalls for problem OS's */ + if (unix_sock == INVALID_SOCKET || ip_sock ==INVALID_SOCKET) + { + select_thread_in_use = 0; + kill_server((void*)MYSQL_KILL_SIGNAL); /* never returns */ + } +} +#define MAYBE_BROKEN_SYSCALL kill_broken_server(); +#else +#define MAYBE_BROKEN_SYSCALL +#endif /* Handle new connections and spawn new process to handle them */ @@ -2288,6 +2331,7 @@ pthread_handler_decl(handle_connections_sockets,arg __attribute__((unused))) #endif DBUG_PRINT("general",("Waiting for connections.")); + MAYBE_BROKEN_SYSCALL; while (!abort_loop) { readFDs=clientFDs; @@ -2302,12 +2346,15 @@ pthread_handler_decl(handle_connections_sockets,arg __attribute__((unused))) if (!select_errors++ && !abort_loop) /* purecov: inspected */ sql_print_error("mysqld: Got error %d from select",socket_errno); /* purecov: inspected */ } + MAYBE_BROKEN_SYSCALL continue; } #endif /* HPUX */ if (abort_loop) + { + MAYBE_BROKEN_SYSCALL; break; - + } /* ** Is this a new connection request */ @@ -2343,6 +2390,7 @@ pthread_handler_decl(handle_connections_sockets,arg __attribute__((unused))) if (new_sock != INVALID_SOCKET || (socket_errno != SOCKET_EINTR && socket_errno != SOCKET_EAGAIN)) break; + MAYBE_BROKEN_SYSCALL; #if !defined(NO_FCNTL_NONBLOCK) if (!(test_flags & TEST_BLOCKING)) { @@ -2359,6 +2407,7 @@ pthread_handler_decl(handle_connections_sockets,arg __attribute__((unused))) { if ((error_count++ & 255) == 0) // This can happen often sql_perror("Error in accept"); + MAYBE_BROKEN_SYSCALL; if (socket_errno == SOCKET_ENFILE || socket_errno == SOCKET_EMFILE) sleep(1); // Give other threads some time continue; diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index b5300813410..684c084ece3 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -38,6 +38,13 @@ bool opt_sporadic_binlog_dump_fail = 0; static int binlog_dump_count = 0; #endif +#ifdef SIGNAL_WITH_VIO_CLOSE +#define KICK_SLAVE { slave_thd->close_active_vio(); \ + thr_alarm_kill(slave_real_id); } +#else +#define KICK_SLAVE thr_alarm_kill(slave_real_id); +#endif + static Slave_log_event* find_slave_event(IO_CACHE* log, const char* log_file_name, char* errmsg); @@ -700,10 +707,7 @@ int stop_slave(THD* thd, bool net_report ) if (slave_running) { abort_slave = 1; - thr_alarm_kill(slave_real_id); -#ifdef SIGNAL_WITH_VIO_CLOSE - slave_thd->close_active_vio(); -#endif + KICK_SLAVE; // do not abort the slave in the middle of a query, so we do not set // thd->killed for the slave thread thd->proc_info = "waiting for slave to die"; @@ -728,7 +732,7 @@ int stop_slave(THD* thd, bool net_report ) #endif pthread_cond_timedwait(&COND_slave_stopped, &LOCK_slave, &abstime); if (slave_running) - thr_alarm_kill(slave_real_id); + KICK_SLAVE; } } else @@ -818,7 +822,7 @@ int change_master(THD* thd) if ((slave_was_running = slave_running)) { abort_slave = 1; - thr_alarm_kill(slave_real_id); + KICK_SLAVE; thd->proc_info = "waiting for slave to die"; while (slave_running) pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done @@ -1470,7 +1474,7 @@ int load_master_data(THD* thd) if ((slave_was_running = slave_running)) { abort_slave = 1; - thr_alarm_kill(slave_real_id); + KICK_SLAVE; thd->proc_info = "waiting for slave to die"; while (slave_running) pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done |