diff options
author | Sergei Golubchik <serg@mysql.com> | 2008-10-24 12:34:08 +0200 |
---|---|---|
committer | Sergei Golubchik <serg@mysql.com> | 2008-10-24 12:34:08 +0200 |
commit | 14c146618707540c46e1ab1c8b8f103913e1237a (patch) | |
tree | 3d3e78ed586e4e58b2171a1e72ab81c51eec99d1 | |
parent | 9fb894540ed937e1caf8109f356219c103a2c9d1 (diff) | |
download | mariadb-git-14c146618707540c46e1ab1c8b8f103913e1237a.tar.gz |
wt (waiting_threads) needs to use its own implementation of rwlocks with
reader preference, at least on systems where the system rwlocks are fair.
include/my_global.h:
wt uses a mutex-based rwlock implementation unless on Linux
include/waiting_threads.h:
mutex-based rwlock implementation with reader preference
mysys/thr_rwlock.c:
revert the change, making my_rw_locks fair (readers wait for queued writers)
mysys/waiting_threads.c:
mutex-based rwlock implementation with reader preference.
convert complex multi-line macros to static functions
-rw-r--r-- | include/my_global.h | 9 | ||||
-rw-r--r-- | include/waiting_threads.h | 21 | ||||
-rw-r--r-- | mysys/thr_rwlock.c | 4 | ||||
-rw-r--r-- | mysys/waiting_threads.c | 160 |
4 files changed, 134 insertions, 60 deletions
diff --git a/include/my_global.h b/include/my_global.h index f2d5a0e5013..d3f99c9147c 100644 --- a/include/my_global.h +++ b/include/my_global.h @@ -1524,6 +1524,15 @@ inline void operator delete[](void*, void*) { /* Do nothing */ } */ #ifdef TARGET_OS_LINUX #define NEED_EXPLICIT_SYNC_DIR 1 +#else +/* + On linux default rwlock scheduling policy is good enough for + waiting_threads.c, on other systems use our special implementation + (which is slower). + + QQ perhaps this should be tested in configure ? how ? +*/ +#define WT_RWLOCKS_USE_MUTEXES 1 #endif #if !defined(__cplusplus) && !defined(bool) diff --git a/include/waiting_threads.h b/include/waiting_threads.h index 322b5972ec0..d0d5ffbd191 100644 --- a/include/waiting_threads.h +++ b/include/waiting_threads.h @@ -67,7 +67,6 @@ extern uint32 wt_success_stats; e.g. accessing a resource by thd->waiting_for is safe, a resource cannot be freed as there's a thread waiting for it */ - typedef struct st_wt_resource { WT_RESOURCE_ID id; uint waiter_count; @@ -76,11 +75,27 @@ typedef struct st_wt_resource { pthread_mutex_t *mutex; #endif /* - before the 'lock' all elements are mutable, after - immutable - in the sense that lf_hash_insert() won't memcpy() over them. + before the 'lock' all elements are mutable, after (and including) - + immutable in the sense that lf_hash_insert() won't memcpy() over them. See wt_init(). */ +#ifdef WT_RWLOCKS_USE_MUTEXES + /* + we need a special rwlock-like 'lock' to allow readers bypass + waiting writers, otherwise readers can deadlock. + writer starvation is technically possible, but unlikely, because + the contention is expected to be low. 
+ */ + struct { + pthread_cond_t cond; + pthread_mutex_t mutex; + uint readers: 16; + uint pending_writers: 15; + uint write_locked: 1; + } lock; +#else rw_lock_t lock; +#endif pthread_cond_t cond; DYNAMIC_ARRAY owners; } WT_RESOURCE; diff --git a/mysys/thr_rwlock.c b/mysys/thr_rwlock.c index 2a249cbf850..280a0ec19e7 100644 --- a/mysys/thr_rwlock.c +++ b/mysys/thr_rwlock.c @@ -89,7 +89,7 @@ int my_rw_rdlock(rw_lock_t *rwp) pthread_mutex_lock(&rwp->lock); /* active or queued writers */ - while (( rwp->state < 0 )) + while ((rwp->state < 0 ) || rwp->waiters) pthread_cond_wait( &rwp->readers, &rwp->lock); rwp->state++; @@ -101,7 +101,7 @@ int my_rw_tryrdlock(rw_lock_t *rwp) { int res; pthread_mutex_lock(&rwp->lock); - if ((rwp->state < 0 )) + if ((rwp->state < 0 ) || rwp->waiters) res= EBUSY; /* Can't get lock */ else { diff --git a/mysys/waiting_threads.c b/mysys/waiting_threads.c index 489be6edbad..ef19018831b 100644 --- a/mysys/waiting_threads.c +++ b/mysys/waiting_threads.c @@ -133,56 +133,105 @@ uint32 wt_cycle_stats[2][WT_CYCLE_STATS+1], wt_success_stats; static my_atomic_rwlock_t cycle_stats_lock, wait_stats_lock, success_stats_lock; -#define increment_success_stats() \ - do { \ - my_atomic_rwlock_wrlock(&success_stats_lock); \ - my_atomic_add32(&wt_success_stats, 1); \ - my_atomic_rwlock_wrunlock(&success_stats_lock); \ - } while (0) - -#define increment_cycle_stats(X,SLOT) \ - do { \ - uint i= (X); \ - if (i >= WT_CYCLE_STATS) \ - i= WT_CYCLE_STATS; \ - my_atomic_rwlock_wrlock(&cycle_stats_lock); \ - my_atomic_add32(&wt_cycle_stats[SLOT][i], 1); \ - my_atomic_rwlock_wrunlock(&cycle_stats_lock); \ - } while (0) - -#define increment_wait_stats(X,RET) \ - do { \ - uint i; \ - if ((RET) == ETIMEDOUT) \ - i= WT_WAIT_STATS; \ - else \ - { \ - ulonglong w=(X)/10; \ - for (i=0; i < WT_WAIT_STATS && w > wt_wait_table[i]; i++) ; \ - } \ - my_atomic_rwlock_wrlock(&wait_stats_lock); \ - my_atomic_add32(wt_wait_stats+i, 1); \ - my_atomic_rwlock_wrunlock(&wait_stats_lock); 
\ - } while (0) - -#define rc_rdlock(X) \ - do { \ - WT_RESOURCE *R=(X); \ - DBUG_PRINT("wt", ("LOCK resid=%lld for READ", R->id.value)); \ - rw_rdlock(&R->lock); \ - } while (0) -#define rc_wrlock(X) \ - do { \ - WT_RESOURCE *R=(X); \ - DBUG_PRINT("wt", ("LOCK resid=%lld for WRITE", R->id.value)); \ - rw_wrlock(&R->lock); \ - } while (0) -#define rc_unlock(X) \ - do { \ - WT_RESOURCE *R=(X); \ - DBUG_PRINT("wt", ("UNLOCK resid=%lld", R->id.value)); \ - rw_unlock(&R->lock); \ - } while (0) +static void increment_success_stats() +{ + my_atomic_rwlock_wrlock(&success_stats_lock); + my_atomic_add32(&wt_success_stats, 1); + my_atomic_rwlock_wrunlock(&success_stats_lock); +} + +static void increment_cycle_stats(uint depth, uint slot) +{ + if (depth >= WT_CYCLE_STATS) + depth= WT_CYCLE_STATS; + my_atomic_rwlock_wrlock(&cycle_stats_lock); + my_atomic_add32(&wt_cycle_stats[slot][depth], 1); + my_atomic_rwlock_wrunlock(&cycle_stats_lock); +} + +static void increment_wait_stats(ulonglong waited,int ret) +{ + uint i; + if ((ret) == ETIMEDOUT) + i= WT_WAIT_STATS; + else + for (i=0; i < WT_WAIT_STATS && waited/10 > wt_wait_table[i]; i++) ; + my_atomic_rwlock_wrlock(&wait_stats_lock); + my_atomic_add32(wt_wait_stats+i, 1); + my_atomic_rwlock_wrunlock(&wait_stats_lock); +} + +#ifdef WT_RWLOCKS_USE_MUTEXES +static void rc_rwlock_init(WT_RESOURCE *rc) +{ + pthread_cond_init(&rc->lock.cond, 0); + pthread_mutex_init(&rc->lock.mutex, MY_MUTEX_INIT_FAST); +} +static void rc_rwlock_destroy(WT_RESOURCE *rc) +{ + pthread_cond_destroy(&rc->lock.cond); + pthread_mutex_destroy(&rc->lock.mutex); +} +static void rc_rdlock(WT_RESOURCE *rc) +{ + DBUG_PRINT("wt", ("TRYLOCK resid=%ld for READ", (ulong)rc->id.value)); + pthread_mutex_lock(&rc->lock.mutex); + while (rc->lock.write_locked) + pthread_cond_wait(&rc->lock.cond, &rc->lock.mutex); + rc->lock.readers++; + pthread_mutex_unlock(&rc->lock.mutex); + DBUG_PRINT("wt", ("LOCK resid=%ld for READ", (ulong)rc->id.value)); +} +static void 
rc_wrlock(WT_RESOURCE *rc) +{ + DBUG_PRINT("wt", ("TRYLOCK resid=%ld for WRITE", (ulong)rc->id.value)); + pthread_mutex_lock(&rc->lock.mutex); + while (rc->lock.write_locked || rc->lock.readers) + pthread_cond_wait(&rc->lock.cond, &rc->lock.mutex); + rc->lock.write_locked=1; + pthread_mutex_unlock(&rc->lock.mutex); + DBUG_PRINT("wt", ("LOCK resid=%ld for WRITE", (ulong)rc->id.value)); +} +static void rc_unlock(WT_RESOURCE *rc) +{ + DBUG_PRINT("wt", ("UNLOCK resid=%ld", (ulong)rc->id.value)); + pthread_mutex_lock(&rc->lock.mutex); + if (rc->lock.write_locked) + { + rc->lock.write_locked=0; + pthread_cond_broadcast(&rc->lock.cond); + } + else if (--rc->lock.readers == 0) + pthread_cond_broadcast(&rc->lock.cond); + pthread_mutex_unlock(&rc->lock.mutex); +} +#else +static void rc_rwlock_init(WT_RESOURCE *rc) +{ + my_rwlock_init(&rc->lock, 0); +} +static void rc_rwlock_destroy(WT_RESOURCE *rc) +{ + rwlock_destroy(&rc->lock); +} +static void rc_rdlock(WT_RESOURCE *rc) +{ + DBUG_PRINT("wt", ("TRYLOCK resid=%ld for READ", (ulong)rc->id.value)); + rw_rdlock(&rc->lock); + DBUG_PRINT("wt", ("LOCK resid=%ld for READ", (ulong)rc->id.value)); +} +static void rc_wrlock(WT_RESOURCE *rc) +{ + DBUG_PRINT("wt", ("TRYLOCK resid=%ld for WRITE", (ulong)rc->id.value)); + rw_wrlock(&rc->lock); + DBUG_PRINT("wt", ("LOCK resid=%ld for WRITE", (ulong)rc->id.value)); +} +static void rc_unlock(WT_RESOURCE *rc) +{ + DBUG_PRINT("wt", ("UNLOCK resid=%ld", (ulong)rc->id.value)); + rw_unlock(&rc->lock); +} +#endif /* All resources are stored in a lock-free hash. 
Different threads @@ -202,7 +251,7 @@ static void wt_resource_init(uchar *arg) DBUG_ENTER("wt_resource_init"); bzero(rc, sizeof(*rc)); - my_rwlock_init(&rc->lock, 0); + rc_rwlock_init(rc); pthread_cond_init(&rc->cond, 0); my_init_dynamic_array(&rc->owners, sizeof(WT_THD *), 0, 5); DBUG_VOID_RETURN; @@ -220,7 +269,7 @@ static void wt_resource_destroy(uchar *arg) DBUG_ENTER("wt_resource_destroy"); DBUG_ASSERT(rc->owners.elements == 0); - rwlock_destroy(&rc->lock); + rc_rwlock_destroy(rc); pthread_cond_destroy(&rc->cond); delete_dynamic(&rc->owners); DBUG_VOID_RETURN; @@ -490,7 +539,7 @@ retry: } end: /* - Note that 'rc' is locked in this function, but it's never unlocked there. + Note that 'rc' is locked in this function, but it's never unlocked here. Instead it's saved in arg->rc and the *caller* is expected to unlock it. It's done to support different killing strategies. This is how it works: Assuming a graph @@ -549,6 +598,7 @@ static int deadlock(WT_THD *thd, WT_THD *blocker, uint depth, struct deadlock_arg arg= {thd, max_depth, 0, 0}; int ret; DBUG_ENTER("deadlock"); + DBUG_ASSERT(depth < 2); ret= deadlock_search(&arg, blocker, depth); if (ret == WT_DEPTH_EXCEEDED) { @@ -688,8 +738,8 @@ int wt_thd_will_wait_for(WT_THD *thd, WT_THD *blocker, WT_RESOURCE_ID *resid) LF_REQUIRE_PINS(3); - DBUG_PRINT("wt", ("enter: thd=%s, blocker=%s, resid=%llu", - thd->name, blocker->name, resid->value)); + DBUG_PRINT("wt", ("enter: thd=%s, blocker=%s, resid=%lu", + thd->name, blocker->name, (ulong)resid->value)); if (fix_thd_pins(thd)) DBUG_RETURN(WT_DEADLOCK); |