summaryrefslogtreecommitdiff
path: root/mysys
diff options
context:
space:
mode:
authorSergei Golubchik <serg@mysql.com>2008-07-29 16:10:24 +0200
committerSergei Golubchik <serg@mysql.com>2008-07-29 16:10:24 +0200
commit6ba12f070c65a445ba3f6758c1a49a872c627561 (patch)
tree8fc9687df3d7995af94f6a6df09bc646da26592e /mysys
parent96e2ca52adfc4e58e4a08d20dcb32a6ff2f1ab2c (diff)
downloadmariadb-git-6ba12f070c65a445ba3f6758c1a49a872c627561.tar.gz
WL#3064 - waiting threads - wait-for graph and deadlock detection
client/mysqltest.c: compiler warnings configure.in: remove old tests for unused programs disable the use of gcc built-ins if smp assembler atomics were selected explictily. add waiting_threads.o to THREAD_LOBJECTS include/lf.h: replace the end-of-stack pointer with the pointer to the end-of-stack pointer. the latter could be stored in THD (mysys_vars) and updated in pool-of-threads scheduler. constructor/destructor in lf-alloc include/my_pthread.h: shuffle set_timespec/set_timespec_nsec macros a bit to be able to fill several timeout structures with only one my_getsystime() call include/waiting_threads.h: waiting threads - wait-for graph and deadlock detection mysys/Makefile.am: add waiting_threads.c mysys/lf_alloc-pin.c: replace the end-of-stack pointer with the pointer to the end-of-stack pointer. the latter could be stored in THD (mysys_vars) and updated in pool-of-threads scheduler. constructor/destructor in lf-alloc mysys/lf_hash.c: constructor/destructor in lf-alloc mysys/my_thr_init.c: remember end-of-stack pointer in the mysys_var mysys/waiting_threads.c: waiting threads - wait-for graph and deadlock detection storage/maria/ha_maria.cc: replace the end-of-stack pointer with the pointer to the end-of-stack pointer. the latter could be stored in THD (mysys_vars) and updated in pool-of-threads scheduler. storage/maria/ma_commit.c: replace the end-of-stack pointer with the pointer to the end-of-stack pointer. the latter could be stored in THD (mysys_vars) and updated in pool-of-threads scheduler. storage/maria/trnman.c: replace the end-of-stack pointer with the pointer to the end-of-stack pointer. the latter could be stored in THD (mysys_vars) and updated in pool-of-threads scheduler. storage/maria/trnman_public.h: replace the end-of-stack pointer with the pointer to the end-of-stack pointer. the latter could be stored in THD (mysys_vars) and updated in pool-of-threads scheduler. storage/maria/unittest/trnman-t.c: replace the end-of-stack pointer with the pointer to the end-of-stack pointer. the latter could be stored in THD (mysys_vars) and updated in pool-of-threads scheduler. unittest/mysys/Makefile.am: add waiting_threads-t unittest/mysys/lf-t.c: factor out the common code for multi-threaded stress unit tests move lf tests to a separate file unittest/mysys/my_atomic-t.c: factor out the common code for multi-threaded stress unit tests move lf tests to a separate file unittest/mysys/thr_template.c: factor out the common code for multi-threaded stress unit tests unittest/mysys/waiting_threads-t.c: wt tests
Diffstat (limited to 'mysys')
-rw-r--r--mysys/Makefile.am2
-rw-r--r--mysys/lf_alloc-pin.c52
-rw-r--r--mysys/lf_hash.c15
-rw-r--r--mysys/my_thr_init.c8
-rw-r--r--mysys/waiting_threads.c641
5 files changed, 688 insertions, 30 deletions
diff --git a/mysys/Makefile.am b/mysys/Makefile.am
index 7bb98770d06..54553680341 100644
--- a/mysys/Makefile.am
+++ b/mysys/Makefile.am
@@ -58,7 +58,7 @@ libmysys_a_SOURCES = my_init.c my_getwd.c mf_getdate.c my_mmap.c \
my_windac.c my_access.c base64.c my_libwrap.c \
wqueue.c
EXTRA_DIST = thr_alarm.c thr_lock.c my_pthread.c my_thr_init.c \
- thr_mutex.c thr_rwlock.c \
+ thr_mutex.c thr_rwlock.c waiting_threads.c \
CMakeLists.txt mf_soundex.c \
my_conio.c my_wincond.c my_winthread.c
libmysys_a_LIBADD = @THREAD_LOBJECTS@
diff --git a/mysys/lf_alloc-pin.c b/mysys/lf_alloc-pin.c
index 40438e93596..4fae8e37ddb 100644
--- a/mysys/lf_alloc-pin.c
+++ b/mysys/lf_alloc-pin.c
@@ -96,11 +96,10 @@
versioning a pointer - because we use an array, a pointer to pins is 16 bit,
upper 16 bits are used for a version.
- It is assumed that pins belong to a thread and are not transferable
- between threads (LF_PINS::stack_ends_here being a primary reason
+ It is assumed that pins belong to a THD and are not transferable
+ between THD's (LF_PINS::stack_ends_here being a primary reason
for this limitation).
*/
-
#include <my_global.h>
#include <my_sys.h>
#include <lf.h>
@@ -137,10 +136,6 @@ void lf_pinbox_destroy(LF_PINBOX *pinbox)
SYNOPSYS
pinbox -
- stack_end - a pointer to the end (top/bottom, depending on the
- STACK_DIRECTION) of stack. Used for safe alloca. There's
- no safety margin deducted, a caller should take care of it,
- if necessary.
DESCRIPTION
get a new LF_PINS structure from a stack of unused pins,
@@ -150,7 +145,7 @@ void lf_pinbox_destroy(LF_PINBOX *pinbox)
It is assumed that pins belong to a thread and are not transferable
between threads.
*/
-LF_PINS *_lf_pinbox_get_pins(LF_PINBOX *pinbox, void *stack_end)
+LF_PINS *_lf_pinbox_get_pins(LF_PINBOX *pinbox)
{
uint32 pins, next, top_ver;
LF_PINS *el;
@@ -194,7 +189,7 @@ LF_PINS *_lf_pinbox_get_pins(LF_PINBOX *pinbox, void *stack_end)
el->link= pins;
el->purgatory_count= 0;
el->pinbox= pinbox;
- el->stack_ends_here= stack_end;
+ el->stack_ends_here= & my_thread_var->stack_ends_here;
return el;
}
@@ -325,6 +320,9 @@ static int match_pins(LF_PINS *el, void *addr)
#define available_stack_size(CUR,END) (long) ((char*)(END) - (char*)(CUR))
#endif
+#define next_node(P, X) (*((uchar **)(((uchar *)(X)) + (P)->free_ptr_offset)))
+#define anext_node(X) next_node(&allocator->pinbox, (X))
+
/*
Scan the purgatory and free everything that can be freed
*/
@@ -332,7 +330,7 @@ static void _lf_pinbox_real_free(LF_PINS *pins)
{
int npins, alloca_size;
void *list, **addr;
- struct st_lf_alloc_node *first, *last= NULL;
+ uchar *first, *last= NULL;
LF_PINBOX *pinbox= pins->pinbox;
LINT_INIT(first);
@@ -341,7 +339,7 @@ static void _lf_pinbox_real_free(LF_PINS *pins)
#ifdef HAVE_ALLOCA
alloca_size= sizeof(void *)*LF_PINBOX_PINS*npins;
/* create a sorted list of pinned addresses, to speed up searches */
- if (available_stack_size(&pinbox, pins->stack_ends_here) > alloca_size)
+ if (available_stack_size(&pinbox, *pins->stack_ends_here) > alloca_size)
{
struct st_harvester hv;
addr= (void **) alloca(alloca_size);
@@ -391,9 +389,9 @@ static void _lf_pinbox_real_free(LF_PINS *pins)
}
/* not pinned - freeing */
if (last)
- last= last->next= (struct st_lf_alloc_node *)cur;
+ last= next_node(pinbox, last)= (uchar *)cur;
else
- first= last= (struct st_lf_alloc_node *)cur;
+ first= last= (uchar *)cur;
continue;
found:
/* pinned - keeping */
@@ -412,22 +410,22 @@ LF_REQUIRE_PINS(1)
add it back to the allocator stack
DESCRIPTION
- 'first' and 'last' are the ends of the linked list of st_lf_alloc_node's:
+ 'first' and 'last' are the ends of the linked list of nodes:
first->el->el->....->el->last. Use first==last to free only one element.
*/
-static void alloc_free(struct st_lf_alloc_node *first,
- struct st_lf_alloc_node volatile *last,
+static void alloc_free(uchar *first,
+ uchar volatile *last,
LF_ALLOCATOR *allocator)
{
/*
we need a union here to access type-punned pointer reliably.
otherwise gcc -fstrict-aliasing will not see 'tmp' changed in the loop
*/
- union { struct st_lf_alloc_node * node; void *ptr; } tmp;
+ union { uchar * node; void *ptr; } tmp;
tmp.node= allocator->top;
do
{
- last->next= tmp.node;
+ anext_node(last)= tmp.node;
} while (!my_atomic_casptr((void **)(char *)&allocator->top,
(void **)&tmp.ptr, first) && LF_BACKOFF);
}
@@ -452,6 +450,8 @@ void lf_alloc_init(LF_ALLOCATOR *allocator, uint size, uint free_ptr_offset)
allocator->top= 0;
allocator->mallocs= 0;
allocator->element_size= size;
+ allocator->constructor= 0;
+ allocator->destructor= 0;
DBUG_ASSERT(size >= sizeof(void*) + free_ptr_offset);
}
@@ -468,10 +468,12 @@ void lf_alloc_init(LF_ALLOCATOR *allocator, uint size, uint free_ptr_offset)
*/
void lf_alloc_destroy(LF_ALLOCATOR *allocator)
{
- struct st_lf_alloc_node *node= allocator->top;
+ uchar *node= allocator->top;
while (node)
{
- struct st_lf_alloc_node *tmp= node->next;
+ uchar *tmp= anext_node(node);
+ if (allocator->destructor)
+ allocator->destructor(node);
my_free((void *)node, MYF(0));
node= tmp;
}
@@ -489,7 +491,7 @@ void lf_alloc_destroy(LF_ALLOCATOR *allocator)
void *_lf_alloc_new(LF_PINS *pins)
{
LF_ALLOCATOR *allocator= (LF_ALLOCATOR *)(pins->pinbox->free_func_arg);
- struct st_lf_alloc_node *node;
+ uchar *node;
for (;;)
{
do
@@ -500,6 +502,8 @@ void *_lf_alloc_new(LF_PINS *pins)
if (!node)
{
node= (void *)my_malloc(allocator->element_size, MYF(MY_WME));
+ if (allocator->constructor)
+ allocator->constructor(node);
#ifdef MY_LF_EXTRA_DEBUG
if (likely(node != 0))
my_atomic_add32(&allocator->mallocs, 1);
@@ -507,7 +511,7 @@ void *_lf_alloc_new(LF_PINS *pins)
break;
}
if (my_atomic_casptr((void **)(char *)&allocator->top,
- (void *)&node, node->next))
+ (void *)&node, anext_node(node)))
break;
}
_lf_unpin(pins, 0);
@@ -523,8 +527,8 @@ void *_lf_alloc_new(LF_PINS *pins)
uint lf_alloc_pool_count(LF_ALLOCATOR *allocator)
{
uint i;
- struct st_lf_alloc_node *node;
- for (node= allocator->top, i= 0; node; node= node->next, i++)
+ uchar *node;
+ for (node= allocator->top, i= 0; node; node= anext_node(node), i++)
/* no op */;
return i;
}
diff --git a/mysys/lf_hash.c b/mysys/lf_hash.c
index c197cc99711..008abef0c8b 100644
--- a/mysys/lf_hash.c
+++ b/mysys/lf_hash.c
@@ -299,11 +299,22 @@ static int initialize_bucket(LF_HASH *, LF_SLIST * volatile*, uint, LF_PINS *);
/*
Initializes lf_hash, the arguments are compatible with hash_init
+
+ @@note element_size sets both the size of allocated memory block for
+ lf_alloc and a size of memcpy'ed block size in lf_hash_insert. Typically
+ they are the same, indeed. But LF_HASH::element_size can be decreased
+ after lf_hash_init, and then lf_alloc will allocate larger block that
+ lf_hash_insert will copy over. It is desireable if part of the element
+ is expensive to initialize - for example if there is a mutex or
+ DYNAMIC_ARRAY. In this case they should be initialize in the
+ LF_ALLOCATOR::constructor, and lf_hash_insert should not overwrite them.
+ See wt_init() for example.
*/
void lf_hash_init(LF_HASH *hash, uint element_size, uint flags,
uint key_offset, uint key_length, hash_get_key get_key,
CHARSET_INFO *charset)
{
+ compile_time_assert(sizeof(LF_SLIST) == LF_HASH_OVERHEAD);
lf_alloc_init(&hash->alloc, sizeof(LF_SLIST)+element_size,
offsetof(LF_SLIST, key));
lf_dynarray_init(&hash->array, sizeof(LF_SLIST *));
@@ -453,7 +464,7 @@ void *lf_hash_search(LF_HASH *hash, LF_PINS *pins, const void *key, uint keylen)
return found ? found+1 : 0;
}
-static const uchar *dummy_key= "";
+static const uchar *dummy_key= (uchar*)"";
/*
RETURN
@@ -473,7 +484,7 @@ static int initialize_bucket(LF_HASH *hash, LF_SLIST * volatile *node,
unlikely(initialize_bucket(hash, el, parent, pins)))
return -1;
dummy->hashnr= my_reverse_bits(bucket) | 0; /* dummy node */
- dummy->key= (char*) dummy_key;
+ dummy->key= dummy_key;
dummy->keylen= 0;
if ((cur= linsert(el, hash->charset, dummy, pins, LF_HASH_UNIQUE)))
{
diff --git a/mysys/my_thr_init.c b/mysys/my_thr_init.c
index f5fee06916e..1d03577ce34 100644
--- a/mysys/my_thr_init.c
+++ b/mysys/my_thr_init.c
@@ -256,7 +256,7 @@ my_bool my_thread_init(void)
#ifdef EXTRA_DEBUG_THREADS
fprintf(stderr,"my_thread_init(): thread_id: 0x%lx\n",
(ulong) pthread_self());
-#endif
+#endif
#if !defined(__WIN__) || defined(USE_TLS)
if (my_pthread_getspecific(struct st_my_thread_var *,THR_KEY_mysys))
@@ -264,7 +264,7 @@ my_bool my_thread_init(void)
#ifdef EXTRA_DEBUG_THREADS
fprintf(stderr,"my_thread_init() called more than once in thread 0x%lx\n",
(long) pthread_self());
-#endif
+#endif
goto end;
}
if (!(tmp= (struct st_my_thread_var *) calloc(1, sizeof(*tmp))))
@@ -290,6 +290,8 @@ my_bool my_thread_init(void)
pthread_mutex_init(&tmp->mutex,MY_MUTEX_INIT_FAST);
pthread_cond_init(&tmp->suspend, NULL);
+ tmp->stack_ends_here= &tmp + STACK_DIRECTION * my_thread_stack_size;
+
pthread_mutex_lock(&THR_LOCK_threads);
tmp->id= ++thread_id;
++THR_thread_count;
@@ -325,7 +327,7 @@ void my_thread_end(void)
#ifdef EXTRA_DEBUG_THREADS
fprintf(stderr,"my_thread_end(): tmp: 0x%lx pthread_self: 0x%lx thread_id: %ld\n",
(long) tmp, (long) pthread_self(), tmp ? (long) tmp->id : 0L);
-#endif
+#endif
if (tmp && tmp->init)
{
#if !defined(DBUG_OFF)
diff --git a/mysys/waiting_threads.c b/mysys/waiting_threads.c
new file mode 100644
index 00000000000..4d375fdc899
--- /dev/null
+++ b/mysys/waiting_threads.c
@@ -0,0 +1,641 @@
+/* Copyright (C) 2008 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+/*
+ Note that if your lock system satisfy the following condition:
+
+ there exist four lock levels A, B, C, D, such as
+ A is compatible with B
+ A is not compatible with C
+ D is not compatible with B
+
+ (example A=IX, B=IS, C=S, D=X)
+
+ you need to include lock level in the resource identifier - thread 1
+ waiting for lock A on resource R and thread 2 waiting for lock B
+ on resource R should wait on different WT_RESOURCE structures, on different
+ {lock, resource} pairs. Otherwise the following is possible:
+
+ thread1> take S-lock on R
+ thread2> take IS-lock on R
+ thread3> wants X-lock on R, starts waiting for threads 1 and 2 on R.
+ thread3 is killed (or timeout or whatever)
+ WT_RESOURCE structure for R is still in the hash, as it has two owners
+ thread4> wants an IX-lock on R
+ WT_RESOURCE for R is found in the hash, thread4 starts waiting on it.
+ !! now thread4 is waiting for both thread1 and thread2
+ !! while, in fact, IX-lock and IS-lock are compatible and
+ !! thread4 should not wait for thread2.
+*/
+
+#include <waiting_threads.h>
+#include <m_string.h>
+
+uint wt_timeout_short=100, wt_deadlock_search_depth_short=4;
+uint wt_timeout_long=10000, wt_deadlock_search_depth_long=15;
+
+/*
+ status variables:
+ distribution of cycle lengths
+ wait time log distribution
+
+ Note:
+
+ we call deadlock() twice per wait (with different search lengths).
+ it means a deadlock will be counted twice. It's difficult to avoid,
+ as on the second search we could find a *different* deadlock and we
+ *want* to count it too. So we just count all deadlocks - two searches
+ mean two increments on the wt_cycle_stats.
+*/
+
+ulonglong wt_wait_table[WT_WAIT_STATS];
+uint32 wt_wait_stats[WT_WAIT_STATS+1];
+uint32 wt_cycle_stats[2][WT_CYCLE_STATS+1], wt_success_stats;
+
+static my_atomic_rwlock_t cycle_stats_lock, wait_stats_lock, success_stats_lock;
+
+#define increment_success_stats() \
+ do { \
+ my_atomic_rwlock_wrlock(&success_stats_lock); \
+ my_atomic_add32(&wt_success_stats, 1); \
+ my_atomic_rwlock_wrunlock(&success_stats_lock); \
+ } while (0)
+
+#define increment_cycle_stats(X,MAX) \
+ do { \
+ uint i= (X), j= (MAX) == wt_deadlock_search_depth_long; \
+ if (i >= WT_CYCLE_STATS) \
+ i= WT_CYCLE_STATS; \
+ my_atomic_rwlock_wrlock(&cycle_stats_lock); \
+ my_atomic_add32(&wt_cycle_stats[j][i], 1); \
+ my_atomic_rwlock_wrunlock(&cycle_stats_lock); \
+ } while (0)
+
+#define increment_wait_stats(X,RET) \
+ do { \
+ uint i; \
+ if ((RET) == ETIMEDOUT) \
+ i= WT_WAIT_STATS; \
+ else \
+ { \
+ ulonglong w=(X)/10; \
+ for (i=0; i < WT_WAIT_STATS && w > wt_wait_table[i]; i++) ; \
+ } \
+ my_atomic_rwlock_wrlock(&wait_stats_lock); \
+ my_atomic_add32(wt_wait_stats+i, 1); \
+ my_atomic_rwlock_wrunlock(&wait_stats_lock); \
+ } while (0)
+
+#define rc_rdlock(X) \
+ do { \
+ WT_RESOURCE *R=(X); \
+ DBUG_PRINT("wt", ("LOCK resid=%lld for READ", R->id.value.num)); \
+ pthread_rwlock_rdlock(&R->lock); \
+ } while (0)
+#define rc_wrlock(X) \
+ do { \
+ WT_RESOURCE *R=(X); \
+ DBUG_PRINT("wt", ("LOCK resid=%lld for WRITE", R->id.value.num)); \
+ pthread_rwlock_wrlock(&R->lock); \
+ } while (0)
+#define rc_unlock(X) \
+ do { \
+ WT_RESOURCE *R=(X); \
+ DBUG_PRINT("wt", ("UNLOCK resid=%lld", R->id.value.num)); \
+ pthread_rwlock_unlock(&R->lock); \
+ } while (0)
+
+static LF_HASH reshash;
+
+static void wt_resource_init(uchar *arg)
+{
+ WT_RESOURCE *rc=(WT_RESOURCE*)(arg+LF_HASH_OVERHEAD);
+ DBUG_ENTER("wt_resource_init");
+
+ bzero(rc, sizeof(*rc));
+ pthread_rwlock_init(&rc->lock, 0);
+ pthread_cond_init(&rc->cond, 0);
+ my_init_dynamic_array(&rc->owners, sizeof(WT_THD *), 5, 5);
+ DBUG_VOID_RETURN;
+}
+
+static void wt_resource_destroy(uchar *arg)
+{
+ WT_RESOURCE *rc=(WT_RESOURCE*)(arg+LF_HASH_OVERHEAD);
+ DBUG_ENTER("wt_resource_destroy");
+
+ DBUG_ASSERT(rc->owners.elements == 0);
+ pthread_rwlock_destroy(&rc->lock);
+ pthread_cond_destroy(&rc->cond);
+ delete_dynamic(&rc->owners);
+ DBUG_VOID_RETURN;
+}
+
+void wt_init()
+{
+ DBUG_ENTER("wt_init");
+
+ lf_hash_init(&reshash, sizeof(WT_RESOURCE), LF_HASH_UNIQUE, 0,
+ sizeof(struct st_wt_resource_id), 0, 0);
+ reshash.alloc.constructor= wt_resource_init;
+ reshash.alloc.destructor= wt_resource_destroy;
+ /*
+ Note a trick: we initialize the hash with the real element size,
+ but fix it later to a shortened element size. This way
+ the allocator will allocate elements correctly, but
+ lf_hash_insert() will only overwrite part of the element with memcpy().
+ lock, condition, and dynamic array will be intact.
+ */
+ reshash.element_size= offsetof(WT_RESOURCE, lock);
+ bzero(wt_wait_stats, sizeof(wt_wait_stats));
+ bzero(wt_cycle_stats, sizeof(wt_cycle_stats));
+ wt_success_stats=0;
+ {
+ int i;
+ double from=log(1); /* 1 us */
+ double to=log(60e6); /* 1 min */
+ for (i=0; i < WT_WAIT_STATS; i++)
+ {
+ wt_wait_table[i]=(ulonglong)exp((to-from)/(WT_WAIT_STATS-1)*i+from);
+ DBUG_ASSERT(i==0 || wt_wait_table[i-1] != wt_wait_table[i]);
+ }
+ }
+ my_atomic_rwlock_init(&cycle_stats_lock);
+ my_atomic_rwlock_init(&success_stats_lock);
+ my_atomic_rwlock_init(&wait_stats_lock);
+ DBUG_VOID_RETURN;
+}
+
+void wt_end()
+{
+ DBUG_ENTER("wt_end");
+
+ DBUG_ASSERT(reshash.count == 0);
+ lf_hash_destroy(&reshash);
+ my_atomic_rwlock_destroy(&cycle_stats_lock);
+ my_atomic_rwlock_destroy(&success_stats_lock);
+ my_atomic_rwlock_destroy(&wait_stats_lock);
+ DBUG_VOID_RETURN;
+}
+
+void wt_thd_init(WT_THD *thd)
+{
+ DBUG_ENTER("wt_thd_init");
+
+ my_init_dynamic_array(&thd->my_resources, sizeof(WT_RESOURCE *), 10, 5);
+ thd->pins=lf_hash_get_pins(&reshash);
+ thd->waiting_for=0;
+ thd->weight=0;
+#ifndef DBUG_OFF
+ thd->name=my_thread_name();
+#endif
+ DBUG_VOID_RETURN;
+}
+
+void wt_thd_destroy(WT_THD *thd)
+{
+ DBUG_ENTER("wt_thd_destroy");
+
+ DBUG_ASSERT(thd->my_resources.elements == 0);
+ delete_dynamic(&thd->my_resources);
+ lf_hash_put_pins(thd->pins);
+ thd->waiting_for=0;
+ DBUG_VOID_RETURN;
+}
+
+int wt_resource_id_memcmp(void *a, void *b)
+{
+ return memcmp(a, b, sizeof(WT_RESOURCE_ID));
+}
+
+struct deadlock_arg {
+ WT_THD *thd;
+ uint max_depth;
+ WT_THD *victim;
+ WT_RESOURCE *rc;
+};
+
+/*
+ loop detection in a wait-for graph with a limited search depth.
+*/
+static int deadlock_search(struct deadlock_arg *arg, WT_THD *blocker,
+ uint depth)
+{
+ WT_RESOURCE *rc, *volatile *shared_ptr= &blocker->waiting_for;
+ WT_THD *cursor;
+ uint i;
+ int ret= WT_OK;
+ DBUG_ENTER("deadlock_search");
+ DBUG_PRINT("wt", ("enter: thd=%s, blocker=%s, depth=%u",
+ arg->thd->name, blocker->name, depth));
+
+ LF_REQUIRE_PINS(1);
+
+ arg->rc= 0;
+
+ if (depth > arg->max_depth)
+ {
+ DBUG_PRINT("wt", ("exit: WT_DEPTH_EXCEEDED (early)"));
+ DBUG_RETURN(WT_DEPTH_EXCEEDED);
+ }
+
+retry:
+ /* safe dereference as explained in lf_alloc-pin.c */
+ do
+ {
+ rc= *shared_ptr;
+ lf_pin(arg->thd->pins, 0, rc);
+ } while (rc != *shared_ptr && LF_BACKOFF);
+
+ if (rc == 0)
+ {
+ DBUG_PRINT("wt", ("exit: OK (early)"));
+ DBUG_RETURN(0);
+ }
+
+ rc_rdlock(rc);
+ if (rc->state != ACTIVE || *shared_ptr != rc)
+ {
+ rc_unlock(rc);
+ lf_unpin(arg->thd->pins, 0);
+ goto retry;
+ }
+ lf_unpin(arg->thd->pins, 0);
+
+ for (i=0; i < rc->owners.elements; i++)
+ {
+ cursor= *dynamic_element(&rc->owners, i, WT_THD**);
+ if (cursor == arg->thd)
+ {
+ ret= WT_DEADLOCK;
+ increment_cycle_stats(depth, arg->max_depth);
+ arg->victim= cursor;
+ goto end;
+ }
+ }
+ for (i=0; i < rc->owners.elements; i++)
+ {
+ cursor= *dynamic_element(&rc->owners, i, WT_THD**);
+ switch (deadlock_search(arg, cursor, depth+1)) {
+ case WT_DEPTH_EXCEEDED:
+ ret= WT_DEPTH_EXCEEDED;
+ break;
+ case WT_DEADLOCK:
+ ret= WT_DEADLOCK;
+ if (cursor->weight < arg->victim->weight)
+ {
+ if (arg->victim != arg->thd)
+ {
+ rc_unlock(arg->victim->waiting_for); /* release the previous victim */
+ DBUG_ASSERT(arg->rc == cursor->waiting_for);
+ }
+ arg->victim= cursor;
+ }
+ else if (arg->rc)
+ rc_unlock(arg->rc);
+ goto end;
+ case WT_OK:
+ break;
+ default:
+ DBUG_ASSERT(0);
+ }
+ if (arg->rc)
+ rc_unlock(arg->rc);
+ }
+end:
+ arg->rc= rc;
+ DBUG_PRINT("wt", ("exit: %s",
+ ret == WT_DEPTH_EXCEEDED ? "WT_DEPTH_EXCEEDED" :
+ ret ? "WT_DEADLOCK" : "OK"));
+ DBUG_RETURN(ret);
+}
+
+static int deadlock(WT_THD *thd, WT_THD *blocker, uint depth,
+ uint max_depth)
+{
+ struct deadlock_arg arg= {thd, max_depth, 0, 0};
+ int ret;
+ DBUG_ENTER("deadlock");
+ ret= deadlock_search(&arg, blocker, depth);
+ if (arg.rc)
+ rc_unlock(arg.rc);
+ if (ret == WT_DEPTH_EXCEEDED)
+ {
+ increment_cycle_stats(WT_CYCLE_STATS, max_depth);
+ ret= WT_OK;
+ }
+ if (ret == WT_DEADLOCK && arg.victim != thd)
+ {
+ DBUG_PRINT("wt", ("killing %s", arg.victim->name));
+ arg.victim->killed=1;
+ pthread_cond_broadcast(&arg.victim->waiting_for->cond);
+ rc_unlock(arg.victim->waiting_for);
+ ret= WT_OK;
+ }
+ DBUG_RETURN(ret);
+}
+
+
+/*
+ Deletes an element from reshash.
+ rc->lock must be locked by the caller and it's unlocked on return.
+*/
+static void unlock_lock_and_free_resource(WT_THD *thd, WT_RESOURCE *rc)
+{
+ uint keylen;
+ const void *key;
+ DBUG_ENTER("unlock_lock_and_free_resource");
+
+ DBUG_ASSERT(rc->state == ACTIVE);
+
+ if (rc->owners.elements || rc->waiter_count)
+ {
+ DBUG_PRINT("wt", ("nothing to do, %d owners, %d waiters",
+ rc->owners.elements, rc->waiter_count));
+ rc_unlock(rc);
+ DBUG_VOID_RETURN;
+ }
+
+ /* XXX if (rc->id.type->make_key) key= rc->id.type->make_key(&rc->id, &keylen); else */
+ {
+ key= &rc->id;
+ keylen= sizeof(rc->id);
+ }
+
+ /*
+ To free the element correctly we need to:
+ 1. take its lock (already done).
+ 2. set the state to FREE
+ 3. release the lock
+ 4. remove from the hash
+
+ I *think* it's safe to release the lock while the element is still
+ in the hash. If not, the corrected procedure should be
+ 3. pin; 4; remove; 5; release; 6; unpin and it'll need pin[3].
+ */
+ rc->state=FREE;
+ rc_unlock(rc);
+ lf_hash_delete(&reshash, thd->pins, key, keylen);
+ DBUG_VOID_RETURN;
+}
+
+
+int wt_thd_dontwait_locked(WT_THD *thd)
+{
+ WT_RESOURCE *rc= thd->waiting_for;
+ DBUG_ENTER("wt_thd_dontwait_locked");
+
+ DBUG_ASSERT(rc->waiter_count);
+ DBUG_ASSERT(rc->state == ACTIVE);
+ rc->waiter_count--;
+ thd->waiting_for= 0;
+ unlock_lock_and_free_resource(thd, rc);
+ DBUG_RETURN(thd->killed ? WT_DEADLOCK : WT_OK);
+}
+
+int wt_thd_dontwait(WT_THD *thd)
+{
+ int ret;
+ WT_RESOURCE *rc= thd->waiting_for;
+ DBUG_ENTER("wt_thd_dontwait");
+
+ if (!rc)
+ DBUG_RETURN(WT_OK);
+ /*
+ nobody's trying to free the resource now,
+ as its waiter_count is guaranteed to be non-zero
+ */
+ rc_wrlock(rc);
+ ret= wt_thd_dontwait_locked(thd);
+ DBUG_RETURN(ret);
+}
+
+/*
+ called by a *waiter* to declare what resource it will wait for.
+ can be called many times, if many blockers own a blocking resource.
+ but must always be called with the same resource id - a thread cannot
+ wait for more than one resource at a time.
+*/
+int wt_thd_will_wait_for(WT_THD *thd, WT_THD *blocker, WT_RESOURCE_ID *resid)
+{
+ uint i;
+ WT_RESOURCE *rc;
+ DBUG_ENTER("wt_thd_will_wait_for");
+
+ LF_REQUIRE_PINS(3);
+
+ DBUG_PRINT("wt", ("enter: thd=%s, blocker=%s, resid=%llu",
+ thd->name, blocker->name, resid->value.num));
+
+ if (thd->waiting_for == 0)
+ {
+ uint keylen;
+ const void *key;
+ /* XXX if (restype->make_key) key= restype->make_key(resid, &keylen); else */
+ {
+ key= resid;
+ keylen= sizeof(*resid);
+ }
+
+ DBUG_PRINT("wt", ("first blocker"));
+
+retry:
+ while ((rc= lf_hash_search(&reshash, thd->pins, key, keylen)) == 0)
+ {
+ WT_RESOURCE tmp;
+
+ DBUG_PRINT("wt", ("failed to find rc in hash, inserting"));
+ bzero(&tmp, sizeof(tmp));
+ tmp.waiter_count= 0;
+ tmp.id= *resid;
+ tmp.state= ACTIVE;
+#ifndef DBUG_OFF
+ tmp.mutex= 0;
+#endif
+
+ lf_hash_insert(&reshash, thd->pins, &tmp);
+ /*
+ Two cases: either lf_hash_insert() failed - because another thread
+ has just inserted a resource with the same id - and we need to retry.
+ Or lf_hash_insert() succeeded, and then we need to repeat
+ lf_hash_search() to find a real address of the newly inserted element.
+ That is, we don't care what lf_hash_insert() has returned.
+ And we need to repeat the loop anyway.
+ */
+ }
+ DBUG_PRINT("wt", ("found in hash rc=%p", rc));
+
+ rc_wrlock(rc);
+ if (rc->state != ACTIVE)
+ {
+ DBUG_PRINT("wt", ("but it's not active, retrying"));
+ /* Somebody has freed the element while we weren't looking */
+ rc_unlock(rc);
+ lf_hash_search_unpin(thd->pins);
+ goto retry;
+ }
+
+ lf_hash_search_unpin(thd->pins); /* the element cannot go away anymore */
+ thd->waiting_for= rc;
+ rc->waiter_count++;
+ thd->killed= 0;
+
+ }
+ else
+ {
+ DBUG_ASSERT(thd->waiting_for->id.type == resid->type);
+ DBUG_ASSERT(resid->type->compare(&thd->waiting_for->id, resid) == 0);
+ DBUG_PRINT("wt", ("adding another blocker"));
+
+ /*
+ we can safely access the resource here, it's in the hash as it has
+ at least one owner, and non-zero waiter_count
+ */
+ rc= thd->waiting_for;
+ rc_wrlock(rc);
+ DBUG_ASSERT(rc->waiter_count);
+ DBUG_ASSERT(rc->state == ACTIVE);
+
+ if (thd->killed)
+ {
+ wt_thd_dontwait_locked(thd);
+ DBUG_RETURN(WT_DEADLOCK);
+ }
+ }
+ for (i=0; i < rc->owners.elements; i++)
+ if (*dynamic_element(&rc->owners, i, WT_THD**) == blocker)
+ break;
+ if (i >= rc->owners.elements)
+ {
+ push_dynamic(&blocker->my_resources, (void*)&rc);
+ push_dynamic(&rc->owners, (void*)&blocker);
+ }
+ rc_unlock(rc);
+
+ if (deadlock(thd, blocker, 1, wt_deadlock_search_depth_short))
+ {
+ wt_thd_dontwait(thd);
+ DBUG_RETURN(WT_DEADLOCK);
+ }
+ DBUG_RETURN(0);
+}
+
+/*
+ called by a *waiter* to start waiting
+
+ It's supposed to be a drop-in replacement for
+ pthread_cond_timedwait(), and it takes mutex as an argument.
+*/
+int wt_thd_cond_timedwait(WT_THD *thd, pthread_mutex_t *mutex)
+{
+ int ret= WT_OK;
+ struct timespec timeout;
+ ulonglong before, after, starttime;
+ WT_RESOURCE *rc= thd->waiting_for;
+ DBUG_ENTER("wt_thd_cond_timedwait");
+ DBUG_PRINT("wt", ("enter: thd=%s, rc=%p", thd->name, rc));
+
+#ifndef DBUG_OFF
+ if (rc->mutex)
+ DBUG_ASSERT(rc->mutex == mutex);
+ else
+ rc->mutex= mutex;
+ safe_mutex_assert_owner(mutex);
+#endif
+
+ before= starttime= my_getsystime();
+
+#ifdef __WIN__
+ /*
+ only for the sake of Windows we distinguish between
+ 'before' and 'starttime'
+ */
+ GetSystemTimeAsFileTime((PFILETIME)&starttime);
+#endif
+
+ set_timespec_time_nsec(timeout, starttime, wt_timeout_short*1000);
+ if (!thd->killed)
+ ret= pthread_cond_timedwait(&rc->cond, mutex, &timeout);
+ if (ret == WT_TIMEOUT)
+ {
+ if (deadlock(thd, thd, 0, wt_deadlock_search_depth_long))
+ ret= WT_DEADLOCK;
+ else if (wt_timeout_long > wt_timeout_short)
+ {
+ set_timespec_time_nsec(timeout, starttime, wt_timeout_long*1000);
+ if (!thd->killed)
+ ret= pthread_cond_timedwait(&rc->cond, mutex, &timeout);
+ }
+ }
+ after= my_getsystime();
+ if (wt_thd_dontwait(thd) == WT_DEADLOCK)
+ ret= WT_DEADLOCK;
+ increment_wait_stats(after-before, ret);
+ if (ret == WT_OK)
+ increment_success_stats();
+ DBUG_RETURN(ret);
+}
+
+/*
+ called by a *blocker* when it releases a resource
+
+ when resid==0 all resources will be freed
+
+ Note: it's conceptually similar to pthread_cond_broadcast, and must be done
+ under the same mutex as wt_thd_cond_timedwait().
+*/
+void wt_thd_release(WT_THD *thd, WT_RESOURCE_ID *resid)
+{
+ WT_RESOURCE *rc;
+ uint i, j;
+ DBUG_ENTER("wt_thd_release");
+
+ for (i=0; i < thd->my_resources.elements; i++)
+ {
+ rc= *dynamic_element(&thd->my_resources, i, WT_RESOURCE**);
+ if (!resid || (resid->type->compare(&rc->id, resid) == 0))
+ {
+ rc_wrlock(rc);
+ /*
+ nobody's trying to free the resource now,
+ as its owners[] array is not empty (at least thd must be there)
+ */
+ DBUG_ASSERT(rc->state == ACTIVE);
+ for (j=0; j < rc->owners.elements; j++)
+ if (*dynamic_element(&rc->owners, j, WT_THD**) == thd)
+ break;
+ DBUG_ASSERT(j < rc->owners.elements);
+ delete_dynamic_element(&rc->owners, j);
+ if (rc->owners.elements == 0)
+ {
+ pthread_cond_broadcast(&rc->cond);
+#ifndef DBUG_OFF
+ if (rc->mutex)
+ safe_mutex_assert_owner(rc->mutex);
+#endif
+ }
+ unlock_lock_and_free_resource(thd, rc);
+ if (resid)
+ {
+ delete_dynamic_element(&thd->my_resources, i);
+ DBUG_VOID_RETURN;
+ }
+ }
+ }
+ DBUG_ASSERT(!resid);
+ reset_dynamic(&thd->my_resources);
+ DBUG_VOID_RETURN;
+}
+