diff options
Diffstat (limited to 'mysys')
-rw-r--r-- | mysys/Makefile.am | 2 | ||||
-rw-r--r-- | mysys/mf_iocache.c | 98 | ||||
-rw-r--r-- | mysys/my_gethostbyname.c | 1 | ||||
-rw-r--r-- | mysys/my_getopt.c | 4 | ||||
-rw-r--r-- | mysys/my_semaphore.c | 103 | ||||
-rw-r--r-- | mysys/my_tempnam.c | 4 | ||||
-rw-r--r-- | mysys/my_winsem.c | 4 | ||||
-rw-r--r-- | mysys/thr_rwlock.c | 93 |
8 files changed, 243 insertions, 66 deletions
diff --git a/mysys/Makefile.am b/mysys/Makefile.am index afaa82a8777..3ca07b7b80b 100644 --- a/mysys/Makefile.am +++ b/mysys/Makefile.am @@ -46,7 +46,7 @@ libmysys_a_SOURCES = my_init.c my_getwd.c mf_getdate.c\ my_quick.c my_lockmem.c my_static.c \ my_getopt.c my_mkdir.c \ default.c my_compress.c checksum.c raid.cc \ - my_net.c \ + my_net.c my_semaphore.c \ my_vsnprintf.c charset.c my_bitmap.c my_bit.c md5.c \ my_gethostbyname.c rijndael.c my_aes.c sha1.c EXTRA_DIST = thr_alarm.c thr_lock.c my_pthread.c my_thr_init.c \ diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c index 6e0a267165b..54513ba73de 100644 --- a/mysys/mf_iocache.c +++ b/mysys/mf_iocache.c @@ -68,6 +68,9 @@ static void my_aiowait(my_aio_result *result); #define unlock_append_buffer(info) #endif +#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1)) +#define IO_ROUND_DN(X) ( (X) & ~(IO_SIZE-1)) + static void init_functions(IO_CACHE* info, enum cache_type type) { @@ -424,22 +427,24 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count) DBUG_RETURN(0); } - #ifdef THREAD - -/* Initialzie multi-thread usage of the IO cache */ - +/* Prepare IO_CACHE for shared use */ void init_io_cache_share(IO_CACHE *info, IO_CACHE_SHARE *s, uint num_threads) { DBUG_ASSERT(info->type == READ_CACHE); pthread_mutex_init(&s->mutex, MY_MUTEX_INIT_FAST); - pthread_cond_init(&s->cond, 0); - s->count=num_threads; + pthread_cond_init (&s->cond, 0); + s->count=num_threads-1; s->active=0; /* to catch errors */ info->share=s; info->read_function=_my_b_read_r; } +/* + Remove a thread from shared access to IO_CACHE + Every thread should do that on exit for not + to deadlock other threads +*/ void remove_io_thread(IO_CACHE *info) { pthread_mutex_lock(&info->share->mutex); @@ -448,34 +453,41 @@ void remove_io_thread(IO_CACHE *info) pthread_mutex_unlock(&info->share->mutex); } - -int lock_io_cache(IO_CACHE *info) +static int lock_io_cache(IO_CACHE *info) { pthread_mutex_lock(&info->share->mutex); if (!info->share->count) return 1; - info->share->count--; - pthread_cond_wait(&((info)->share->cond), &((info)->share->mutex)); - if (!++info->share->count) + + --(info->share->count); + pthread_cond_wait(&info->share->cond, &info->share->mutex); + /* + count can be -1 here, if one thread was removed (remove_io_thread) + while all others were locked (lock_io_cache). + If this is the case, this thread behaves as if count was 0 from the + very beginning, that is returns 1 and does not unlock the mutex. + */ + if (++(info->share->count)) + return pthread_mutex_unlock(&info->share->mutex); + else return 1; - pthread_mutex_unlock(&info->share->mutex); - return 0; } -void unlock_io_cache(IO_CACHE *info) +static void unlock_io_cache(IO_CACHE *info) { pthread_cond_broadcast(&info->share->cond); pthread_mutex_unlock(&info->share->mutex); } /* - Read from the io cache in a thread safe manner + Read from IO_CACHE when it is shared between several threads. + It works as follows: when a thread tries to read from a file + (that is, after using all the data from the (shared) buffer), + it just hangs on lock_io_cache(), wating for other threads. + When the very last thread attempts a read, lock_io_cache() + returns 1, the thread does actual IO and unlock_io_cache(), + which signals all the waiting threads that data is in the buffer. */ - -#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1)) -#define IO_ROUND_DN(X) ( (X) & ~(IO_SIZE-1)) - - int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count) { my_off_t pos_in_file; @@ -491,24 +503,39 @@ int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count) } while (Count) { - uint cnt, len; + int cnt, len; pos_in_file= info->pos_in_file + (uint)(info->read_end - info->buffer); diff_length= (uint) (pos_in_file & (IO_SIZE-1)); length=IO_ROUND_UP(Count+diff_length)-diff_length; - length= ((length <= info->read_length) ? - length + IO_ROUND_DN(info->read_length - length) : - length - IO_ROUND_UP(length - info->read_length)) ; + length=(length <= info->read_length) ? + length + IO_ROUND_DN(info->read_length - length) : + length - IO_ROUND_UP(length - info->read_length) ; + if (info->type != READ_FIFO && (length > info->end_of_file - pos_in_file)) + length=info->end_of_file - pos_in_file; + if (length == 0) + { + info->error=(int) read_len; + DBUG_RETURN(1); + } if (lock_io_cache(info)) { +#if 0 && SAFE_MUTEX +#define PRINT_LOCK(M) printf("Thread %d: mutex is %s\n", my_thread_id(), \ + (((safe_mutex_t *)(M))->count ? "Locked" : "Unlocked")) +#else +#define PRINT_LOCK(M) +#endif + PRINT_LOCK(&info->share->mutex); info->share->active=info; if (info->seek_not_done) /* File touched, do seek */ VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0))); - len=my_read(info->file,info->buffer, length, info->myflags); - info->read_end=info->buffer + (len == (uint) -1 ? 0 : len); - info->error=(len == length ? 0 : len); + len=(int)my_read(info->file,info->buffer, length, info->myflags); + info->read_end=info->buffer + (len == -1 ? 0 : len); + info->error=(len == (int)length ? 0 : len); info->pos_in_file=pos_in_file; unlock_io_cache(info); + PRINT_LOCK(&info->share->mutex); } else { @@ -516,15 +543,16 @@ int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count) info->read_end= info->share->active->read_end; info->pos_in_file= info->share->active->pos_in_file; len= (info->error == -1 ? -1 : info->read_end-info->buffer); + PRINT_LOCK(&info->share->mutex); } info->read_pos=info->buffer; info->seek_not_done=0; - if (info->error) + if (len <= 0) { info->error=(int) read_len; DBUG_RETURN(1); } - cnt=(len > Count) ? Count : len; + cnt=(len > Count) ? (int) Count : len; memcpy(Buffer,info->read_pos, (size_t)cnt); Count -=cnt; Buffer+=cnt; @@ -1098,11 +1126,19 @@ int end_io_cache(IO_CACHE *info) DBUG_ENTER("end_io_cache"); #ifdef THREAD - /* simple protection against multi-close: destroying share first */ if (info->share) { - pthread_cond_destroy(&info->share->cond); +#ifdef SAFE_MUTEX + /* simple protection against multi-close: destroying share first */ + if (pthread_cond_destroy (&info->share->cond) | + pthread_mutex_destroy(&info->share->mutex)) + { + DBUG_RETURN(1); + } +#else + pthread_cond_destroy (&info->share->cond); pthread_mutex_destroy(&info->share->mutex); +#endif info->share=0; } #endif diff --git a/mysys/my_gethostbyname.c b/mysys/my_gethostbyname.c index 1380257f660..f21a880350b 100644 --- a/mysys/my_gethostbyname.c +++ b/mysys/my_gethostbyname.c @@ -56,7 +56,6 @@ struct hostent *my_gethostbyname_r(const char *name, struct hostent *result, char *buffer, int buflen, int *h_errnop) { - DBUG_ASSERT(buflen >= sizeof(struct hostent_data)); if (gethostbyname_r(name,result,(struct hostent_data *) buffer) == -1) { *h_errnop= errno; diff --git a/mysys/my_getopt.c b/mysys/my_getopt.c index 9674359ac4e..753b1990218 100644 --- a/mysys/my_getopt.c +++ b/mysys/my_getopt.c @@ -75,7 +75,7 @@ int handle_options(int *argc, char ***argv, uint opt_found, argvpos= 0, length, i; my_bool end_of_options= 0, must_be_var, set_maximum_value, special_used, option_is_loose; - char *progname= *(*argv), **pos, *optend, *prev_found; + char *progname= *(*argv), **pos, **pos_end, *optend, *prev_found; const struct my_option *optp; int error; @@ -84,7 +84,7 @@ int handle_options(int *argc, char ***argv, (*argv)++; /* --- || ---- */ init_variables(longopts); - for (pos= *argv; *pos; pos++) + for (pos= *argv, pos_end=pos+ *argc; pos != pos_end ; pos++) { char *cur_arg= *pos; if (cur_arg[0] == '-' && cur_arg[1] && !end_of_options) /* must be opt */ diff --git a/mysys/my_semaphore.c b/mysys/my_semaphore.c new file mode 100644 index 00000000000..de4aac1cdbd --- /dev/null +++ b/mysys/my_semaphore.c @@ -0,0 +1,103 @@ +/* Copyright (C) 2002 MySQL AB & MySQL Finland AB & TCX DataKonsult AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +/* + Simple implementation of semaphores, needed to compile MySQL on systems + that doesn't support semaphores. +*/ + +#include <my_global.h> +#include <my_semaphore.h> + +#if !defined(__WIN__) && !defined(HAVE_SEMAPHORE_H) + +int sem_init(sem_t * sem, int pshared, uint value) +{ + sem->count=value; + pthread_cond_init(&sem->cond, 0); + pthread_mutex_init(&sem->mutex, 0); + return 0; +} + +int sem_destroy(sem_t * sem) +{ + int err1,err2; + err1=pthread_cond_destroy(&sem->cond); + err2=pthread_mutex_destroy(&sem->mutex); + if (err1 || err2) + { + errno=err1 ? err1 : err2; + return -1; + } + return 0; +} + +int sem_wait(sem_t * sem) +{ + if ((errno=pthread_mutex_lock(&sem->mutex))) + return -1; + while (!sem->count) + pthread_cond_wait(&sem->cond, &sem->mutex); + if (errno) + return -1; + sem->count--; /* mutex is locked here */ + pthread_mutex_unlock(&sem->mutex); + return 0; +} + +int sem_trywait(sem_t * sem) +{ + if ((errno=pthread_mutex_lock(&sem->mutex))) + return -1; + if (sem->count) + sem->count--; + else + errno=EAGAIN; + pthread_mutex_unlock(&sem->mutex); + return errno ? -1 : 0; +} + + +int sem_post(sem_t * sem) +{ + if ((errno=pthread_mutex_lock(&sem->mutex))) + return -1; + sem->count++; + pthread_mutex_unlock(&sem->mutex); /* does it really matter what to do */ + pthread_cond_signal(&sem->cond); /* first: x_unlock or x_signal ? */ + return 0; +} + +int sem_post_multiple(sem_t * sem, uint count) +{ + if ((errno=pthread_mutex_lock(&sem->mutex))) + return -1; + sem->count+=count; + pthread_mutex_unlock(&sem->mutex); /* does it really matter what to do */ + pthread_cond_broadcast(&sem->cond); /* first: x_unlock or x_broadcast ? */ + return 0; +} + +int sem_getvalue(sem_t * sem, uint *sval) +{ + if ((errno=pthread_mutex_lock(&sem->mutex))) + return -1; + *sval=sem->count; + pthread_mutex_unlock(&sem->mutex); + return 0; +} + +#endif /* !defined(__WIN__) && !defined(HAVE_SEMAPHORE_H) */ diff --git a/mysys/my_tempnam.c b/mysys/my_tempnam.c index da0692b46c5..6c17aa5b165 100644 --- a/mysys/my_tempnam.c +++ b/mysys/my_tempnam.c @@ -109,13 +109,13 @@ my_string my_tempnam(const char *dir, const char *pfx, old_env=(char**)environ; if (dir) { /* Don't use TMPDIR if dir is given */ - environ=(const char**)temp_env; /* May give warning */ + ((char**) environ)=(char**) temp_env; temp_env[0]=0; } #endif res=tempnam((char*) dir,(my_string) pfx); /* Use stand. dir with prefix */ #ifndef OS2 - environ=(const char**)old_env; /* May give warning */ + ((char**) environ)=(char**) old_env; #endif if (!res) DBUG_PRINT("error",("Got error: %d from tempnam",errno)); diff --git a/mysys/my_winsem.c b/mysys/my_winsem.c index 268a05a0b21..e2713d189b2 100644 --- a/mysys/my_winsem.c +++ b/mysys/my_winsem.c @@ -375,7 +375,7 @@ sem_post (sem_t * sem) */ int -sem_post_multiple (sem_t * sem, int count ) +sem_post_multiple (sem_t * sem, unsigned int count) { #ifdef EXTRA_DEBUG if (sem == NULL || *sem == NULL || count <= 0) @@ -397,7 +397,7 @@ sem_post_multiple (sem_t * sem, int count ) } int -sem_getvalue (sem_t *sem, int *sval) +sem_getvalue (sem_t *sem, unsigned int *sval) { errno = ENOSYS; return -1; diff --git a/mysys/thr_rwlock.c b/mysys/thr_rwlock.c index 4e2cf3554b7..f1f70b5c4ac 100644 --- a/mysys/thr_rwlock.c +++ b/mysys/thr_rwlock.c @@ -19,11 +19,13 @@ #include "mysys_priv.h" #include <my_pthread.h> #if defined(THREAD) && !defined(HAVE_PTHREAD_RWLOCK_RDLOCK) && !defined(HAVE_RWLOCK_INIT) +#include <errno.h> /* - * Source base from Sun Microsystems SPILT, simplified - * for MySQL use -- Joshua Chamas - */ + Source base from Sun Microsystems SPILT, simplified for MySQL use + -- Joshua Chamas + Some cleanup and additional code by Monty +*/ /* * Multithreaded Demo Source @@ -58,7 +60,7 @@ * Mountain View, California 94043 */ -int my_rwlock_init( rw_lock_t *rwp, void *arg __attribute__((unused))) +int my_rwlock_init(rw_lock_t *rwp, void *arg __attribute__((unused))) { pthread_condattr_t cond_attr; @@ -71,64 +73,101 @@ int my_rwlock_init( rw_lock_t *rwp, void *arg __attribute__((unused))) rwp->state = 0; rwp->waiters = 0; - return( 0 ); + return(0); } -int my_rwlock_destroy( rw_lock_t *rwp ) { + +int my_rwlock_destroy(rw_lock_t *rwp) +{ pthread_mutex_destroy( &rwp->lock ); pthread_cond_destroy( &rwp->readers ); pthread_cond_destroy( &rwp->writers ); - - return( 0 ); + return(0); } -int my_rw_rdlock( rw_lock_t *rwp ) { + +int my_rw_rdlock(rw_lock_t *rwp) +{ pthread_mutex_lock(&rwp->lock); - /* active or queued writers */ - while ( ( rwp->state < 0 ) || rwp->waiters ) + /* active or queued writers */ + while (( rwp->state < 0 ) || rwp->waiters) pthread_cond_wait( &rwp->readers, &rwp->lock); rwp->state++; pthread_mutex_unlock(&rwp->lock); + return(0); +} - return( 0 ); +int my_rw_tryrdlock(rw_lock_t *rwp) +{ + int res; + pthread_mutex_lock(&rwp->lock); + if ((rwp->state < 0 ) || rwp->waiters) + res= EBUSY; /* Can't get lock */ + else + { + res=0; + rwp->state++; + } + pthread_mutex_unlock(&rwp->lock); + return(res); } -int my_rw_wrlock( rw_lock_t *rwp ) { +int my_rw_wrlock(rw_lock_t *rwp) +{ pthread_mutex_lock(&rwp->lock); - rwp->waiters++; /* another writer queued */ + rwp->waiters++; /* another writer queued */ - while ( rwp->state ) - pthread_cond_wait( &rwp->writers, &rwp->lock); + while (rwp->state) + pthread_cond_wait(&rwp->writers, &rwp->lock); rwp->state = -1; - --rwp->waiters; - pthread_mutex_unlock( &rwp->lock ); + rwp->waiters--; + pthread_mutex_unlock(&rwp->lock); + return(0); +} - return( 0 ); + +int my_rw_trywrlock(rw_lock_t *rwp) +{ + int res; + pthread_mutex_lock(&rwp->lock); + if (rwp->state) + res= EBUSY; /* Can't get lock */ + else + { + res=0; + rwp->state = -1; + } + pthread_mutex_unlock(&rwp->lock); + return(res); } -int my_rw_unlock( rw_lock_t *rwp ) { + +int my_rw_unlock(rw_lock_t *rwp) +{ DBUG_PRINT("rw_unlock", ("state: %d waiters: %d", rwp->state, rwp->waiters)); pthread_mutex_lock(&rwp->lock); - if ( rwp->state == -1 ) { /* writer releasing */ - rwp->state = 0; /* mark as available */ + if (rwp->state == -1) /* writer releasing */ + { + rwp->state= 0; /* mark as available */ - if ( rwp->waiters ) /* writers queued */ + if ( rwp->waiters ) /* writers queued */ pthread_cond_signal( &rwp->writers ); else pthread_cond_broadcast( &rwp->readers ); - } else { - if ( --rwp->state == 0 ) /* no more readers */ + } + else + { + if ( --rwp->state == 0 ) /* no more readers */ pthread_cond_signal( &rwp->writers ); } pthread_mutex_unlock( &rwp->lock ); - - return( 0 ); + return(0); } #endif |