commit     6ba12f070c65a445ba3f6758c1a49a872c627561
tree       8fc9687df3d7995af94f6a6df09bc646da26592e
parent     96e2ca52adfc4e58e4a08d20dcb32a6ff2f1ab2c
author     Sergei Golubchik <serg@mysql.com>    2008-07-29 16:10:24 +0200
committer  Sergei Golubchik <serg@mysql.com>    2008-07-29 16:10:24 +0200
WL#3064 - waiting threads - wait-for graph and deadlock detection
client/mysqltest.c:
compiler warnings
configure.in:
remove old tests for unused programs
disable the use of gcc built-ins if smp assembler atomics were selected explicitly.
add waiting_threads.o to THREAD_LOBJECTS
include/lf.h:
replace the end-of-stack pointer with a pointer to the end-of-stack pointer.
the latter can then be stored in THD (mysys_vars) and updated by the
pool-of-threads scheduler.
constructor/destructor hooks in lf-alloc (see the sketch at the end of this page)
include/my_pthread.h:
shuffle the set_timespec/set_timespec_nsec macros a bit to be able to fill
several timeout structures with only one my_getsystime() call (see the
sketch after the diffstat below)
include/waiting_threads.h:
waiting threads - wait-for graph and deadlock detection (usage sketch follows this file list)
mysys/Makefile.am:
add waiting_threads.c
mysys/lf_alloc-pin.c:
replace the end-of-stack pointer with a pointer to the end-of-stack pointer.
the latter can then be stored in THD (mysys_vars) and updated by the
pool-of-threads scheduler.
constructor/destructor in lf-alloc
mysys/lf_hash.c:
constructor/destructor in lf-alloc
mysys/my_thr_init.c:
remember the end-of-stack pointer in the mysys_var
mysys/waiting_threads.c:
waiting threads - wait-for graph and deadlock detection
storage/maria/ha_maria.cc:
replace the end-of-stack pointer with a pointer to the end-of-stack pointer.
the latter can then be stored in THD (mysys_vars) and updated by the
pool-of-threads scheduler.
storage/maria/ma_commit.c:
replace the end-of-stack pointer with a pointer to the end-of-stack pointer.
the latter can then be stored in THD (mysys_vars) and updated by the
pool-of-threads scheduler.
storage/maria/trnman.c:
replace the end-of-stack pointer with a pointer to the end-of-stack pointer.
the latter can then be stored in THD (mysys_vars) and updated by the
pool-of-threads scheduler.
storage/maria/trnman_public.h:
replace the end-of-stack pointer with a pointer to the end-of-stack pointer.
the latter can then be stored in THD (mysys_vars) and updated by the
pool-of-threads scheduler.
storage/maria/unittest/trnman-t.c:
replace the end-of-stack pointer with a pointer to the end-of-stack pointer.
the latter can then be stored in THD (mysys_vars) and updated by the
pool-of-threads scheduler.
unittest/mysys/Makefile.am:
add waiting_threads-t
unittest/mysys/lf-t.c:
factor out the common code for multi-threaded stress unit tests
move lf tests to a separate file
unittest/mysys/my_atomic-t.c:
factor out the common code for multi-threaded stress unit tests
move lf tests to a separate file
unittest/mysys/thr_template.c:
factor out the common code for multi-threaded stress unit tests
unittest/mysys/waiting_threads-t.c:
wt tests
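
For orientation, here is roughly how a lock manager is expected to drive the
new API from include/waiting_threads.h (full header in the diff below). This
is a minimal sketch only: wait_for_lock(), release_lock(), my_restype and
lock_id are hypothetical names, error handling is omitted, and wt_init() plus
a per-thread wt_thd_init() are assumed to have run already.

    #include <waiting_threads.h>

    /* a resource type without make_key(): the WT_RESOURCE_ID is the key */
    static WT_RESOURCE_TYPE my_restype= { wt_resource_id_memcmp, 0 };

    /* waiter side: declare the wait-for edge, then block.
       per the header, call wt_thd_will_wait_for() once per blocker if
       several threads own the resource - always with the same resid */
    static int wait_for_lock(WT_THD *me, WT_THD *holder, ulonglong lock_id,
                             pthread_mutex_t *lock_mutex)
    {
      WT_RESOURCE_ID resid;
      resid.type= &my_restype;
      resid.value.num= lock_id;

      /* may detect a deadlock right away (short depth-limited search) */
      if (wt_thd_will_wait_for(me, holder, &resid) == WT_DEADLOCK)
        return WT_DEADLOCK;

      /* drop-in replacement for pthread_cond_timedwait(); lock_mutex must
         be held here - the same mutex the blocker broadcasts under */
      return wt_thd_cond_timedwait(me, lock_mutex); /* WT_OK, WT_TIMEOUT,
                                                       or WT_DEADLOCK */
    }

    /* blocker side: wake the waiters up when releasing a resource that
       somebody actually waited on */
    static void release_lock(WT_THD *me, ulonglong lock_id)
    {
      WT_RESOURCE_ID resid;
      resid.type= &my_restype;
      resid.value.num= lock_id;
      wt_thd_release(me, &resid);   /* or wt_thd_release_all(me) */
    }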
-rw-r--r--  client/mysqltest.c                 |   4
-rw-r--r--  configure.in                       |  58
-rw-r--r--  include/lf.h                       |  28
-rw-r--r--  include/my_pthread.h               |  80
-rw-r--r--  include/waiting_threads.h          | 154
-rw-r--r--  mysys/Makefile.am                  |   2
-rw-r--r--  mysys/lf_alloc-pin.c               |  52
-rw-r--r--  mysys/lf_hash.c                    |  15
-rw-r--r--  mysys/my_thr_init.c                |   8
-rw-r--r--  mysys/waiting_threads.c            | 641
-rw-r--r--  storage/maria/ha_maria.cc          |  10
-rw-r--r--  storage/maria/ma_commit.c          |   4
-rw-r--r--  storage/maria/trnman.c             |   7
-rw-r--r--  storage/maria/trnman_public.h      |   2
-rw-r--r--  storage/maria/unittest/trnman-t.c  |   9
-rw-r--r--  unittest/mysys/Makefile.am         |   2
-rw-r--r--  unittest/mysys/lf-t.c              | 168
-rw-r--r--  unittest/mysys/my_atomic-t.c       | 256
-rw-r--r--  unittest/mysys/thr_template.c      |  92
-rw-r--r--  unittest/mysys/waiting_threads-t.c | 278
20 files changed, 1503 insertions, 367 deletions
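
As noted in the include/my_pthread.h entry above, the timespec macros were
reshuffled so that one my_getsystime() call can seed several absolute
deadlines - this is exactly what wt_thd_cond_timedwait() does with its short
and long timeouts. A minimal sketch; the function and the timeout values are
illustrative only (on Windows the starting time must come from
GetSystemTimeAsFileTime() instead, as the wt code shows):

    #include <my_pthread.h>

    /* two pthread_cond_timedwait() deadlines from a single clock read */
    static void set_two_deadlines(struct timespec *short_to,
                                  struct timespec *long_to)
    {
      ulonglong now= my_getsystime();             /* 100ns units */
      set_timespec_time_nsec(*short_to, now, 100000ULL);   /* +100 usec */
      set_timespec_time_nsec(*long_to, now, 10000000ULL);  /* +10 msec */
    }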
diff --git a/client/mysqltest.c b/client/mysqltest.c index 50f72377122..19c6d2eb71d 100644 --- a/client/mysqltest.c +++ b/client/mysqltest.c @@ -2815,7 +2815,7 @@ void do_mkdir(struct st_command *command) int error; static DYNAMIC_STRING ds_dirname; const struct command_arg mkdir_args[] = { - "dirname", ARG_STRING, TRUE, &ds_dirname, "Directory to create" + {"dirname", ARG_STRING, TRUE, &ds_dirname, "Directory to create"} }; DBUG_ENTER("do_mkdir"); @@ -2845,7 +2845,7 @@ void do_rmdir(struct st_command *command) int error; static DYNAMIC_STRING ds_dirname; const struct command_arg rmdir_args[] = { - "dirname", ARG_STRING, TRUE, &ds_dirname, "Directory to remove" + { "dirname", ARG_STRING, TRUE, &ds_dirname, "Directory to remove" } }; DBUG_ENTER("do_rmdir"); diff --git a/configure.in b/configure.in index 4e74c708181..ffc77b29434 100644 --- a/configure.in +++ b/configure.in @@ -250,8 +250,6 @@ test -z "$INSTALL_SCRIPT" && INSTALL_SCRIPT='${INSTALL_PROGRAM}' # Not critical since the generated file is distributed AC_CHECK_PROGS(YACC, ['bison -y -p MYSQL']) -AC_CHECK_PROG(PDFMANUAL, pdftex, manual.pdf) -AC_CHECK_PROG(DVIS, tex, manual.dvi) #check the return type of sprintf AC_MSG_CHECKING("return type of sprintf") @@ -1726,41 +1724,43 @@ fi AC_ARG_WITH([atomic-ops], AC_HELP_STRING([--with-atomic-ops=rwlocks|smp|up], [Implement atomic operations using pthread rwlocks or atomic CPU - instructions for multi-processor (default) or uniprocessor - configuration]), , [with_atomic_ops=smp]) + instructions for multi-processor or uniprocessor + configuration. By default gcc built-in sync functions are used, + if available and 'smp' configuration otherwise.])) case "$with_atomic_ops" in "up") AC_DEFINE([MY_ATOMIC_MODE_DUMMY], [1], [Assume single-CPU mode, no concurrency]) ;; "rwlocks") AC_DEFINE([MY_ATOMIC_MODE_RWLOCKS], [1], [Use pthread rwlocks for atomic ops]) ;; "smp") ;; + "") + AC_CACHE_CHECK([whether the compiler provides atomic builtins], + [mysql_cv_gcc_atomic_builtins], [AC_TRY_RUN([ + int main() + { + int foo= -10; int bar= 10; + if (!__sync_fetch_and_add(&foo, bar) || foo) + return -1; + bar= __sync_lock_test_and_set(&foo, bar); + if (bar || foo != 10) + return -1; + bar= __sync_val_compare_and_swap(&bar, foo, 15); + if (bar) + return -1; + return 0; + } + ], [mysql_cv_gcc_atomic_builtins=yes], + [mysql_cv_gcc_atomic_builtins=no], + [mysql_cv_gcc_atomic_builtins=no])]) + + if test "x$mysql_cv_gcc_atomic_builtins" = xyes; then + AC_DEFINE(HAVE_GCC_ATOMIC_BUILTINS, 1, + [Define to 1 if compiler provides atomic builtins.]) + fi + ;; *) AC_MSG_ERROR(["$with_atomic_ops" is not a valid value for --with-atomic-ops]) ;; esac -AC_CACHE_CHECK([whether the compiler provides atomic builtins], - [mysql_cv_gcc_atomic_builtins], [AC_TRY_RUN([ - int main() - { - int foo= -10; int bar= 10; - if (!__sync_fetch_and_add(&foo, bar) || foo) - return -1; - bar= __sync_lock_test_and_set(&foo, bar); - if (bar || foo != 10) - return -1; - bar= __sync_val_compare_and_swap(&bar, foo, 15); - if (bar) - return -1; - return 0; - } -], [mysql_cv_gcc_atomic_builtins=yes], - [mysql_cv_gcc_atomic_builtins=no], - [mysql_cv_gcc_atomic_builtins=no])]) - -if test "x$mysql_cv_gcc_atomic_builtins" = xyes; then - AC_DEFINE(HAVE_GCC_ATOMIC_BUILTINS, 1, - [Define to 1 if compiler provides atomic builtins.]) -fi - # Force static compilation to avoid linking problems/get more speed AC_ARG_WITH(mysqld-ldflags, [ --with-mysqld-ldflags Extra linking arguments for mysqld], @@ -2702,7 +2702,7 @@ then AC_DEFINE([THREAD],
[1], [Define if you want to have threaded code. This may be undef on client code]) # Avoid _PROGRAMS names - THREAD_LOBJECTS="thr_alarm.o thr_lock.o thr_mutex.o thr_rwlock.o my_pthread.o my_thr_init.o mf_keycache.o" + THREAD_LOBJECTS="thr_alarm.o thr_lock.o thr_mutex.o thr_rwlock.o my_pthread.o my_thr_init.o mf_keycache.o waiting_threads.o" AC_SUBST(THREAD_LOBJECTS) server_scripts="mysqld_safe mysql_install_db" sql_server_dirs="strings mysys dbug extra regex" diff --git a/include/lf.h b/include/lf.h index d4c5c64a01b..0976aa4927f 100644 --- a/include/lf.h +++ b/include/lf.h @@ -110,7 +110,7 @@ typedef struct { typedef struct { void * volatile pin[LF_PINBOX_PINS]; LF_PINBOX *pinbox; - void *stack_ends_here; + void **stack_ends_here; void *purgatory; uint32 purgatory_count; uint32 volatile link; @@ -166,8 +166,8 @@ void lf_pinbox_init(LF_PINBOX *pinbox, uint free_ptr_offset, void lf_pinbox_destroy(LF_PINBOX *pinbox); lock_wrap(lf_pinbox_get_pins, LF_PINS *, - (LF_PINBOX *pinbox, void *stack_end), - (pinbox, stack_end), + (LF_PINBOX *pinbox), + (pinbox), &pinbox->pinarray.lock) lock_wrap_void(lf_pinbox_put_pins, (LF_PINS *pins), @@ -182,15 +182,13 @@ lock_wrap_void(lf_pinbox_free, memory allocator, lf_alloc-pin.c */ -struct st_lf_alloc_node { - struct st_lf_alloc_node *next; -}; - typedef struct st_lf_allocator { LF_PINBOX pinbox; - struct st_lf_alloc_node * volatile top; + uchar * volatile top; uint element_size; uint32 volatile mallocs; + void (*constructor)(uchar *); + void (*destructor)(uchar *); } LF_ALLOCATOR; void lf_alloc_init(LF_ALLOCATOR *allocator, uint size, uint free_ptr_offset); @@ -202,8 +200,8 @@ uint lf_alloc_pool_count(LF_ALLOCATOR *allocator); */ #define _lf_alloc_free(PINS, PTR) _lf_pinbox_free((PINS), (PTR)) #define lf_alloc_free(PINS, PTR) lf_pinbox_free((PINS), (PTR)) -#define _lf_alloc_get_pins(A, ST) _lf_pinbox_get_pins(&(A)->pinbox, (ST)) -#define lf_alloc_get_pins(A, ST) lf_pinbox_get_pins(&(A)->pinbox, (ST)) +#define _lf_alloc_get_pins(A) _lf_pinbox_get_pins(&(A)->pinbox) +#define lf_alloc_get_pins(A) lf_pinbox_get_pins(&(A)->pinbox) #define _lf_alloc_put_pins(PINS) _lf_pinbox_put_pins(PINS) #define lf_alloc_put_pins(PINS) lf_pinbox_put_pins(PINS) #define lf_alloc_direct_free(ALLOC, ADDR) my_free((uchar*)(ADDR), MYF(0)) @@ -220,13 +218,17 @@ lock_wrap(lf_alloc_new, void *, #define LF_HASH_UNIQUE 1 +/* lf_hash overhead per element (that is, sizeof(LF_SLIST) */ +#define LF_HASH_OVERHEAD (sizeof(int*)*4) + typedef struct { LF_DYNARRAY array; /* hash itself */ LF_ALLOCATOR alloc; /* allocator for elements */ hash_get_key get_key; /* see HASH */ CHARSET_INFO *charset; /* see HASH */ uint key_offset, key_length; /* see HASH */ - uint element_size, flags; /* LF_HASH_UNIQUE, etc */ + uint element_size; /* size of memcpy'ed area on insert */ + uint flags; /* LF_HASH_UNIQUE, etc */ int32 volatile size; /* size of array */ int32 volatile count; /* number of elements in the hash */ } LF_HASH; @@ -242,8 +244,8 @@ int lf_hash_delete(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen); shortcut macros to access underlying pinbox functions from an LF_HASH see _lf_pinbox_get_pins() and _lf_pinbox_put_pins() */ -#define _lf_hash_get_pins(HASH, ST) _lf_alloc_get_pins(&(HASH)->alloc, (ST)) -#define lf_hash_get_pins(HASH, ST) lf_alloc_get_pins(&(HASH)->alloc, (ST)) +#define _lf_hash_get_pins(HASH) _lf_alloc_get_pins(&(HASH)->alloc) +#define lf_hash_get_pins(HASH) lf_alloc_get_pins(&(HASH)->alloc) #define _lf_hash_put_pins(PINS) _lf_pinbox_put_pins(PINS) #define 
lf_hash_put_pins(PINS) lf_pinbox_put_pins(PINS) #define lf_hash_search_unpin(PINS) lf_unpin((PINS), 2) diff --git a/include/my_pthread.h b/include/my_pthread.h index f1ae75b516e..28ce083d744 100644 --- a/include/my_pthread.h +++ b/include/my_pthread.h @@ -79,25 +79,27 @@ typedef void * (__cdecl *pthread_handler)(void *); so it can be used directly as a 64 bit value. The value stored is in 100ns units. */ - union ft64 { +union ft64 { FILETIME ft; __int64 i64; - }; +}; + struct timespec { union ft64 tv; /* The max timeout value in millisecond for pthread_cond_timedwait */ long max_timeout_msec; }; -#define set_timespec(ABSTIME,SEC) { \ - GetSystemTimeAsFileTime(&((ABSTIME).tv.ft)); \ - (ABSTIME).tv.i64+= (__int64)(SEC)*10000000; \ - (ABSTIME).max_timeout_msec= (long)((SEC)*1000); \ -} -#define set_timespec_nsec(ABSTIME,NSEC) { \ - GetSystemTimeAsFileTime(&((ABSTIME).tv.ft)); \ - (ABSTIME).tv.i64+= (__int64)(NSEC)/100; \ - (ABSTIME).max_timeout_msec= (long)((NSEC)/1000000); \ -} + +#define set_timespec_time_nsec(ABSTIME,TIME,NSEC) do { \ + (ABSTIME).tv.i64= (TIME)+(__int64)(NSEC)/100; \ + (ABSTIME).max_timeout_msec= (long)((NSEC)/1000000); \ +} while(0) + +#define set_timespec_nsec(ABSTIME,NSEC) do { \ + union ft64 tv; \ + GetSystemTimeAsFileTime(&tv.ft); \ + set_timespec_time_nsec((ABSTIME), tv.i64, (NSEC)); \ +} while(0) void win_pthread_init(void); int win_pthread_setspecific(void *A,void *B,uint length); @@ -416,43 +418,32 @@ int my_pthread_mutex_trylock(pthread_mutex_t *mutex); for calculating an absolute time at which pthread_cond_timedwait should timeout */ -#ifdef HAVE_TIMESPEC_TS_SEC -#ifndef set_timespec -#define set_timespec(ABSTIME,SEC) \ -{ \ - (ABSTIME).ts_sec=time(0) + (time_t) (SEC); \ - (ABSTIME).ts_nsec=0; \ -} -#endif /* !set_timespec */ + +#define set_timespec(ABSTIME,SEC) set_timespec_nsec((ABSTIME),(SEC)*1000000000ULL) + #ifndef set_timespec_nsec -#define set_timespec_nsec(ABSTIME,NSEC) \ -{ \ - ulonglong now= my_getsystime() + (NSEC/100); \ - (ABSTIME).ts_sec= (now / ULL(10000000)); \ - (ABSTIME).ts_nsec= (now % ULL(10000000) * 100 + ((NSEC) % 100)); \ -} +#define set_timespec_nsec(ABSTIME,NSEC) \ + set_timespec_time_nsec((ABSTIME),my_getsystime(),(NSEC)) #endif /* !set_timespec_nsec */ + +/* adapt for two different flavors of struct timespec */ +#ifdef HAVE_TIMESPEC_TS_SEC +#define TV_sec ts_sec +#define TV_nsec ts_nsec #else -#ifndef set_timespec -#define set_timespec(ABSTIME,SEC) \ -{\ - struct timeval tv;\ - gettimeofday(&tv,0);\ - (ABSTIME).tv_sec=tv.tv_sec+(time_t) (SEC);\ - (ABSTIME).tv_nsec=tv.tv_usec*1000;\ -} -#endif /* !set_timespec */ -#ifndef set_timespec_nsec -#define set_timespec_nsec(ABSTIME,NSEC) \ -{\ - ulonglong now= my_getsystime() + (NSEC/100); \ - (ABSTIME).tv_sec= (time_t) (now / ULL(10000000)); \ - (ABSTIME).tv_nsec= (long) (now % ULL(10000000) * 100 + ((NSEC) % 100)); \ -} -#endif /* !set_timespec_nsec */ +#define TV_sec tv_sec +#define TV_nsec tv_nsec #endif /* HAVE_TIMESPEC_TS_SEC */ - /* safe_mutex adds checking to mutex for easier debugging */ +#ifndef set_timespec_time_nsec +#define set_timespec_time_nsec(ABSTIME,TIME,NSEC) do { \ + ulonglong now= (TIME) + (NSEC/100); \ + (ABSTIME).TV_sec= (now / ULL(10000000)); \ + (ABSTIME).TV_nsec= (now % ULL(10000000) * 100 + ((NSEC) % 100)); \ +} while(0) +#endif /* !set_timespec_time_nsec */ + +/* safe_mutex adds checking to mutex for easier debugging */ #if defined(__NETWARE__) && !defined(SAFE_MUTEX_DETECT_DESTROY) #define SAFE_MUTEX_DETECT_DESTROY @@ -692,6 +683,7 @@ struct st_my_thread_var struct 
st_my_thread_var *next,**prev; void *opt_info; uint lock_type; /* used by conditional release the queue */ + void *stack_ends_here; #ifndef DBUG_OFF void *dbug; char name[THREAD_NAME_SIZE+1]; diff --git a/include/waiting_threads.h b/include/waiting_threads.h new file mode 100644 index 00000000000..92fbbf998be --- /dev/null +++ b/include/waiting_threads.h @@ -0,0 +1,154 @@ +/* Copyright (C) 2008 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include <my_global.h> +#include <my_sys.h> +#include <lf.h> + +typedef struct st_wt_resource_id WT_RESOURCE_ID; + +typedef struct st_wt_resource_type { + int (*compare)(void *a, void *b); + const void *(*make_key)(WT_RESOURCE_ID *id, uint *len); +} WT_RESOURCE_TYPE; + +struct st_wt_resource_id { + WT_RESOURCE_TYPE *type; + union { + void *ptr; + ulonglong num; + } value; +}; + +extern uint wt_timeout_short, wt_deadlock_search_depth_short; +extern uint wt_timeout_long, wt_deadlock_search_depth_long; + +#define WT_WAIT_STATS 24 +#define WT_CYCLE_STATS 32 +extern ulonglong wt_wait_table[WT_WAIT_STATS]; +extern uint32 wt_wait_stats[WT_WAIT_STATS+1]; +extern uint32 wt_cycle_stats[2][WT_CYCLE_STATS+1]; +extern uint32 wt_success_stats; + +/* + 'lock' protects 'owners', 'state', and 'waiter_count' + 'id' is read-only + + a resource is picked up from a hash in a lock-free manner + it's returned pinned, so it cannot be freed at once + but it may be freed right after the pin is removed + to free a resource it should be + 1. have no owners + 2. have no waiters + + two ways to access a resource: + 1. find it in a hash + - it's returned pinned. + a) take a lock in exclusive mode + b) check the state, it should be ACTIVE + c) unpin + 2. by a direct reference + - could only used if a resource cannot be freed + e.g. accessing a resource by thd->waiting_for is safe, + a resource cannot be freed as there's a thread waiting for it +*/ + +typedef struct st_wt_resource { + WT_RESOURCE_ID id; + uint waiter_count; + enum { ACTIVE, FREE } state; +#ifndef DBUG_OFF + pthread_mutex_t *mutex; +#endif + /* + before the 'lock' all elements are mutable, after - immutable + in the sense that lf_hash_insert() won't memcpy() over them. + See wt_init(). + */ + pthread_rwlock_t lock; + pthread_cond_t cond; + DYNAMIC_ARRAY owners; +} WT_RESOURCE; + +typedef struct st_wt_thd { + /* + XXX + there's no protection (mutex) against concurrent access of + the dynarray below. it is assumed that a caller will have it + automatically (not to protect this array but to protect its + own - caller's - data structures, and we'll get it for free. + If not, we'll need to add a mutex + */ + DYNAMIC_ARRAY my_resources; + /* + 'waiting_for' is modified under waiting_for->lock, and only by thd itself + 'waiting_for' is read lock-free (using pinning protocol), but a thd object + can read its own 'waiting_for' without any locks or tricks. 
+ */ + WT_RESOURCE *waiting_for; + LF_PINS *pins; + /* + weight relates to the desirability of a transaction being killed if it's + part of a deadlock. In a deadlock situation transactions with lower weights + are killed first. + + Examples of using the weight to implement different selection strategies: + + 1. Latest + Keep all weights equal. + 2. Random + Assign weights at random. + (variant: modify a weight randomly before every lock request) + 3. Youngest + Set weight to -NOW() + 4. Minimum locks + count locks granted in your lock manager, store the value as a weight + 5. Minimum work + depends on the definition of "work". For example, store the number + of rows modified in this transaction (or a length of REDO log for a + transaction) as a weight. + + It is only statistically relevant and is not protected by any locks. + */ + ulong volatile weight; + /* + 'killed' is indirectly protected by waiting_for->lock - + a killed thread needs to clear its 'waiting_for', and thus needs a lock. + That is a thread needs an exclusive lock to read 'killed' reliably. + But other threads may change 'killed' from 0 to 1, a shared + lock is enough for that. + */ + my_bool volatile killed; +#ifndef DBUG_OFF + const char *name; +#endif +} WT_THD; + +#define WT_TIMEOUT ETIMEDOUT +#define WT_OK 0 +#define WT_DEADLOCK -1 +#define WT_DEPTH_EXCEEDED -2 + +void wt_init(void); +void wt_end(void); +void wt_thd_init(WT_THD *); +void wt_thd_destroy(WT_THD *); +int wt_thd_will_wait_for(WT_THD *, WT_THD *, WT_RESOURCE_ID *); +int wt_thd_dontwait(WT_THD *); +int wt_thd_cond_timedwait(WT_THD *, pthread_mutex_t *); +void wt_thd_release(WT_THD *, WT_RESOURCE_ID *); +#define wt_thd_release_all(THD) wt_thd_release((THD), 0) +int wt_resource_id_memcmp(void *, void *); + diff --git a/mysys/Makefile.am b/mysys/Makefile.am index 7bb98770d06..54553680341 100644 --- a/mysys/Makefile.am +++ b/mysys/Makefile.am @@ -58,7 +58,7 @@ libmysys_a_SOURCES = my_init.c my_getwd.c mf_getdate.c my_mmap.c \ my_windac.c my_access.c base64.c my_libwrap.c \ wqueue.c EXTRA_DIST = thr_alarm.c thr_lock.c my_pthread.c my_thr_init.c \ - thr_mutex.c thr_rwlock.c \ + thr_mutex.c thr_rwlock.c waiting_threads.c \ CMakeLists.txt mf_soundex.c \ my_conio.c my_wincond.c my_winthread.c libmysys_a_LIBADD = @THREAD_LOBJECTS@ diff --git a/mysys/lf_alloc-pin.c b/mysys/lf_alloc-pin.c index 40438e93596..4fae8e37ddb 100644 --- a/mysys/lf_alloc-pin.c +++ b/mysys/lf_alloc-pin.c @@ -96,11 +96,10 @@ versioning a pointer - because we use an array, a pointer to pins is 16 bit, upper 16 bits are used for a version. - It is assumed that pins belong to a thread and are not transferable - between threads (LF_PINS::stack_ends_here being a primary reason + It is assumed that pins belong to a THD and are not transferable + between THD's (LF_PINS::stack_ends_here being a primary reason for this limitation). */ - #include <my_global.h> #include <my_sys.h> #include <lf.h> @@ -137,10 +136,6 @@ void lf_pinbox_destroy(LF_PINBOX *pinbox) SYNOPSYS pinbox - - stack_end - a pointer to the end (top/bottom, depending on the - STACK_DIRECTION) of stack. Used for safe alloca. There's - no safety margin deducted, a caller should take care of it, - if necessary. DESCRIPTION get a new LF_PINS structure from a stack of unused pins, @@ -150,7 +145,7 @@ void lf_pinbox_destroy(LF_PINBOX *pinbox) It is assumed that pins belong to a thread and are not transferable between threads. 
*/ -LF_PINS *_lf_pinbox_get_pins(LF_PINBOX *pinbox, void *stack_end) +LF_PINS *_lf_pinbox_get_pins(LF_PINBOX *pinbox) { uint32 pins, next, top_ver; LF_PINS *el; @@ -194,7 +189,7 @@ LF_PINS *_lf_pinbox_get_pins(LF_PINBOX *pinbox, void *stack_end) el->link= pins; el->purgatory_count= 0; el->pinbox= pinbox; - el->stack_ends_here= stack_end; + el->stack_ends_here= & my_thread_var->stack_ends_here; return el; } @@ -325,6 +320,9 @@ static int match_pins(LF_PINS *el, void *addr) #define available_stack_size(CUR,END) (long) ((char*)(END) - (char*)(CUR)) #endif +#define next_node(P, X) (*((uchar **)(((uchar *)(X)) + (P)->free_ptr_offset))) +#define anext_node(X) next_node(&allocator->pinbox, (X)) + /* Scan the purgatory and free everything that can be freed */ @@ -332,7 +330,7 @@ static void _lf_pinbox_real_free(LF_PINS *pins) { int npins, alloca_size; void *list, **addr; - struct st_lf_alloc_node *first, *last= NULL; + uchar *first, *last= NULL; LF_PINBOX *pinbox= pins->pinbox; LINT_INIT(first); @@ -341,7 +339,7 @@ static void _lf_pinbox_real_free(LF_PINS *pins) #ifdef HAVE_ALLOCA alloca_size= sizeof(void *)*LF_PINBOX_PINS*npins; /* create a sorted list of pinned addresses, to speed up searches */ - if (available_stack_size(&pinbox, pins->stack_ends_here) > alloca_size) + if (available_stack_size(&pinbox, *pins->stack_ends_here) > alloca_size) { struct st_harvester hv; addr= (void **) alloca(alloca_size); @@ -391,9 +389,9 @@ static void _lf_pinbox_real_free(LF_PINS *pins) } /* not pinned - freeing */ if (last) - last= last->next= (struct st_lf_alloc_node *)cur; + last= next_node(pinbox, last)= (uchar *)cur; else - first= last= (struct st_lf_alloc_node *)cur; + first= last= (uchar *)cur; continue; found: /* pinned - keeping */ @@ -412,22 +410,22 @@ LF_REQUIRE_PINS(1) add it back to the allocator stack DESCRIPTION - 'first' and 'last' are the ends of the linked list of st_lf_alloc_node's: + 'first' and 'last' are the ends of the linked list of nodes: first->el->el->....->el->last. Use first==last to free only one element. */ -static void alloc_free(struct st_lf_alloc_node *first, - struct st_lf_alloc_node volatile *last, +static void alloc_free(uchar *first, + uchar volatile *last, LF_ALLOCATOR *allocator) { /* we need a union here to access type-punned pointer reliably. 
otherwise gcc -fstrict-aliasing will not see 'tmp' changed in the loop */ - union { struct st_lf_alloc_node * node; void *ptr; } tmp; + union { uchar * node; void *ptr; } tmp; tmp.node= allocator->top; do { - last->next= tmp.node; + anext_node(last)= tmp.node; } while (!my_atomic_casptr((void **)(char *)&allocator->top, (void **)&tmp.ptr, first) && LF_BACKOFF); } @@ -452,6 +450,8 @@ void lf_alloc_init(LF_ALLOCATOR *allocator, uint size, uint free_ptr_offset) allocator->top= 0; allocator->mallocs= 0; allocator->element_size= size; + allocator->constructor= 0; + allocator->destructor= 0; DBUG_ASSERT(size >= sizeof(void*) + free_ptr_offset); } @@ -468,10 +468,12 @@ void lf_alloc_init(LF_ALLOCATOR *allocator, uint size, uint free_ptr_offset) */ void lf_alloc_destroy(LF_ALLOCATOR *allocator) { - struct st_lf_alloc_node *node= allocator->top; + uchar *node= allocator->top; while (node) { - struct st_lf_alloc_node *tmp= node->next; + uchar *tmp= anext_node(node); + if (allocator->destructor) + allocator->destructor(node); my_free((void *)node, MYF(0)); node= tmp; } @@ -489,7 +491,7 @@ void lf_alloc_destroy(LF_ALLOCATOR *allocator) void *_lf_alloc_new(LF_PINS *pins) { LF_ALLOCATOR *allocator= (LF_ALLOCATOR *)(pins->pinbox->free_func_arg); - struct st_lf_alloc_node *node; + uchar *node; for (;;) { do @@ -500,6 +502,8 @@ void *_lf_alloc_new(LF_PINS *pins) if (!node) { node= (void *)my_malloc(allocator->element_size, MYF(MY_WME)); + if (allocator->constructor) + allocator->constructor(node); #ifdef MY_LF_EXTRA_DEBUG if (likely(node != 0)) my_atomic_add32(&allocator->mallocs, 1); @@ -507,7 +511,7 @@ void *_lf_alloc_new(LF_PINS *pins) break; } if (my_atomic_casptr((void **)(char *)&allocator->top, - (void *)&node, node->next)) + (void *)&node, anext_node(node))) break; } _lf_unpin(pins, 0); @@ -523,8 +527,8 @@ void *_lf_alloc_new(LF_PINS *pins) uint lf_alloc_pool_count(LF_ALLOCATOR *allocator) { uint i; - struct st_lf_alloc_node *node; - for (node= allocator->top, i= 0; node; node= node->next, i++) + uchar *node; + for (node= allocator->top, i= 0; node; node= anext_node(node), i++) /* no op */; return i; } diff --git a/mysys/lf_hash.c b/mysys/lf_hash.c index c197cc99711..008abef0c8b 100644 --- a/mysys/lf_hash.c +++ b/mysys/lf_hash.c @@ -299,11 +299,22 @@ static int initialize_bucket(LF_HASH *, LF_SLIST * volatile*, uint, LF_PINS *); /* Initializes lf_hash, the arguments are compatible with hash_init + + @@note element_size sets both the size of allocated memory block for + lf_alloc and a size of memcpy'ed block size in lf_hash_insert. Typically + they are the same, indeed. But LF_HASH::element_size can be decreased + after lf_hash_init, and then lf_alloc will allocate larger block that + lf_hash_insert will copy over. It is desireable if part of the element + is expensive to initialize - for example if there is a mutex or + DYNAMIC_ARRAY. In this case they should be initialize in the + LF_ALLOCATOR::constructor, and lf_hash_insert should not overwrite them. + See wt_init() for example. */ void lf_hash_init(LF_HASH *hash, uint element_size, uint flags, uint key_offset, uint key_length, hash_get_key get_key, CHARSET_INFO *charset) { + compile_time_assert(sizeof(LF_SLIST) == LF_HASH_OVERHEAD); lf_alloc_init(&hash->alloc, sizeof(LF_SLIST)+element_size, offsetof(LF_SLIST, key)); lf_dynarray_init(&hash->array, sizeof(LF_SLIST *)); @@ -453,7 +464,7 @@ void *lf_hash_search(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen) return found ? 
found+1 : 0; } -static const uchar *dummy_key= ""; +static const uchar *dummy_key= (uchar*)""; /* RETURN @@ -473,7 +484,7 @@ static int initialize_bucket(LF_HASH *hash, LF_SLIST * volatile *node, unlikely(initialize_bucket(hash, el, parent, pins))) return -1; dummy->hashnr= my_reverse_bits(bucket) | 0; /* dummy node */ - dummy->key= (char*) dummy_key; + dummy->key= dummy_key; dummy->keylen= 0; if ((cur= linsert(el, hash->charset, dummy, pins, LF_HASH_UNIQUE))) { diff --git a/mysys/my_thr_init.c b/mysys/my_thr_init.c index f5fee06916e..1d03577ce34 100644 --- a/mysys/my_thr_init.c +++ b/mysys/my_thr_init.c @@ -256,7 +256,7 @@ my_bool my_thread_init(void) #ifdef EXTRA_DEBUG_THREADS fprintf(stderr,"my_thread_init(): thread_id: 0x%lx\n", (ulong) pthread_self()); -#endif +#endif #if !defined(__WIN__) || defined(USE_TLS) if (my_pthread_getspecific(struct st_my_thread_var *,THR_KEY_mysys)) @@ -264,7 +264,7 @@ my_bool my_thread_init(void) #ifdef EXTRA_DEBUG_THREADS fprintf(stderr,"my_thread_init() called more than once in thread 0x%lx\n", (long) pthread_self()); -#endif +#endif goto end; } if (!(tmp= (struct st_my_thread_var *) calloc(1, sizeof(*tmp)))) @@ -290,6 +290,8 @@ my_bool my_thread_init(void) pthread_mutex_init(&tmp->mutex,MY_MUTEX_INIT_FAST); pthread_cond_init(&tmp->suspend, NULL); + tmp->stack_ends_here= (char*)&tmp + STACK_DIRECTION * my_thread_stack_size; + pthread_mutex_lock(&THR_LOCK_threads); tmp->id= ++thread_id; ++THR_thread_count; @@ -325,7 +327,7 @@ void my_thread_end(void) #ifdef EXTRA_DEBUG_THREADS fprintf(stderr,"my_thread_end(): tmp: 0x%lx pthread_self: 0x%lx thread_id: %ld\n", (long) tmp, (long) pthread_self(), tmp ? (long) tmp->id : 0L); -#endif +#endif if (tmp && tmp->init) { #if !defined(DBUG_OFF) diff --git a/mysys/waiting_threads.c b/mysys/waiting_threads.c new file mode 100644 index 00000000000..4d375fdc899 --- /dev/null +++ b/mysys/waiting_threads.c @@ -0,0 +1,641 @@ +/* Copyright (C) 2008 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +/* + Note that if your lock system satisfies the following condition: + + there exist four lock levels A, B, C, D, such as + A is compatible with B + A is not compatible with C + D is not compatible with B + + (example A=IX, B=IS, C=S, D=X) + + you need to include lock level in the resource identifier - thread 1 + waiting for lock A on resource R and thread 2 waiting for lock B + on resource R should wait on different WT_RESOURCE structures, on different + {lock, resource} pairs. Otherwise the following is possible: + + thread1> take S-lock on R + thread2> take IS-lock on R + thread3> wants X-lock on R, starts waiting for threads 1 and 2 on R. + thread3 is killed (or timeout or whatever) + WT_RESOURCE structure for R is still in the hash, as it has two owners + thread4> wants an IX-lock on R + WT_RESOURCE for R is found in the hash, thread4 starts waiting on it. + !! now thread4 is waiting for both thread1 and thread2 + !!
while, in fact, IX-lock and IS-lock are compatible and + !! thread4 should not wait for thread2. +*/ + +#include <waiting_threads.h> +#include <m_string.h> + +uint wt_timeout_short=100, wt_deadlock_search_depth_short=4; +uint wt_timeout_long=10000, wt_deadlock_search_depth_long=15; + +/* + status variables: + distribution of cycle lengths + wait time log distribution + + Note: + + we call deadlock() twice per wait (with different search lengths). + it means a deadlock will be counted twice. It's difficult to avoid, + as on the second search we could find a *different* deadlock and we + *want* to count it too. So we just count all deadlocks - two searches + mean two increments on the wt_cycle_stats. +*/ + +ulonglong wt_wait_table[WT_WAIT_STATS]; +uint32 wt_wait_stats[WT_WAIT_STATS+1]; +uint32 wt_cycle_stats[2][WT_CYCLE_STATS+1], wt_success_stats; + +static my_atomic_rwlock_t cycle_stats_lock, wait_stats_lock, success_stats_lock; + +#define increment_success_stats() \ + do { \ + my_atomic_rwlock_wrlock(&success_stats_lock); \ + my_atomic_add32(&wt_success_stats, 1); \ + my_atomic_rwlock_wrunlock(&success_stats_lock); \ + } while (0) + +#define increment_cycle_stats(X,MAX) \ + do { \ + uint i= (X), j= (MAX) == wt_deadlock_search_depth_long; \ + if (i >= WT_CYCLE_STATS) \ + i= WT_CYCLE_STATS; \ + my_atomic_rwlock_wrlock(&cycle_stats_lock); \ + my_atomic_add32(&wt_cycle_stats[j][i], 1); \ + my_atomic_rwlock_wrunlock(&cycle_stats_lock); \ + } while (0) + +#define increment_wait_stats(X,RET) \ + do { \ + uint i; \ + if ((RET) == ETIMEDOUT) \ + i= WT_WAIT_STATS; \ + else \ + { \ + ulonglong w=(X)/10; \ + for (i=0; i < WT_WAIT_STATS && w > wt_wait_table[i]; i++) ; \ + } \ + my_atomic_rwlock_wrlock(&wait_stats_lock); \ + my_atomic_add32(wt_wait_stats+i, 1); \ + my_atomic_rwlock_wrunlock(&wait_stats_lock); \ + } while (0) + +#define rc_rdlock(X) \ + do { \ + WT_RESOURCE *R=(X); \ + DBUG_PRINT("wt", ("LOCK resid=%lld for READ", R->id.value.num)); \ + pthread_rwlock_rdlock(&R->lock); \ + } while (0) +#define rc_wrlock(X) \ + do { \ + WT_RESOURCE *R=(X); \ + DBUG_PRINT("wt", ("LOCK resid=%lld for WRITE", R->id.value.num)); \ + pthread_rwlock_wrlock(&R->lock); \ + } while (0) +#define rc_unlock(X) \ + do { \ + WT_RESOURCE *R=(X); \ + DBUG_PRINT("wt", ("UNLOCK resid=%lld", R->id.value.num)); \ + pthread_rwlock_unlock(&R->lock); \ + } while (0) + +static LF_HASH reshash; + +static void wt_resource_init(uchar *arg) +{ + WT_RESOURCE *rc=(WT_RESOURCE*)(arg+LF_HASH_OVERHEAD); + DBUG_ENTER("wt_resource_init"); + + bzero(rc, sizeof(*rc)); + pthread_rwlock_init(&rc->lock, 0); + pthread_cond_init(&rc->cond, 0); + my_init_dynamic_array(&rc->owners, sizeof(WT_THD *), 5, 5); + DBUG_VOID_RETURN; +} + +static void wt_resource_destroy(uchar *arg) +{ + WT_RESOURCE *rc=(WT_RESOURCE*)(arg+LF_HASH_OVERHEAD); + DBUG_ENTER("wt_resource_destroy"); + + DBUG_ASSERT(rc->owners.elements == 0); + pthread_rwlock_destroy(&rc->lock); + pthread_cond_destroy(&rc->cond); + delete_dynamic(&rc->owners); + DBUG_VOID_RETURN; +} + +void wt_init() +{ + DBUG_ENTER("wt_init"); + + lf_hash_init(&reshash, sizeof(WT_RESOURCE), LF_HASH_UNIQUE, 0, + sizeof(struct st_wt_resource_id), 0, 0); + reshash.alloc.constructor= wt_resource_init; + reshash.alloc.destructor= wt_resource_destroy; + /* + Note a trick: we initialize the hash with the real element size, + but fix it later to a shortened element size. This way + the allocator will allocate elements correctly, but + lf_hash_insert() will only overwrite part of the element with memcpy(). 
+ lock, condition, and dynamic array will be intact. + */ + reshash.element_size= offsetof(WT_RESOURCE, lock); + bzero(wt_wait_stats, sizeof(wt_wait_stats)); + bzero(wt_cycle_stats, sizeof(wt_cycle_stats)); + wt_success_stats=0; + { + int i; + double from=log(1); /* 1 us */ + double to=log(60e6); /* 1 min */ + for (i=0; i < WT_WAIT_STATS; i++) + { + wt_wait_table[i]=(ulonglong)exp((to-from)/(WT_WAIT_STATS-1)*i+from); + DBUG_ASSERT(i==0 || wt_wait_table[i-1] != wt_wait_table[i]); + } + } + my_atomic_rwlock_init(&cycle_stats_lock); + my_atomic_rwlock_init(&success_stats_lock); + my_atomic_rwlock_init(&wait_stats_lock); + DBUG_VOID_RETURN; +} + +void wt_end() +{ + DBUG_ENTER("wt_end"); + + DBUG_ASSERT(reshash.count == 0); + lf_hash_destroy(&reshash); + my_atomic_rwlock_destroy(&cycle_stats_lock); + my_atomic_rwlock_destroy(&success_stats_lock); + my_atomic_rwlock_destroy(&wait_stats_lock); + DBUG_VOID_RETURN; +} + +void wt_thd_init(WT_THD *thd) +{ + DBUG_ENTER("wt_thd_init"); + + my_init_dynamic_array(&thd->my_resources, sizeof(WT_RESOURCE *), 10, 5); + thd->pins=lf_hash_get_pins(&reshash); + thd->waiting_for=0; + thd->weight=0; +#ifndef DBUG_OFF + thd->name=my_thread_name(); +#endif + DBUG_VOID_RETURN; +} + +void wt_thd_destroy(WT_THD *thd) +{ + DBUG_ENTER("wt_thd_destroy"); + + DBUG_ASSERT(thd->my_resources.elements == 0); + delete_dynamic(&thd->my_resources); + lf_hash_put_pins(thd->pins); + thd->waiting_for=0; + DBUG_VOID_RETURN; +} + +int wt_resource_id_memcmp(void *a, void *b) +{ + return memcmp(a, b, sizeof(WT_RESOURCE_ID)); +} + +struct deadlock_arg { + WT_THD *thd; + uint max_depth; + WT_THD *victim; + WT_RESOURCE *rc; +}; + +/* + loop detection in a wait-for graph with a limited search depth. +*/ +static int deadlock_search(struct deadlock_arg *arg, WT_THD *blocker, + uint depth) +{ + WT_RESOURCE *rc, *volatile *shared_ptr= &blocker->waiting_for; + WT_THD *cursor; + uint i; + int ret= WT_OK; + DBUG_ENTER("deadlock_search"); + DBUG_PRINT("wt", ("enter: thd=%s, blocker=%s, depth=%u", + arg->thd->name, blocker->name, depth)); + + LF_REQUIRE_PINS(1); + + arg->rc= 0; + + if (depth > arg->max_depth) + { + DBUG_PRINT("wt", ("exit: WT_DEPTH_EXCEEDED (early)")); + DBUG_RETURN(WT_DEPTH_EXCEEDED); + } + +retry: + /* safe dereference as explained in lf_alloc-pin.c */ + do + { + rc= *shared_ptr; + lf_pin(arg->thd->pins, 0, rc); + } while (rc != *shared_ptr && LF_BACKOFF); + + if (rc == 0) + { + DBUG_PRINT("wt", ("exit: OK (early)")); + DBUG_RETURN(0); + } + + rc_rdlock(rc); + if (rc->state != ACTIVE || *shared_ptr != rc) + { + rc_unlock(rc); + lf_unpin(arg->thd->pins, 0); + goto retry; + } + lf_unpin(arg->thd->pins, 0); + + for (i=0; i < rc->owners.elements; i++) + { + cursor= *dynamic_element(&rc->owners, i, WT_THD**); + if (cursor == arg->thd) + { + ret= WT_DEADLOCK; + increment_cycle_stats(depth, arg->max_depth); + arg->victim= cursor; + goto end; + } + } + for (i=0; i < rc->owners.elements; i++) + { + cursor= *dynamic_element(&rc->owners, i, WT_THD**); + switch (deadlock_search(arg, cursor, depth+1)) { + case WT_DEPTH_EXCEEDED: + ret= WT_DEPTH_EXCEEDED; + break; + case WT_DEADLOCK: + ret= WT_DEADLOCK; + if (cursor->weight < arg->victim->weight) + { + if (arg->victim != arg->thd) + { + rc_unlock(arg->victim->waiting_for); /* release the previous victim */ + DBUG_ASSERT(arg->rc == cursor->waiting_for); + } + arg->victim= cursor; + } + else if (arg->rc) + rc_unlock(arg->rc); + goto end; + case WT_OK: + break; + default: + DBUG_ASSERT(0); + } + if (arg->rc) + rc_unlock(arg->rc); + } +end: + 
arg->rc= rc; + DBUG_PRINT("wt", ("exit: %s", + ret == WT_DEPTH_EXCEEDED ? "WT_DEPTH_EXCEEDED" : + ret ? "WT_DEADLOCK" : "OK")); + DBUG_RETURN(ret); +} + +static int deadlock(WT_THD *thd, WT_THD *blocker, uint depth, + uint max_depth) +{ + struct deadlock_arg arg= {thd, max_depth, 0, 0}; + int ret; + DBUG_ENTER("deadlock"); + ret= deadlock_search(&arg, blocker, depth); + if (arg.rc) + rc_unlock(arg.rc); + if (ret == WT_DEPTH_EXCEEDED) + { + increment_cycle_stats(WT_CYCLE_STATS, max_depth); + ret= WT_OK; + } + if (ret == WT_DEADLOCK && arg.victim != thd) + { + DBUG_PRINT("wt", ("killing %s", arg.victim->name)); + arg.victim->killed=1; + pthread_cond_broadcast(&arg.victim->waiting_for->cond); + rc_unlock(arg.victim->waiting_for); + ret= WT_OK; + } + DBUG_RETURN(ret); +} + + +/* + Deletes an element from reshash. + rc->lock must be locked by the caller and it's unlocked on return. +*/ +static void unlock_lock_and_free_resource(WT_THD *thd, WT_RESOURCE *rc) +{ + uint keylen; + const void *key; + DBUG_ENTER("unlock_lock_and_free_resource"); + + DBUG_ASSERT(rc->state == ACTIVE); + + if (rc->owners.elements || rc->waiter_count) + { + DBUG_PRINT("wt", ("nothing to do, %d owners, %d waiters", + rc->owners.elements, rc->waiter_count)); + rc_unlock(rc); + DBUG_VOID_RETURN; + } + + /* XXX if (rc->id.type->make_key) key= rc->id.type->make_key(&rc->id, &keylen); else */ + { + key= &rc->id; + keylen= sizeof(rc->id); + } + + /* + To free the element correctly we need to: + 1. take its lock (already done). + 2. set the state to FREE + 3. release the lock + 4. remove from the hash + + I *think* it's safe to release the lock while the element is still + in the hash. If not, the corrected procedure should be + 3. pin; 4; remove; 5; release; 6; unpin and it'll need pin[3]. + */ + rc->state=FREE; + rc_unlock(rc); + lf_hash_delete(&reshash, thd->pins, key, keylen); + DBUG_VOID_RETURN; +} + + +int wt_thd_dontwait_locked(WT_THD *thd) +{ + WT_RESOURCE *rc= thd->waiting_for; + DBUG_ENTER("wt_thd_dontwait_locked"); + + DBUG_ASSERT(rc->waiter_count); + DBUG_ASSERT(rc->state == ACTIVE); + rc->waiter_count--; + thd->waiting_for= 0; + unlock_lock_and_free_resource(thd, rc); + DBUG_RETURN(thd->killed ? WT_DEADLOCK : WT_OK); +} + +int wt_thd_dontwait(WT_THD *thd) +{ + int ret; + WT_RESOURCE *rc= thd->waiting_for; + DBUG_ENTER("wt_thd_dontwait"); + + if (!rc) + DBUG_RETURN(WT_OK); + /* + nobody's trying to free the resource now, + as its waiter_count is guaranteed to be non-zero + */ + rc_wrlock(rc); + ret= wt_thd_dontwait_locked(thd); + DBUG_RETURN(ret); +} + +/* + called by a *waiter* to declare what resource it will wait for. + can be called many times, if many blockers own a blocking resource. + but must always be called with the same resource id - a thread cannot + wait for more than one resource at a time. 
+*/ +int wt_thd_will_wait_for(WT_THD *thd, WT_THD *blocker, WT_RESOURCE_ID *resid) +{ + uint i; + WT_RESOURCE *rc; + DBUG_ENTER("wt_thd_will_wait_for"); + + LF_REQUIRE_PINS(3); + + DBUG_PRINT("wt", ("enter: thd=%s, blocker=%s, resid=%llu", + thd->name, blocker->name, resid->value.num)); + + if (thd->waiting_for == 0) + { + uint keylen; + const void *key; + /* XXX if (restype->make_key) key= restype->make_key(resid, &keylen); else */ + { + key= resid; + keylen= sizeof(*resid); + } + + DBUG_PRINT("wt", ("first blocker")); + +retry: + while ((rc= lf_hash_search(&reshash, thd->pins, key, keylen)) == 0) + { + WT_RESOURCE tmp; + + DBUG_PRINT("wt", ("failed to find rc in hash, inserting")); + bzero(&tmp, sizeof(tmp)); + tmp.waiter_count= 0; + tmp.id= *resid; + tmp.state= ACTIVE; +#ifndef DBUG_OFF + tmp.mutex= 0; +#endif + + lf_hash_insert(&reshash, thd->pins, &tmp); + /* + Two cases: either lf_hash_insert() failed - because another thread + has just inserted a resource with the same id - and we need to retry. + Or lf_hash_insert() succeeded, and then we need to repeat + lf_hash_search() to find a real address of the newly inserted element. + That is, we don't care what lf_hash_insert() has returned. + And we need to repeat the loop anyway. + */ + } + DBUG_PRINT("wt", ("found in hash rc=%p", rc)); + + rc_wrlock(rc); + if (rc->state != ACTIVE) + { + DBUG_PRINT("wt", ("but it's not active, retrying")); + /* Somebody has freed the element while we weren't looking */ + rc_unlock(rc); + lf_hash_search_unpin(thd->pins); + goto retry; + } + + lf_hash_search_unpin(thd->pins); /* the element cannot go away anymore */ + thd->waiting_for= rc; + rc->waiter_count++; + thd->killed= 0; + + } + else + { + DBUG_ASSERT(thd->waiting_for->id.type == resid->type); + DBUG_ASSERT(resid->type->compare(&thd->waiting_for->id, resid) == 0); + DBUG_PRINT("wt", ("adding another blocker")); + + /* + we can safely access the resource here, it's in the hash as it has + at least one owner, and non-zero waiter_count + */ + rc= thd->waiting_for; + rc_wrlock(rc); + DBUG_ASSERT(rc->waiter_count); + DBUG_ASSERT(rc->state == ACTIVE); + + if (thd->killed) + { + wt_thd_dontwait_locked(thd); + DBUG_RETURN(WT_DEADLOCK); + } + } + for (i=0; i < rc->owners.elements; i++) + if (*dynamic_element(&rc->owners, i, WT_THD**) == blocker) + break; + if (i >= rc->owners.elements) + { + push_dynamic(&blocker->my_resources, (void*)&rc); + push_dynamic(&rc->owners, (void*)&blocker); + } + rc_unlock(rc); + + if (deadlock(thd, blocker, 1, wt_deadlock_search_depth_short)) + { + wt_thd_dontwait(thd); + DBUG_RETURN(WT_DEADLOCK); + } + DBUG_RETURN(0); +} + +/* + called by a *waiter* to start waiting + + It's supposed to be a drop-in replacement for + pthread_cond_timedwait(), and it takes mutex as an argument. 
+*/ +int wt_thd_cond_timedwait(WT_THD *thd, pthread_mutex_t *mutex) +{ + int ret= WT_OK; + struct timespec timeout; + ulonglong before, after, starttime; + WT_RESOURCE *rc= thd->waiting_for; + DBUG_ENTER("wt_thd_cond_timedwait"); + DBUG_PRINT("wt", ("enter: thd=%s, rc=%p", thd->name, rc)); + +#ifndef DBUG_OFF + if (rc->mutex) + DBUG_ASSERT(rc->mutex == mutex); + else + rc->mutex= mutex; + safe_mutex_assert_owner(mutex); +#endif + + before= starttime= my_getsystime(); + +#ifdef __WIN__ + /* + only for the sake of Windows we distinguish between + 'before' and 'starttime' + */ + GetSystemTimeAsFileTime((PFILETIME)&starttime); +#endif + + set_timespec_time_nsec(timeout, starttime, wt_timeout_short*1000); + if (!thd->killed) + ret= pthread_cond_timedwait(&rc->cond, mutex, &timeout); + if (ret == WT_TIMEOUT) + { + if (deadlock(thd, thd, 0, wt_deadlock_search_depth_long)) + ret= WT_DEADLOCK; + else if (wt_timeout_long > wt_timeout_short) + { + set_timespec_time_nsec(timeout, starttime, wt_timeout_long*1000); + if (!thd->killed) + ret= pthread_cond_timedwait(&rc->cond, mutex, &timeout); + } + } + after= my_getsystime(); + if (wt_thd_dontwait(thd) == WT_DEADLOCK) + ret= WT_DEADLOCK; + increment_wait_stats(after-before, ret); + if (ret == WT_OK) + increment_success_stats(); + DBUG_RETURN(ret); +} + +/* + called by a *blocker* when it releases a resource + + when resid==0 all resources will be freed + + Note: it's conceptually similar to pthread_cond_broadcast, and must be done + under the same mutex as wt_thd_cond_timedwait(). +*/ +void wt_thd_release(WT_THD *thd, WT_RESOURCE_ID *resid) +{ + WT_RESOURCE *rc; + uint i, j; + DBUG_ENTER("wt_thd_release"); + + for (i=0; i < thd->my_resources.elements; i++) + { + rc= *dynamic_element(&thd->my_resources, i, WT_RESOURCE**); + if (!resid || (resid->type->compare(&rc->id, resid) == 0)) + { + rc_wrlock(rc); + /* + nobody's trying to free the resource now, + as its owners[] array is not empty (at least thd must be there) + */ + DBUG_ASSERT(rc->state == ACTIVE); + for (j=0; j < rc->owners.elements; j++) + if (*dynamic_element(&rc->owners, j, WT_THD**) == thd) + break; + DBUG_ASSERT(j < rc->owners.elements); + delete_dynamic_element(&rc->owners, j); + if (rc->owners.elements == 0) + { + pthread_cond_broadcast(&rc->cond); +#ifndef DBUG_OFF + if (rc->mutex) + safe_mutex_assert_owner(rc->mutex); +#endif + } + unlock_lock_and_free_resource(thd, rc); + if (resid) + { + delete_dynamic_element(&thd->my_resources, i); + DBUG_VOID_RETURN; + } + } + } + DBUG_ASSERT(!resid); + reset_dynamic(&thd->my_resources); + DBUG_VOID_RETURN; +} + diff --git a/storage/maria/ha_maria.cc b/storage/maria/ha_maria.cc index 5e9554b19d8..e4d91257a43 100644 --- a/storage/maria/ha_maria.cc +++ b/storage/maria/ha_maria.cc @@ -2246,10 +2246,7 @@ int ha_maria::external_lock(THD *thd, int lock_type) /* Start of new statement */ if (!trn) /* no transaction yet - open it now */ { - trn= trnman_new_trn(& thd->mysys_var->mutex, - & thd->mysys_var->suspend, - thd->thread_stack + STACK_DIRECTION * - (my_thread_stack_size - STACK_MIN_SIZE)); + trn= trnman_new_trn(& thd->mysys_var->mutex, & thd->mysys_var->suspend); if (unlikely(!trn)) DBUG_RETURN(HA_ERR_OUT_OF_MEM); THD_TRN= trn; @@ -2389,10 +2386,7 @@ int ha_maria::implicit_commit(THD *thd) tables may be under LOCK TABLES, and so they will start the next statement assuming they have a trn (see ha_maria::start_stmt()). 
*/ - trn= trnman_new_trn(& thd->mysys_var->mutex, - & thd->mysys_var->suspend, - thd->thread_stack + STACK_DIRECTION * - (my_thread_stack_size - STACK_MIN_SIZE)); + trn= trnman_new_trn(& thd->mysys_var->mutex, & thd->mysys_var->suspend); /* This is just a commit, tables stay locked if they were: */ trnman_reset_locked_tables(trn, locked_tables); THD_TRN= trn; diff --git a/storage/maria/ma_commit.c b/storage/maria/ma_commit.c index e7c82d651c5..02a07fe5f15 100644 --- a/storage/maria/ma_commit.c +++ b/storage/maria/ma_commit.c @@ -108,9 +108,7 @@ int maria_begin(MARIA_HA *info) { TRN *trn; struct st_my_thread_var *mysys_var= my_thread_var; - trn= trnman_new_trn(&mysys_var->mutex, - &mysys_var->suspend, - (char*) &mysys_var + STACK_DIRECTION *1024*128); + trn= trnman_new_trn(&mysys_var->mutex, &mysys_var->suspend); if (unlikely(!trn)) DBUG_RETURN(HA_ERR_OUT_OF_MEM); diff --git a/storage/maria/trnman.c b/storage/maria/trnman.c index 1c2385e9476..ad742c132e0 100644 --- a/storage/maria/trnman.c +++ b/storage/maria/trnman.c @@ -260,8 +260,7 @@ static void set_short_trid(TRN *trn) mutex and cond will be used for lock waits */ -TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond, - void *stack_end) +TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond) { TRN *trn; DBUG_ENTER("trnman_new_trn"); @@ -308,7 +307,7 @@ TRN *trnman_new_trn(pthread_mutex_t *mutex, pthread_cond_t *cond, } trnman_allocated_transactions++; } - trn->pins= lf_hash_get_pins(&trid_to_committed_trn, stack_end); + trn->pins= lf_hash_get_pins(&trid_to_committed_trn); if (!trn->pins) { trnman_free_trn(trn); @@ -761,7 +760,7 @@ TRN *trnman_recreate_trn_from_recovery(uint16 shortid, TrID longid) TrID old_trid_generator= global_trid_generator; TRN *trn; DBUG_ASSERT(maria_in_recovery && !maria_multi_threaded); - if (unlikely((trn= trnman_new_trn(NULL, NULL, NULL)) == NULL)) + if (unlikely((trn= trnman_new_trn(NULL, NULL)) == NULL)) return NULL; /* deallocate excessive allocations of trnman_new_trn() */ global_trid_generator= old_trid_generator; diff --git a/storage/maria/trnman_public.h b/storage/maria/trnman_public.h index 0005e556eb1..4286ec7e0a2 100644 --- a/storage/maria/trnman_public.h +++ b/storage/maria/trnman_public.h @@ -38,7 +38,7 @@ extern my_bool (*trnman_end_trans_hook)(TRN *trn, my_bool commit, int trnman_init(TrID); void trnman_destroy(void); -TRN *trnman_new_trn(pthread_mutex_t *, pthread_cond_t *, void *); +TRN *trnman_new_trn(pthread_mutex_t *, pthread_cond_t *); my_bool trnman_end_trn(TRN *trn, my_bool commit); #define trnman_commit_trn(T) trnman_end_trn(T, TRUE) #define trnman_abort_trn(T) trnman_end_trn(T, FALSE) diff --git a/storage/maria/unittest/trnman-t.c b/storage/maria/unittest/trnman-t.c index 439df10e66e..0873761c0a2 100644 --- a/storage/maria/unittest/trnman-t.c +++ b/storage/maria/unittest/trnman-t.c @@ -42,6 +42,8 @@ pthread_handler_t test_trnman(void *arg) pthread_cond_t conds[MAX_ITER]; int m= (*(int *)arg); + my_thread_init(); + for (i= 0; i < MAX_ITER; i++) { pthread_mutex_init(&mutexes[i], MY_MUTEX_INIT_FAST); @@ -54,7 +56,7 @@ pthread_handler_t test_trnman(void *arg) m-= n= x % MAX_ITER; for (i= 0; i < n; i++) { - trn[i]= trnman_new_trn(&mutexes[i], &conds[i], &m + STACK_SIZE); + trn[i]= trnman_new_trn(&mutexes[i], &conds[i]); if (!trn[i]) { diag("trnman_new_trn() failed"); @@ -76,6 +78,8 @@ pthread_handler_t test_trnman(void *arg) rt_num_threads--; pthread_mutex_unlock(&rt_mutex); + my_thread_end(); + return 0; } #undef MAX_ITER @@ -114,7 +118,7 @@ void run_test(const char 
*test, pthread_handler handler, int n, int m) i= trnman_can_read_from(trn[T1], trid[T2]); \ ok(i == RES, "trn" #T1 " %s read from trn" #T2, i ? "can" : "cannot") #define start_transaction(T) \ - trn[T]= trnman_new_trn(&mutexes[T], &conds[T], &i + STACK_SIZE); \ + trn[T]= trnman_new_trn(&mutexes[T], &conds[T]); \ trid[T]= trn[T]->trid #define commit(T) trnman_commit_trn(trn[T]) #define abort(T) trnman_abort_trn(trn[T]) @@ -159,7 +163,6 @@ void test_trnman_read_from() int main(int argc __attribute__((unused)), char **argv) { MY_INIT(argv[0]); - my_init(); plan(7); diff --git a/unittest/mysys/Makefile.am b/unittest/mysys/Makefile.am index 96a775a58a9..e763fcab3ba 100644 --- a/unittest/mysys/Makefile.am +++ b/unittest/mysys/Makefile.am @@ -16,7 +16,7 @@ INCLUDES = @ZLIB_INCLUDES@ -I$(top_builddir)/include \ -I$(top_srcdir)/include -I$(top_srcdir)/unittest/mytap -noinst_PROGRAMS = bitmap-t base64-t my_atomic-t +noinst_PROGRAMS = bitmap-t base64-t my_atomic-t lf-t waiting_threads-t LDADD = $(top_builddir)/unittest/mytap/libmytap.a \ $(top_builddir)/mysys/libmysys.a \ diff --git a/unittest/mysys/lf-t.c b/unittest/mysys/lf-t.c new file mode 100644 index 00000000000..1a424c19e22 --- /dev/null +++ b/unittest/mysys/lf-t.c @@ -0,0 +1,168 @@ +/* Copyright (C) 2006 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "thr_template.c" + +#include <lf.h> + +int32 inserts= 0, N; +LF_ALLOCATOR lf_allocator; +LF_HASH lf_hash; + +/* + pin allocator - alloc and release an element in a loop +*/ +pthread_handler_t test_lf_pinbox(void *arg) +{ + int m= *(int *)arg; + int32 x= 0; + LF_PINS *pins; + + my_thread_init(); + + pins= lf_pinbox_get_pins(&lf_allocator.pinbox); + + for (x= ((int)(intptr)(&m)); m ; m--) + { + lf_pinbox_put_pins(pins); + pins= lf_pinbox_get_pins(&lf_allocator.pinbox); + } + lf_pinbox_put_pins(pins); + pthread_mutex_lock(&mutex); + if (!--running_threads) pthread_cond_signal(&cond); + pthread_mutex_unlock(&mutex); + my_thread_end(); + return 0; +} + +typedef union { + int32 data; + void *not_used; +} TLA; + +pthread_handler_t test_lf_alloc(void *arg) +{ + int m= (*(int *)arg)/2; + int32 x,y= 0; + LF_PINS *pins; + + my_thread_init(); + + pins= lf_alloc_get_pins(&lf_allocator); + + for (x= ((int)(intptr)(&m)); m ; m--) + { + TLA *node1, *node2; + x= (x*m+0x87654321) & INT_MAX32; + node1= (TLA *)lf_alloc_new(pins); + node1->data= x; + y+= node1->data; + node1->data= 0; + node2= (TLA *)lf_alloc_new(pins); + node2->data= x; + y-= node2->data; + node2->data= 0; + lf_alloc_free(pins, node1); + lf_alloc_free(pins, node2); + } + lf_alloc_put_pins(pins); + pthread_mutex_lock(&mutex); + bad+= y; + + if (--N == 0) + { + diag("%d mallocs, %d pins in stack", + lf_allocator.mallocs, lf_allocator.pinbox.pins_in_array); +#ifdef MY_LF_EXTRA_DEBUG + bad|= lf_allocator.mallocs - lf_alloc_pool_count(&lf_allocator); +#endif + } + if (!--running_threads) pthread_cond_signal(&cond); + 
pthread_mutex_unlock(&mutex); + my_thread_end(); + return 0; +} + +#define N_TLH 1000 +pthread_handler_t test_lf_hash(void *arg) +{ + int m= (*(int *)arg)/(2*N_TLH); + int32 x,y,z,sum= 0, ins= 0; + LF_PINS *pins; + + my_thread_init(); + + pins= lf_hash_get_pins(&lf_hash); + + for (x= ((int)(intptr)(&m)); m ; m--) + { + int i; + y= x; + for (i= 0; i < N_TLH; i++) + { + x= (x*(m+i)+0x87654321) & INT_MAX32; + z= (x<0) ? -x : x; + if (lf_hash_insert(&lf_hash, pins, &z)) + { + sum+= z; + ins++; + } + } + for (i= 0; i < N_TLH; i++) + { + y= (y*(m+i)+0x87654321) & INT_MAX32; + z= (y<0) ? -y : y; + if (lf_hash_delete(&lf_hash, pins, (uchar *)&z, sizeof(z))) + sum-= z; + } + } + lf_hash_put_pins(pins); + pthread_mutex_lock(&mutex); + bad+= sum; + inserts+= ins; + + if (--N == 0) + { + diag("%d mallocs, %d pins in stack, %d hash size, %d inserts", + lf_hash.alloc.mallocs, lf_hash.alloc.pinbox.pins_in_array, + lf_hash.size, inserts); + bad|= lf_hash.count; + } + if (!--running_threads) pthread_cond_signal(&cond); + pthread_mutex_unlock(&mutex); + my_thread_end(); + return 0; +} + + +void do_tests() +{ + plan(4); + + lf_alloc_init(&lf_allocator, sizeof(TLA), offsetof(TLA, not_used)); + lf_hash_init(&lf_hash, sizeof(int), LF_HASH_UNIQUE, 0, sizeof(int), 0, + &my_charset_bin); + + bad= my_atomic_initialize(); + ok(!bad, "my_atomic_initialize() returned %d", bad); + + test_concurrently("lf_pinbox", test_lf_pinbox, N= THREADS, CYCLES); + test_concurrently("lf_alloc", test_lf_alloc, N= THREADS, CYCLES); + test_concurrently("lf_hash", test_lf_hash, N= THREADS, CYCLES/10); + + lf_hash_destroy(&lf_hash); + lf_alloc_destroy(&lf_allocator); +} + diff --git a/unittest/mysys/my_atomic-t.c b/unittest/mysys/my_atomic-t.c index 48476551ba8..26591ad35d2 100644 --- a/unittest/mysys/my_atomic-t.c +++ b/unittest/mysys/my_atomic-t.c @@ -13,11 +13,7 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#include <my_global.h> -#include <my_sys.h> -#include <my_atomic.h> -#include <tap.h> -#include <lf.h> +#include "thr_template.c" /* at least gcc 3.4.5 and 3.4.6 (but not 3.2.3) on RHEL */ #if __GNUC__ == 3 && __GNUC_MINOR__ == 4 @@ -26,20 +22,12 @@ #define GCC_BUG_WORKAROUND #endif -volatile uint32 a32,b32; -volatile int32 c32, N; +volatile uint32 b32; +volatile int32 c32; my_atomic_rwlock_t rwl; -LF_ALLOCATOR lf_allocator; -LF_HASH lf_hash; -pthread_attr_t thr_attr; -pthread_mutex_t mutex; -pthread_cond_t cond; -uint running_threads; -size_t stacksize= 0; -#define STACK_SIZE (((int)stacksize-2048)*STACK_DIRECTION) /* add and sub a random number in a loop. Must get 0 at the end */ -pthread_handler_t test_atomic_add_handler(void *arg) +pthread_handler_t test_atomic_add(void *arg) { int m= (*(int *)arg)/2; GCC_BUG_WORKAROUND int32 x; @@ -47,11 +35,11 @@ pthread_handler_t test_atomic_add_handler(void *arg) { x= (x*m+0x87654321) & INT_MAX32; my_atomic_rwlock_wrlock(&rwl); - my_atomic_add32(&a32, x); + my_atomic_add32(&bad, x); my_atomic_rwlock_wrunlock(&rwl); my_atomic_rwlock_wrlock(&rwl); - my_atomic_add32(&a32, -x); + my_atomic_add32(&bad, -x); my_atomic_rwlock_wrunlock(&rwl); } pthread_mutex_lock(&mutex); @@ -62,13 +50,13 @@ pthread_handler_t test_atomic_add_handler(void *arg) /* 1. generate thread number 0..N-1 from b32 - 2. add it to a32 + 2. add it to bad 3. swap thread numbers in c32 4. (optionally) one more swap to avoid 0 as a result - 5. subtract result from a32 - must get 0 in a32 at the end + 5. 
subtract result from bad + must get 0 in bad at the end */ -pthread_handler_t test_atomic_fas_handler(void *arg) +pthread_handler_t test_atomic_fas(void *arg) { int m= *(int *)arg; int32 x; @@ -78,7 +66,7 @@ pthread_handler_t test_atomic_fas_handler(void *arg) my_atomic_rwlock_wrunlock(&rwl); my_atomic_rwlock_wrlock(&rwl); - my_atomic_add32(&a32, x); + my_atomic_add32(&bad, x); my_atomic_rwlock_wrunlock(&rwl); for (; m ; m--) @@ -96,7 +84,7 @@ pthread_handler_t test_atomic_fas_handler(void *arg) } my_atomic_rwlock_wrlock(&rwl); - my_atomic_add32(&a32, -x); + my_atomic_add32(&bad, -x); my_atomic_rwlock_wrunlock(&rwl); pthread_mutex_lock(&mutex); @@ -106,28 +94,28 @@ pthread_handler_t test_atomic_fas_handler(void *arg) } /* - same as test_atomic_add_handler, but my_atomic_add32 is emulated with + same as test_atomic_add, but my_atomic_add32 is emulated with my_atomic_cas32 - notice that the slowdown is proportional to the number of CPUs */ -pthread_handler_t test_atomic_cas_handler(void *arg) +pthread_handler_t test_atomic_cas(void *arg) { int m= (*(int *)arg)/2, ok= 0; GCC_BUG_WORKAROUND int32 x, y; for (x= ((int)(intptr)(&m)); m ; m--) { my_atomic_rwlock_wrlock(&rwl); - y= my_atomic_load32(&a32); + y= my_atomic_load32(&bad); my_atomic_rwlock_wrunlock(&rwl); x= (x*m+0x87654321) & INT_MAX32; do { my_atomic_rwlock_wrlock(&rwl); - ok= my_atomic_cas32(&a32, &y, (uint32)y+x); + ok= my_atomic_cas32(&bad, &y, (uint32)y+x); my_atomic_rwlock_wrunlock(&rwl); } while (!ok) ; do { my_atomic_rwlock_wrlock(&rwl); - ok= my_atomic_cas32(&a32, &y, y-x); + ok= my_atomic_cas32(&bad, &y, y-x); my_atomic_rwlock_wrunlock(&rwl); } while (!ok) ; } @@ -137,212 +125,22 @@ pthread_handler_t test_atomic_cas_handler(void *arg) return 0; } - -/* - pin allocator - alloc and release an element in a loop -*/ -pthread_handler_t test_lf_pinbox(void *arg) -{ - int m= *(int *)arg; - int32 x= 0; - LF_PINS *pins; - - pins= lf_pinbox_get_pins(&lf_allocator.pinbox, &m + STACK_SIZE); - - for (x= ((int)(intptr)(&m)); m ; m--) - { - lf_pinbox_put_pins(pins); - pins= lf_pinbox_get_pins(&lf_allocator.pinbox, &m + STACK_SIZE); - } - lf_pinbox_put_pins(pins); - pthread_mutex_lock(&mutex); - if (!--running_threads) pthread_cond_signal(&cond); - pthread_mutex_unlock(&mutex); - return 0; -} - -typedef union { - int32 data; - void *not_used; -} TLA; - -pthread_handler_t test_lf_alloc(void *arg) -{ - int m= (*(int *)arg)/2; - int32 x,y= 0; - LF_PINS *pins; - - pins= lf_alloc_get_pins(&lf_allocator, &m + STACK_SIZE); - - for (x= ((int)(intptr)(&m)); m ; m--) - { - TLA *node1, *node2; - x= (x*m+0x87654321) & INT_MAX32; - node1= (TLA *)lf_alloc_new(pins); - node1->data= x; - y+= node1->data; - node1->data= 0; - node2= (TLA *)lf_alloc_new(pins); - node2->data= x; - y-= node2->data; - node2->data= 0; - lf_alloc_free(pins, node1); - lf_alloc_free(pins, node2); - } - lf_alloc_put_pins(pins); - my_atomic_rwlock_wrlock(&rwl); - my_atomic_add32(&a32, y); - - if (my_atomic_add32(&N, -1) == 1) - { - diag("%d mallocs, %d pins in stack", - lf_allocator.mallocs, lf_allocator.pinbox.pins_in_array); -#ifdef MY_LF_EXTRA_DEBUG - a32|= lf_allocator.mallocs - lf_alloc_pool_count(&lf_allocator); -#endif - } - my_atomic_rwlock_wrunlock(&rwl); - pthread_mutex_lock(&mutex); - if (!--running_threads) pthread_cond_signal(&cond); - pthread_mutex_unlock(&mutex); - return 0; -} - -#define N_TLH 1000 -pthread_handler_t test_lf_hash(void *arg) +void do_tests() { - int m= (*(int *)arg)/(2*N_TLH); - int32 x,y,z,sum= 0, ins= 0; - LF_PINS *pins; + plan(4); - pins= 
lf_hash_get_pins(&lf_hash, &m + STACK_SIZE); + bad= my_atomic_initialize(); + ok(!bad, "my_atomic_initialize() returned %d", bad); - for (x= ((int)(intptr)(&m)); m ; m--) - { - int i; - y= x; - for (i= 0; i < N_TLH; i++) - { - x= (x*(m+i)+0x87654321) & INT_MAX32; - z= (x<0) ? -x : x; - if (lf_hash_insert(&lf_hash, pins, &z)) - { - sum+= z; - ins++; - } - } - for (i= 0; i < N_TLH; i++) - { - y= (y*(m+i)+0x87654321) & INT_MAX32; - z= (y<0) ? -y : y; - if (lf_hash_delete(&lf_hash, pins, (uchar *)&z, sizeof(z))) - sum-= z; - } - } - lf_hash_put_pins(pins); - my_atomic_rwlock_wrlock(&rwl); - my_atomic_add32(&a32, sum); - my_atomic_add32(&b32, ins); - - if (my_atomic_add32(&N, -1) == 1) - { - diag("%d mallocs, %d pins in stack, %d hash size, %d inserts", - lf_hash.alloc.mallocs, lf_hash.alloc.pinbox.pins_in_array, - lf_hash.size, b32); - a32|= lf_hash.count; - } - my_atomic_rwlock_wrunlock(&rwl); - pthread_mutex_lock(&mutex); - if (!--running_threads) pthread_cond_signal(&cond); - pthread_mutex_unlock(&mutex); - return 0; -} - - -void test_atomic(const char *test, pthread_handler handler, int n, int m) -{ - pthread_t t; - ulonglong now= my_getsystime(); - - a32= 0; - b32= 0; - c32= 0; - - diag("Testing %s with %d threads, %d iterations... ", test, n, m); - for (running_threads= n ; n ; n--) - { - if (pthread_create(&t, &thr_attr, handler, &m) != 0) - { - diag("Could not create thread"); - abort(); - } - } - pthread_mutex_lock(&mutex); - while (running_threads) - pthread_cond_wait(&cond, &mutex); - pthread_mutex_unlock(&mutex); - - now= my_getsystime()-now; - ok(a32 == 0, "tested %s in %g secs (%d)", test, ((double)now)/1e7, a32); -} - -int main() -{ - int err; - MY_INIT("my_atomic-t.c"); - - diag("N CPUs: %d, atomic ops: %s", my_getncpus(), MY_ATOMIC_MODE); - err= my_atomic_initialize(); - - plan(7); - ok(err == 0, "my_atomic_initialize() returned %d", err); - - pthread_mutex_init(&mutex, 0); - pthread_cond_init(&cond, 0); my_atomic_rwlock_init(&rwl); - lf_alloc_init(&lf_allocator, sizeof(TLA), offsetof(TLA, not_used)); - lf_hash_init(&lf_hash, sizeof(int), LF_HASH_UNIQUE, 0, sizeof(int), 0, - &my_charset_bin); - pthread_attr_init(&thr_attr); - pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED); -#ifdef HAVE_PTHREAD_ATTR_GETSTACKSIZE - pthread_attr_getstacksize(&thr_attr, &stacksize); - if (stacksize == 0) -#endif - stacksize = PTHREAD_STACK_MIN; - - -#ifdef MY_ATOMIC_MODE_RWLOCKS -#if defined(HPUX11) || defined(__POWERPC__) /* showed to be very slow (scheduler-related) */ -#define CYCLES 300 -#else -#define CYCLES 3000 -#endif -#else -#define CYCLES 300000 -#endif -#define THREADS 100 - - test_atomic("my_atomic_add32", test_atomic_add_handler, THREADS,CYCLES); - test_atomic("my_atomic_fas32", test_atomic_fas_handler, THREADS,CYCLES); - test_atomic("my_atomic_cas32", test_atomic_cas_handler, THREADS,CYCLES); - test_atomic("lf_pinbox", test_lf_pinbox, THREADS,CYCLES); - test_atomic("lf_alloc", test_lf_alloc, THREADS,CYCLES); - test_atomic("lf_hash", test_lf_hash, THREADS,CYCLES/10); - lf_hash_destroy(&lf_hash); - lf_alloc_destroy(&lf_allocator); + b32= c32= 0; + test_concurrently("my_atomic_add32", test_atomic_add, THREADS, CYCLES); + b32= c32= 0; + test_concurrently("my_atomic_fas32", test_atomic_fas, THREADS, CYCLES); + b32= c32= 0; + test_concurrently("my_atomic_cas32", test_atomic_cas, THREADS, CYCLES); - /* - workaround until we know why it crashes randomly on some machine - (BUG#22320). 
- */ - sleep(2); - pthread_mutex_destroy(&mutex); - pthread_cond_destroy(&cond); - pthread_attr_destroy(&thr_attr); my_atomic_rwlock_destroy(&rwl); - my_end(0); - return exit_status(); } diff --git a/unittest/mysys/thr_template.c b/unittest/mysys/thr_template.c new file mode 100644 index 00000000000..699c941499f --- /dev/null +++ b/unittest/mysys/thr_template.c @@ -0,0 +1,92 @@ +/* Copyright (C) 2006 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include <my_global.h> +#include <my_sys.h> +#include <my_atomic.h> +#include <tap.h> + +volatile uint32 bad; +pthread_attr_t thr_attr; +pthread_mutex_t mutex; +pthread_cond_t cond; +uint running_threads; + +void do_tests(); + +void test_concurrently(const char *test, pthread_handler handler, int n, int m) +{ + pthread_t t; + ulonglong now= my_getsystime(); + + bad= 0; + + diag("Testing %s with %d threads, %d iterations... ", test, n, m); + for (running_threads= n ; n ; n--) + { + if (pthread_create(&t, &thr_attr, handler, &m) != 0) + { + diag("Could not create thread"); + abort(); + } + } + pthread_mutex_lock(&mutex); + while (running_threads) + pthread_cond_wait(&cond, &mutex); + pthread_mutex_unlock(&mutex); + + now= my_getsystime()-now; + ok(!bad, "tested %s in %g secs (%d)", test, ((double)now)/1e7, bad); +} + +int main(int argc, char **argv) +{ + MY_INIT("thd_template"); + + if (argv[1] && *argv[1]) + DBUG_SET_INITIAL(argv[1]); + + pthread_mutex_init(&mutex, 0); + pthread_cond_init(&cond, 0); + pthread_attr_init(&thr_attr); + pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED); + +#ifdef MY_ATOMIC_MODE_RWLOCKS +#if defined(HPUX11) || defined(__POWERPC__) /* showed to be very slow (scheduler-related) */ +#define CYCLES 300 +#else +#define CYCLES 3000 +#endif +#else +#define CYCLES 3000 +#endif +#define THREADS 30 + + diag("N CPUs: %d, atomic ops: %s", my_getncpus(), MY_ATOMIC_MODE); + + do_tests(); + + /* + workaround until we know why it crashes randomly on some machine + (BUG#22320). + */ + sleep(2); + pthread_mutex_destroy(&mutex); + pthread_cond_destroy(&cond); + pthread_attr_destroy(&thr_attr); + my_end(0); + return exit_status(); +} + diff --git a/unittest/mysys/waiting_threads-t.c b/unittest/mysys/waiting_threads-t.c new file mode 100644 index 00000000000..80a8f010fd7 --- /dev/null +++ b/unittest/mysys/waiting_threads-t.c @@ -0,0 +1,278 @@ +/* Copyright (C) 2006 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. 
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include "thr_template.c"
+#include <waiting_threads.h>
+#include <m_string.h>
+#include <locale.h>
+
+struct test_wt_thd {
+  WT_THD thd;
+  pthread_mutex_t lock;
+} thds[THREADS];
+
+uint i, cnt;
+pthread_mutex_t lock;
+
+#define reset(ARRAY) bzero(ARRAY, sizeof(ARRAY))
+
+enum { LATEST, RANDOM, YOUNGEST, LOCKS } kill_strategy;
+
+WT_RESOURCE_TYPE restype={ wt_resource_id_memcmp, 0};
+
+#define rnd() ((uint)(my_rnd(&rand) * INT_MAX32))
+
+/*
+  stress test: wait on a random number of random threads.
+  it always succeeds (unless it crashes or hangs).
+*/
+pthread_handler_t test_wt(void *arg)
+{
+  int m, n, i, id, res;
+  struct my_rnd_struct rand;
+
+  my_thread_init();
+
+  pthread_mutex_lock(&lock);
+  id= cnt++;
+  pthread_mutex_unlock(&lock);
+
+  my_rnd_init(&rand, (ulong)(intptr)&m, id);
+  if (kill_strategy == YOUNGEST)
+    thds[id].thd.weight= ~my_getsystime();
+  if (kill_strategy == LOCKS)
+    thds[id].thd.weight= 0;
+
+  /*
+    wt_thd_init() is supposed to be called in the thread that will use it.
+    We didn't do that, and now need to fix the broken object.
+  */
+  thds[id].thd.pins->stack_ends_here= & my_thread_var->stack_ends_here;
+#ifndef DBUG_OFF
+  thds[id].thd.name=my_thread_name();
+#endif
+
+  for (m= *(int *)arg; m ; m--)
+  {
+    WT_RESOURCE_ID resid;
+    int blockers[THREADS/10], j, k;
+    bzero(&resid, sizeof(resid));
+
+    resid.value.num= id; //rnd() % THREADS;
+    resid.type= &restype;
+
+    res= 0;
+
+    for (j= n= (rnd() % THREADS)/10; !res && j >= 0; j--)
+    {
+retry:
+      i= rnd() % (THREADS-1);
+      if (i >= id) i++;
+
+#ifndef DBUG_OFF
+      if (thds[i].thd.name==0)
+        goto retry;
+#endif
+
+      for (k=n; k >=j; k--)
+        if (blockers[k] == i)
+          goto retry;
+      blockers[j]= i;
+
+      if (kill_strategy == RANDOM)
+        thds[id].thd.weight= rnd();
+
+      pthread_mutex_lock(& thds[i].lock);
+      res= wt_thd_will_wait_for(& thds[id].thd, & thds[i].thd, &resid);
+      pthread_mutex_unlock(& thds[i].lock);
+    }
+
+    if (!res)
+    {
+      pthread_mutex_lock(&lock);
+      res= wt_thd_cond_timedwait(& thds[id].thd, &lock);
+      pthread_mutex_unlock(&lock);
+    }
+
+    if (res)
+    {
+      pthread_mutex_lock(& thds[id].lock);
+      pthread_mutex_lock(&lock);
+      wt_thd_release_all(& thds[id].thd);
+      pthread_mutex_unlock(&lock);
+      pthread_mutex_unlock(& thds[id].lock);
+      if (kill_strategy == LOCKS)
+        thds[id].thd.weight= 0;
+      if (kill_strategy == YOUNGEST)
+        thds[id].thd.weight= ~my_getsystime();
+    }
+    else if (kill_strategy == LOCKS)
+      thds[id].thd.weight++;
+  }
+
+  pthread_mutex_lock(& thds[id].lock);
+  pthread_mutex_lock(&lock);
+  wt_thd_release_all(& thds[id].thd);
+  pthread_mutex_unlock(&lock);
+  pthread_mutex_unlock(& thds[id].lock);
+
+#ifndef DBUG_OFF
+  {
+#define DEL "(deleted)"
+    char *x=malloc(strlen(thds[id].thd.name)+sizeof(DEL)+1);
+    strxmov(x, thds[id].thd.name, DEL, 0);
+    thds[id].thd.name=x; /* it's a memory leak, go on, shoot me */
+  }
+#endif
+
+  pthread_mutex_lock(&mutex);
+  if (!--running_threads) pthread_cond_signal(&cond);
+  pthread_mutex_unlock(&mutex);
+  DBUG_PRINT("wt", ("exiting"));
+  my_thread_end();
+  return 0;
+}
+
+void do_one_test()
+{
+  double sum, sum0;
+
+#ifndef DBUG_OFF
+  for (cnt=0; cnt < THREADS; cnt++)
+    thds[cnt].thd.name=0;
+#endif
+
+  reset(wt_cycle_stats);
+  reset(wt_wait_stats);
+  wt_success_stats=0;
+  cnt=0;
+  test_concurrently("waiting_threads", test_wt, THREADS, CYCLES);
+
+  sum=sum0=0;
+  for (cnt=0; cnt < WT_CYCLE_STATS; cnt++)
+    sum+= wt_cycle_stats[0][cnt] + wt_cycle_stats[1][cnt];
+  for (cnt=0; cnt < WT_CYCLE_STATS; cnt++)
+    if (wt_cycle_stats[0][cnt] + wt_cycle_stats[1][cnt] > 0)
+    {
+      sum0+=wt_cycle_stats[0][cnt] + wt_cycle_stats[1][cnt];
+      diag("deadlock cycles of length %2u: %4u %4u %8.2f %%", cnt,
+           wt_cycle_stats[0][cnt], wt_cycle_stats[1][cnt], 1e2*sum0/sum);
+    }
+  diag("depth exceeded: %u %u",
+       wt_cycle_stats[0][cnt], wt_cycle_stats[1][cnt]);
+  for (cnt=0; cnt < WT_WAIT_STATS; cnt++)
+    if (wt_wait_stats[cnt]>0)
+      diag("deadlock waits up to %7llu us: %5u",
+           wt_wait_table[cnt], wt_wait_stats[cnt]);
+  diag("timed out: %u", wt_wait_stats[cnt]);
+  diag("successes: %u", wt_success_stats);
+}
+
+void do_tests()
+{
+  plan(12);
+  compile_time_assert(THREADS >= 3);
+
+  DBUG_PRINT("wt", ("================= initialization ==================="));
+
+  bad= my_atomic_initialize();
+  ok(!bad, "my_atomic_initialize() returned %d", bad);
+
+  pthread_mutex_init(&lock, 0);
+  wt_init();
+  for (cnt=0; cnt < THREADS; cnt++)
+  {
+    wt_thd_init(& thds[cnt].thd);
+    pthread_mutex_init(& thds[cnt].lock, 0);
+  }
+  {
+    WT_RESOURCE_ID resid[3];
+    for (i=0; i < 3; i++)
+    {
+      bzero(&resid[i], sizeof(resid[i]));
+      resid[i].value.num= i+1;
+      resid[i].type= &restype;
+    }
+
+    DBUG_PRINT("wt", ("================= manual test ==================="));
+
+#define ok_wait(X,Y, R) \
+    ok(wt_thd_will_wait_for(& thds[X].thd, & thds[Y].thd, &resid[R]) == 0, \
+      "thd[" #X "] will wait for thd[" #Y "]")
+#define ok_deadlock(X,Y,R) \
+    ok(wt_thd_will_wait_for(& thds[X].thd, & thds[Y].thd, &resid[R]) == WT_DEADLOCK, \
+      "thd[" #X "] will wait for thd[" #Y "] - deadlock")
+
+    ok_wait(0,1,0);
+    ok_wait(0,2,0);
+    ok_wait(0,3,0);
+
+    pthread_mutex_lock(&lock);
+    bad= wt_thd_cond_timedwait(& thds[0].thd, &lock);
+    pthread_mutex_unlock(&lock);
+    ok(bad == ETIMEDOUT, "timeout test returned %d", bad);
+
+    ok_wait(0,1,0);
+    ok_wait(1,2,1);
+    ok_deadlock(2,0,2);
+
+    // FIXME remove wt_thd_dontwait calls below
+    wt_thd_dontwait(& thds[0].thd);
+    wt_thd_dontwait(& thds[1].thd);
+    wt_thd_dontwait(& thds[2].thd);
+    wt_thd_dontwait(& thds[3].thd);
+    pthread_mutex_lock(&lock);
+    wt_thd_release_all(& thds[0].thd);
+    wt_thd_release_all(& thds[1].thd);
+    wt_thd_release_all(& thds[2].thd);
+    wt_thd_release_all(& thds[3].thd);
+    pthread_mutex_unlock(&lock);
+  }
+
+  wt_deadlock_search_depth_short=6;
+  wt_timeout_short=1000;
+  wt_timeout_long= 100;
+  wt_deadlock_search_depth_long=16;
+  DBUG_PRINT("wt", ("================= stress test ==================="));
+
+  diag("timeout_short=%d us, deadlock_search_depth_short=%d",
+       wt_timeout_short, wt_deadlock_search_depth_short);
+  diag("timeout_long=%d us, deadlock_search_depth_long=%d",
+       wt_timeout_long, wt_deadlock_search_depth_long);
+
+#define test_kill_strategy(X) \
+  diag("kill strategy: " #X); \
+  kill_strategy=X; \
+  do_one_test();
+
+  test_kill_strategy(LATEST);
+  test_kill_strategy(RANDOM);
+  test_kill_strategy(YOUNGEST);
+  test_kill_strategy(LOCKS);
+
+  DBUG_PRINT("wt", ("================= cleanup ==================="));
+  pthread_mutex_lock(&lock);
+  for (cnt=0; cnt < THREADS; cnt++)
+  {
+    wt_thd_release_all(& thds[cnt].thd);
+    wt_thd_destroy(& thds[cnt].thd);
+    pthread_mutex_destroy(& thds[cnt].lock);
+  }
+  pthread_mutex_unlock(&lock);
+  wt_end();
+  pthread_mutex_destroy(&lock);
+}
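
For readers of the new unittest/mysys/lf-t.c above: the pin lifecycle it
exercises is short enough to spell out. Below is a minimal sketch under the
post-patch signatures, where lf_hash_get_pins() no longer takes an
end-of-stack argument because the pointer now lives in mysys_var; my_key is
a hypothetical key and return values are left unchecked.

  /* minimal pin lifecycle, as exercised by test_lf_hash() above;
     'my_key' is a hypothetical key, return values are not checked */
  LF_HASH hash;
  LF_PINS *pins;
  int my_key= 42;

  lf_hash_init(&hash, sizeof(int), LF_HASH_UNIQUE, 0, sizeof(int), 0,
               &my_charset_bin);

  my_thread_init();                      /* pins are per-thread */
  pins= lf_hash_get_pins(&hash);         /* note: no stack pointer argument */
  lf_hash_insert(&hash, pins, &my_key);  /* the element is copied into the hash */
  lf_hash_delete(&hash, pins, (uchar *)&my_key, sizeof(my_key));
  lf_hash_put_pins(pins);                /* release pins before my_thread_end() */
  my_thread_end();

  lf_hash_destroy(&hash);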
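
Similarly, waiting_threads-t.c demonstrates the calling convention of the
new wait-for-graph API. The sketch below condenses test_wt() to a single
lock-wait round: only the wt_* calls, WT_THD, WT_RESOURCE_ID and the return
codes (0, WT_DEADLOCK, ETIMEDOUT) come from the patch; waiter, holder,
owner_lock and resid are illustrative placeholders, and the test itself
uses separate per-thread and global mutexes where this sketch uses one.

  /* one lock-wait round, condensed from test_wt() above */
  int lock_wait(WT_THD *waiter, WT_THD *holder,
                pthread_mutex_t *owner_lock, WT_RESOURCE_ID *resid)
  {
    int res;

    /* add the edge waiter->holder to the wait-for graph; returns
       WT_DEADLOCK at once if the edge closes a cycle and the waiter
       is chosen as the victim */
    pthread_mutex_lock(owner_lock);
    res= wt_thd_will_wait_for(waiter, holder, resid);
    pthread_mutex_unlock(owner_lock);

    if (!res)
    {
      /* block until woken, deadlocked, or timed out (ETIMEDOUT) */
      pthread_mutex_lock(owner_lock);
      res= wt_thd_cond_timedwait(waiter, owner_lock);
      pthread_mutex_unlock(owner_lock);
    }

    if (res)                        /* victim or timeout: back out */
      wt_thd_release_all(waiter);
    return res;                     /* 0 means the wait succeeded */
  }

As the stress test's four kill strategies show, victim selection is driven
by WT_THD::weight, which the caller may update between waits.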