summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSergei Golubchik <serg@mysql.com>2008-10-24 12:34:08 +0200
committerSergei Golubchik <serg@mysql.com>2008-10-24 12:34:08 +0200
commit14c146618707540c46e1ab1c8b8f103913e1237a (patch)
tree3d3e78ed586e4e58b2171a1e72ab81c51eec99d1
parent9fb894540ed937e1caf8109f356219c103a2c9d1 (diff)
downloadmariadb-git-14c146618707540c46e1ab1c8b8f103913e1237a.tar.gz
wt needs to use its own implementation of rwlocks with
reader preference, at least where system rwlocks are fair. include/my_global.h: wt uses mutex-based rwlock implementation unless on linux include/waiting_threads.h: mutex-based rwlock implementation with reader preference mysys/thr_rwlock.c: revert the change. make my_rw_locks fair mysys/waiting_threads.c: mutex-based rwlock implementation with reader preference. convert complex multi-line macros to static functions
-rw-r--r--include/my_global.h9
-rw-r--r--include/waiting_threads.h21
-rw-r--r--mysys/thr_rwlock.c4
-rw-r--r--mysys/waiting_threads.c160
4 files changed, 134 insertions, 60 deletions
diff --git a/include/my_global.h b/include/my_global.h
index f2d5a0e5013..d3f99c9147c 100644
--- a/include/my_global.h
+++ b/include/my_global.h
@@ -1524,6 +1524,15 @@ inline void operator delete[](void*, void*) { /* Do nothing */ }
*/
#ifdef TARGET_OS_LINUX
#define NEED_EXPLICIT_SYNC_DIR 1
+#else
+/*
+ On linux default rwlock scheduling policy is good enough for
+ waiting_threads.c, on other systems use our special implementation
+ (which is slower).
+
+ QQ perhaps this should be tested in configure ? how ?
+*/
+#define WT_RWLOCKS_USE_MUTEXES 1
#endif
#if !defined(__cplusplus) && !defined(bool)
diff --git a/include/waiting_threads.h b/include/waiting_threads.h
index 322b5972ec0..d0d5ffbd191 100644
--- a/include/waiting_threads.h
+++ b/include/waiting_threads.h
@@ -67,7 +67,6 @@ extern uint32 wt_success_stats;
e.g. accessing a resource by thd->waiting_for is safe,
a resource cannot be freed as there's a thread waiting for it
*/
-
typedef struct st_wt_resource {
WT_RESOURCE_ID id;
uint waiter_count;
@@ -76,11 +75,27 @@ typedef struct st_wt_resource {
pthread_mutex_t *mutex;
#endif
/*
- before the 'lock' all elements are mutable, after - immutable
- in the sense that lf_hash_insert() won't memcpy() over them.
+ before the 'lock' all elements are mutable, after (and including) -
+ immutable in the sense that lf_hash_insert() won't memcpy() over them.
See wt_init().
*/
+#ifdef WT_RWLOCKS_USE_MUTEXES
+ /*
+ we need a special rwlock-like 'lock' to allow readers bypass
+ waiting writers, otherwise readers can deadlock.
+ writer starvation is technically possible, but unlikely, because
+ the contention is expected to be low.
+ */
+ struct {
+ pthread_cond_t cond;
+ pthread_mutex_t mutex;
+ uint readers: 16;
+ uint pending_writers: 15;
+ uint write_locked: 1;
+ } lock;
+#else
rw_lock_t lock;
+#endif
pthread_cond_t cond;
DYNAMIC_ARRAY owners;
} WT_RESOURCE;
diff --git a/mysys/thr_rwlock.c b/mysys/thr_rwlock.c
index 2a249cbf850..280a0ec19e7 100644
--- a/mysys/thr_rwlock.c
+++ b/mysys/thr_rwlock.c
@@ -89,7 +89,7 @@ int my_rw_rdlock(rw_lock_t *rwp)
pthread_mutex_lock(&rwp->lock);
/* active or queued writers */
- while (( rwp->state < 0 ))
+ while ((rwp->state < 0 ) || rwp->waiters)
pthread_cond_wait( &rwp->readers, &rwp->lock);
rwp->state++;
@@ -101,7 +101,7 @@ int my_rw_tryrdlock(rw_lock_t *rwp)
{
int res;
pthread_mutex_lock(&rwp->lock);
- if ((rwp->state < 0 ))
+ if ((rwp->state < 0 ) || rwp->waiters)
res= EBUSY; /* Can't get lock */
else
{
diff --git a/mysys/waiting_threads.c b/mysys/waiting_threads.c
index 489be6edbad..ef19018831b 100644
--- a/mysys/waiting_threads.c
+++ b/mysys/waiting_threads.c
@@ -133,56 +133,105 @@ uint32 wt_cycle_stats[2][WT_CYCLE_STATS+1], wt_success_stats;
static my_atomic_rwlock_t cycle_stats_lock, wait_stats_lock, success_stats_lock;
-#define increment_success_stats() \
- do { \
- my_atomic_rwlock_wrlock(&success_stats_lock); \
- my_atomic_add32(&wt_success_stats, 1); \
- my_atomic_rwlock_wrunlock(&success_stats_lock); \
- } while (0)
-
-#define increment_cycle_stats(X,SLOT) \
- do { \
- uint i= (X); \
- if (i >= WT_CYCLE_STATS) \
- i= WT_CYCLE_STATS; \
- my_atomic_rwlock_wrlock(&cycle_stats_lock); \
- my_atomic_add32(&wt_cycle_stats[SLOT][i], 1); \
- my_atomic_rwlock_wrunlock(&cycle_stats_lock); \
- } while (0)
-
-#define increment_wait_stats(X,RET) \
- do { \
- uint i; \
- if ((RET) == ETIMEDOUT) \
- i= WT_WAIT_STATS; \
- else \
- { \
- ulonglong w=(X)/10; \
- for (i=0; i < WT_WAIT_STATS && w > wt_wait_table[i]; i++) ; \
- } \
- my_atomic_rwlock_wrlock(&wait_stats_lock); \
- my_atomic_add32(wt_wait_stats+i, 1); \
- my_atomic_rwlock_wrunlock(&wait_stats_lock); \
- } while (0)
-
-#define rc_rdlock(X) \
- do { \
- WT_RESOURCE *R=(X); \
- DBUG_PRINT("wt", ("LOCK resid=%lld for READ", R->id.value)); \
- rw_rdlock(&R->lock); \
- } while (0)
-#define rc_wrlock(X) \
- do { \
- WT_RESOURCE *R=(X); \
- DBUG_PRINT("wt", ("LOCK resid=%lld for WRITE", R->id.value)); \
- rw_wrlock(&R->lock); \
- } while (0)
-#define rc_unlock(X) \
- do { \
- WT_RESOURCE *R=(X); \
- DBUG_PRINT("wt", ("UNLOCK resid=%lld", R->id.value)); \
- rw_unlock(&R->lock); \
- } while (0)
+static void increment_success_stats()
+{
+ my_atomic_rwlock_wrlock(&success_stats_lock);
+ my_atomic_add32(&wt_success_stats, 1);
+ my_atomic_rwlock_wrunlock(&success_stats_lock);
+}
+
+static void increment_cycle_stats(uint depth, uint slot)
+{
+ if (depth >= WT_CYCLE_STATS)
+ depth= WT_CYCLE_STATS;
+ my_atomic_rwlock_wrlock(&cycle_stats_lock);
+ my_atomic_add32(&wt_cycle_stats[slot][depth], 1);
+ my_atomic_rwlock_wrunlock(&cycle_stats_lock);
+}
+
+static void increment_wait_stats(ulonglong waited,int ret)
+{
+ uint i;
+ if ((ret) == ETIMEDOUT)
+ i= WT_WAIT_STATS;
+ else
+ for (i=0; i < WT_WAIT_STATS && waited/10 > wt_wait_table[i]; i++) ;
+ my_atomic_rwlock_wrlock(&wait_stats_lock);
+ my_atomic_add32(wt_wait_stats+i, 1);
+ my_atomic_rwlock_wrunlock(&wait_stats_lock);
+}
+
+#ifdef WT_RWLOCKS_USE_MUTEXES
+static void rc_rwlock_init(WT_RESOURCE *rc)
+{
+ pthread_cond_init(&rc->lock.cond, 0);
+ pthread_mutex_init(&rc->lock.mutex, MY_MUTEX_INIT_FAST);
+}
+static void rc_rwlock_destroy(WT_RESOURCE *rc)
+{
+ pthread_cond_destroy(&rc->lock.cond);
+ pthread_mutex_destroy(&rc->lock.mutex);
+}
+static void rc_rdlock(WT_RESOURCE *rc)
+{
+ DBUG_PRINT("wt", ("TRYLOCK resid=%ld for READ", (ulong)rc->id.value));
+ pthread_mutex_lock(&rc->lock.mutex);
+ while (rc->lock.write_locked)
+ pthread_cond_wait(&rc->lock.cond, &rc->lock.mutex);
+ rc->lock.readers++;
+ pthread_mutex_unlock(&rc->lock.mutex);
+ DBUG_PRINT("wt", ("LOCK resid=%ld for READ", (ulong)rc->id.value));
+}
+static void rc_wrlock(WT_RESOURCE *rc)
+{
+ DBUG_PRINT("wt", ("TRYLOCK resid=%ld for WRITE", (ulong)rc->id.value));
+ pthread_mutex_lock(&rc->lock.mutex);
+ while (rc->lock.write_locked || rc->lock.readers)
+ pthread_cond_wait(&rc->lock.cond, &rc->lock.mutex);
+ rc->lock.write_locked=1;
+ pthread_mutex_unlock(&rc->lock.mutex);
+ DBUG_PRINT("wt", ("LOCK resid=%ld for WRITE", (ulong)rc->id.value));
+}
+static void rc_unlock(WT_RESOURCE *rc)
+{
+ DBUG_PRINT("wt", ("UNLOCK resid=%ld", (ulong)rc->id.value));
+ pthread_mutex_lock(&rc->lock.mutex);
+ if (rc->lock.write_locked)
+ {
+ rc->lock.write_locked=0;
+ pthread_cond_broadcast(&rc->lock.cond);
+ }
+ else if (--rc->lock.readers == 0)
+ pthread_cond_broadcast(&rc->lock.cond);
+ pthread_mutex_unlock(&rc->lock.mutex);
+}
+#else
+static void rc_rwlock_init(WT_RESOURCE *rc)
+{
+ my_rwlock_init(&rc->lock, 0);
+}
+static void rc_rwlock_destroy(WT_RESOURCE *rc)
+{
+ rwlock_destroy(&rc->lock);
+}
+static void rc_rdlock(WT_RESOURCE *rc)
+{
+ DBUG_PRINT("wt", ("TRYLOCK resid=%ld for READ", (ulong)rc->id.value));
+ rw_rdlock(&rc->lock);
+ DBUG_PRINT("wt", ("LOCK resid=%ld for READ", (ulong)rc->id.value));
+}
+static void rc_wrlock(WT_RESOURCE *rc)
+{
+ DBUG_PRINT("wt", ("TRYLOCK resid=%ld for WRITE", (ulong)rc->id.value));
+ rw_wrlock(&rc->lock);
+ DBUG_PRINT("wt", ("LOCK resid=%ld for WRITE", (ulong)rc->id.value));
+}
+static void rc_unlock(WT_RESOURCE *rc)
+{
+ DBUG_PRINT("wt", ("UNLOCK resid=%ld", (ulong)rc->id.value));
+ rw_unlock(&rc->lock);
+}
+#endif
/*
All resources are stored in a lock-free hash. Different threads
@@ -202,7 +251,7 @@ static void wt_resource_init(uchar *arg)
DBUG_ENTER("wt_resource_init");
bzero(rc, sizeof(*rc));
- my_rwlock_init(&rc->lock, 0);
+ rc_rwlock_init(rc);
pthread_cond_init(&rc->cond, 0);
my_init_dynamic_array(&rc->owners, sizeof(WT_THD *), 0, 5);
DBUG_VOID_RETURN;
@@ -220,7 +269,7 @@ static void wt_resource_destroy(uchar *arg)
DBUG_ENTER("wt_resource_destroy");
DBUG_ASSERT(rc->owners.elements == 0);
- rwlock_destroy(&rc->lock);
+ rc_rwlock_destroy(rc);
pthread_cond_destroy(&rc->cond);
delete_dynamic(&rc->owners);
DBUG_VOID_RETURN;
@@ -490,7 +539,7 @@ retry:
}
end:
/*
- Note that 'rc' is locked in this function, but it's never unlocked there.
+ Note that 'rc' is locked in this function, but it's never unlocked here.
Instead it's saved in arg->rc and the *caller* is expected to unlock it.
It's done to support different killing strategies. This is how it works:
Assuming a graph
@@ -549,6 +598,7 @@ static int deadlock(WT_THD *thd, WT_THD *blocker, uint depth,
struct deadlock_arg arg= {thd, max_depth, 0, 0};
int ret;
DBUG_ENTER("deadlock");
+ DBUG_ASSERT(depth < 2);
ret= deadlock_search(&arg, blocker, depth);
if (ret == WT_DEPTH_EXCEEDED)
{
@@ -688,8 +738,8 @@ int wt_thd_will_wait_for(WT_THD *thd, WT_THD *blocker, WT_RESOURCE_ID *resid)
LF_REQUIRE_PINS(3);
- DBUG_PRINT("wt", ("enter: thd=%s, blocker=%s, resid=%llu",
- thd->name, blocker->name, resid->value));
+ DBUG_PRINT("wt", ("enter: thd=%s, blocker=%s, resid=%lu",
+ thd->name, blocker->name, (ulong)resid->value));
if (fix_thd_pins(thd))
DBUG_RETURN(WT_DEADLOCK);