Diffstat (limited to 'storage')
-rw-r--r--  storage/innobase/btr/btr0cur.c          2
-rw-r--r--  storage/innobase/btr/btr0sea.c          4
-rw-r--r--  storage/innobase/buf/buf0buf.c         12
-rw-r--r--  storage/innobase/handler/ha_innodb.cc  31
-rw-r--r--  storage/innobase/include/buf0buf.ic     2
-rw-r--r--  storage/innobase/include/os0sync.h     23
-rw-r--r--  storage/innobase/include/os0sync.ic    35
-rw-r--r--  storage/innobase/include/srv0srv.h      3
-rw-r--r--  storage/innobase/include/sync0rw.h    105
-rw-r--r--  storage/innobase/include/sync0rw.ic   368
-rw-r--r--  storage/innobase/include/sync0sync.h   15
-rw-r--r--  storage/innobase/include/sync0sync.ic  49
-rw-r--r--  storage/innobase/include/univ.i         3
-rw-r--r--  storage/innobase/mem/mem0pool.c         8
-rw-r--r--  storage/innobase/row/row0sel.c          4
-rw-r--r--  storage/innobase/srv/srv0srv.c         10
-rw-r--r--  storage/innobase/srv/srv0start.c       10
-rw-r--r--  storage/innobase/sync/sync0arr.c      114
-rw-r--r--  storage/innobase/sync/sync0rw.c       471
-rw-r--r--  storage/innobase/sync/sync0sync.c      61
20 files changed, 762 insertions(+), 568 deletions(-)
diff --git a/storage/innobase/btr/btr0cur.c b/storage/innobase/btr/btr0cur.c
index 54acdf73db6..a2f62255dd6 100644
--- a/storage/innobase/btr/btr0cur.c
+++ b/storage/innobase/btr/btr0cur.c
@@ -333,7 +333,7 @@ btr_cur_search_to_nth_level(
#ifdef UNIV_SEARCH_PERF_STAT
info->n_searches++;
#endif
- if (btr_search_latch.writer == RW_LOCK_NOT_LOCKED
+ if (rw_lock_get_writer(&btr_search_latch) == RW_LOCK_NOT_LOCKED
&& latch_mode <= BTR_MODIFY_LEAF && info->last_hash_succ
&& !estimate
#ifdef PAGE_CUR_LE_OR_EXTENDS
diff --git a/storage/innobase/btr/btr0sea.c b/storage/innobase/btr/btr0sea.c
index 2fe3606a390..b0ce5db6ccd 100644
--- a/storage/innobase/btr/btr0sea.c
+++ b/storage/innobase/btr/btr0sea.c
@@ -748,8 +748,8 @@ btr_search_guess_on_hash(
rw_lock_s_lock(&btr_search_latch);
}
- ut_ad(btr_search_latch.writer != RW_LOCK_EX);
- ut_ad(btr_search_latch.reader_count > 0);
+ ut_ad(rw_lock_get_writer(&btr_search_latch) != RW_LOCK_EX);
+ ut_ad(rw_lock_get_reader_count(&btr_search_latch) > 0);
rec = ha_search_and_get_data(btr_search_sys->hash_index, fold);
diff --git a/storage/innobase/buf/buf0buf.c b/storage/innobase/buf/buf0buf.c
index 901ce8e0fef..5b4f0ee6ecb 100644
--- a/storage/innobase/buf/buf0buf.c
+++ b/storage/innobase/buf/buf0buf.c
@@ -1277,8 +1277,8 @@ loop:
if (mode == BUF_GET_NOWAIT) {
if (rw_latch == RW_S_LATCH) {
- success = rw_lock_s_lock_func_nowait(&(block->lock),
- file, line);
+ success = rw_lock_s_lock_nowait(&(block->lock),
+ file, line);
fix_type = MTR_MEMO_PAGE_S_FIX;
} else {
ut_ad(rw_latch == RW_X_LATCH);
@@ -1403,8 +1403,8 @@ buf_page_optimistic_get_func(
ut_ad(!ibuf_inside() || ibuf_page(block->space, block->offset));
if (rw_latch == RW_S_LATCH) {
- success = rw_lock_s_lock_func_nowait(&(block->lock),
- file, line);
+ success = rw_lock_s_lock_nowait(&(block->lock),
+ file, line);
fix_type = MTR_MEMO_PAGE_S_FIX;
} else {
success = rw_lock_x_lock_func_nowait(&(block->lock),
@@ -1534,8 +1534,8 @@ buf_page_get_known_nowait(
ut_ad(!ibuf_inside() || (mode == BUF_KEEP_OLD));
if (rw_latch == RW_S_LATCH) {
- success = rw_lock_s_lock_func_nowait(&(block->lock),
- file, line);
+ success = rw_lock_s_lock_nowait(&(block->lock),
+ file, line);
fix_type = MTR_MEMO_PAGE_S_FIX;
} else {
success = rw_lock_x_lock_func_nowait(&(block->lock),
diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc
index 887acacbd1f..36a42fb3eb3 100644
--- a/storage/innobase/handler/ha_innodb.cc
+++ b/storage/innobase/handler/ha_innodb.cc
@@ -374,6 +374,10 @@ static SHOW_VAR innodb_status_variables[]= {
(char*) &export_vars.innodb_dblwr_pages_written, SHOW_LONG},
{"dblwr_writes",
(char*) &export_vars.innodb_dblwr_writes, SHOW_LONG},
+ {"have_atomic_builtins",
+ (char*) &export_vars.innodb_have_atomic_builtins, SHOW_BOOL},
+ {"heap_enabled",
+ (char*) &export_vars.innodb_heap_enabled, SHOW_BOOL},
{"log_waits",
(char*) &export_vars.innodb_log_waits, SHOW_LONG},
{"log_write_requests",
@@ -6878,6 +6882,7 @@ innodb_mutex_show_status(
{
char buf1[IO_SIZE], buf2[IO_SIZE];
mutex_t* mutex;
+ rw_lock_t* lock;
#ifdef UNIV_DEBUG
ulint rw_lock_count= 0;
ulint rw_lock_count_spin_loop= 0;
@@ -6948,6 +6953,31 @@ innodb_mutex_show_status(
mutex_exit_noninline(&mutex_list_mutex);
+ mutex_enter_noninline(&rw_lock_list_mutex);
+
+ lock = UT_LIST_GET_FIRST(rw_lock_list);
+
+ while (lock != NULL)
+ {
+ if (lock->count_os_wait)
+ {
+ buf1len= my_snprintf(buf1, sizeof(buf1), "%s:%lu",
+ lock->cfile_name, (ulong) lock->cline);
+ buf2len= my_snprintf(buf2, sizeof(buf2),
+ "os_waits=%lu", lock->count_os_wait);
+
+ if (stat_print(thd, innobase_hton_name,
+ hton_name_len, buf1, buf1len,
+ buf2, buf2len)) {
+ mutex_exit_noninline(&rw_lock_list_mutex);
+ DBUG_RETURN(1);
+ }
+ }
+ lock = UT_LIST_GET_NEXT(list, lock);
+ }
+
+ mutex_exit_noninline(&rw_lock_list_mutex);
+
#ifdef UNIV_DEBUG
buf2len= my_snprintf(buf2, sizeof(buf2),
"count=%lu, spin_waits=%lu, spin_rounds=%lu, "
@@ -6980,6 +7010,7 @@ bool innobase_show_status(handlerton *hton, THD* thd,
return FALSE;
}
}
+ rw_lock_t* lock;
/****************************************************************************
diff --git a/storage/innobase/include/buf0buf.ic b/storage/innobase/include/buf0buf.ic
index b077ff0c181..4e96e13b8dc 100644
--- a/storage/innobase/include/buf0buf.ic
+++ b/storage/innobase/include/buf0buf.ic
@@ -513,7 +513,7 @@ buf_block_buf_fix_inc_debug(
{
ibool ret;
- ret = rw_lock_s_lock_func_nowait(&(block->debug_latch), file, line);
+ ret = rw_lock_s_lock_nowait(&(block->debug_latch), file, line);
ut_ad(ret == TRUE);
ut_ad(mutex_own(&block->mutex));
diff --git a/storage/innobase/include/os0sync.h b/storage/innobase/include/os0sync.h
index a39a331c297..5533df4f608 100644
--- a/storage/innobase/include/os0sync.h
+++ b/storage/innobase/include/os0sync.h
@@ -261,6 +261,29 @@ os_fast_mutex_free(
/*===============*/
os_fast_mutex_t* fast_mutex); /* in: mutex to free */
+#ifdef HAVE_GCC_ATOMIC_BUILTINS
+/**************************************************************
+Atomic compare-and-swap for InnoDB. Currently requires GCC atomic builtins. */
+UNIV_INLINE
+ibool
+os_compare_and_swap(
+/*================*/
+ /* out: true if swapped */
+ volatile lint* ptr, /* in: pointer to target */
+ lint oldVal, /* in: value to compare to */
+ lint newVal); /* in: value to swap in */
+/**************************************************************
+Atomic increment for InnoDB. Currently requires GCC atomic builtins. */
+UNIV_INLINE
+lint
+os_atomic_increment(
+/*================*/
+ /* out: resulting value */
+ volatile lint* ptr, /* in: pointer to target */
+ lint amount); /* in: amount of increment */
+
+#endif /* HAVE_GCC_ATOMIC_BUILTINS */
+
#ifndef UNIV_NONINL
#include "os0sync.ic"
#endif
diff --git a/storage/innobase/include/os0sync.ic b/storage/innobase/include/os0sync.ic
index 75dea9369c2..2a962529d95 100644
--- a/storage/innobase/include/os0sync.ic
+++ b/storage/innobase/include/os0sync.ic
@@ -44,3 +44,38 @@ os_fast_mutex_trylock(
#endif
#endif
}
+
+#ifdef HAVE_GCC_ATOMIC_BUILTINS
+/**************************************************************
+Atomic compare-and-swap for InnoDB. Currently requires GCC atomic builtins. */
+UNIV_INLINE
+ibool
+os_compare_and_swap(
+/*================*/
+ /* out: true if swapped */
+ volatile lint* ptr, /* in: pointer to target */
+ lint oldVal, /* in: value to compare to */
+ lint newVal) /* in: value to swap in */
+{
+ if(__sync_bool_compare_and_swap(ptr, oldVal, newVal)) {
+ return(TRUE);
+ }
+
+ return(FALSE);
+}
+
+/**************************************************************
+Atomic increment for InnoDB. Currently requires GCC atomic builtins. */
+UNIV_INLINE
+lint
+os_atomic_increment(
+/*================*/
+ /* out: resulting value */
+ volatile lint* ptr, /* in: pointer to target */
+ lint amount) /* in: amount of increment */
+{
+ lint newVal = __sync_add_and_fetch(ptr, amount);
+ return newVal;
+}
+
+#endif /* HAVE_GCC_ATOMIC_BUILTINS */
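
The wrappers above are thin veneers over two GCC 4.1+ builtins. As a standalone illustration (not part of the patch), the sketch below shows the semantics they depend on: __sync_bool_compare_and_swap writes the new value only if the target still holds the expected one, and __sync_add_and_fetch returns the value after the addition. Treating lint as plain long is an assumption made here for brevity.

/* Standalone sketch of the builtin semantics used by os_compare_and_swap
and os_atomic_increment above; build with: gcc -o cas_demo cas_demo.c */
#include <stdio.h>

typedef long lint;	/* assumption: stands in for InnoDB's lint */

int
main(void)
{
	volatile lint	word = 10;

	/* CAS succeeds only while word still holds the expected value */
	if (__sync_bool_compare_and_swap(&word, 10, 7)) {
		printf("swapped: word = %ld\n", (long) word);	/* 7 */
	}

	/* a CAS against a stale expected value must fail */
	if (!__sync_bool_compare_and_swap(&word, 10, 99)) {
		printf("stale CAS rejected: word = %ld\n", (long) word);
	}

	/* add-and-fetch returns the post-increment value */
	printf("incremented to %ld\n",
	       (long) __sync_add_and_fetch(&word, 3));

	return(0);
}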
diff --git a/storage/innobase/include/srv0srv.h b/storage/innobase/include/srv0srv.h
index 91daa6816b2..4b041555a92 100644
--- a/storage/innobase/include/srv0srv.h
+++ b/storage/innobase/include/srv0srv.h
@@ -513,6 +513,8 @@ struct export_var_struct{
ulint innodb_buffer_pool_read_ahead_rnd;
ulint innodb_dblwr_pages_written;
ulint innodb_dblwr_writes;
+ ibool innodb_have_atomic_builtins;
+ ibool innodb_heap_enabled;
ulint innodb_log_waits;
ulint innodb_log_write_requests;
ulint innodb_log_writes;
@@ -549,4 +551,3 @@ struct srv_sys_struct{
extern ulint srv_n_threads_active[];
#endif
-
diff --git a/storage/innobase/include/sync0rw.h b/storage/innobase/include/sync0rw.h
index 008df80a2c7..a4145cdd666 100644
--- a/storage/innobase/include/sync0rw.h
+++ b/storage/innobase/include/sync0rw.h
@@ -24,6 +24,12 @@ smaller than 30 and the order of the numerical values like below! */
#define RW_X_LATCH 2
#define RW_NO_LATCH 3
+/* We decrement lock_word by this amount for each x_lock. It is also the
+start value for the lock_word, meaning that it limits the maximum number
+of concurrent read locks before the rw_lock breaks. The current value of
+0x00100000 allows 1,048,575 concurrent readers and 2047 recursive writers. */
+#define X_LOCK_DECR 0x00100000
+
typedef struct rw_lock_struct rw_lock_t;
#ifdef UNIV_SYNC_DEBUG
typedef struct rw_lock_debug_struct rw_lock_debug_t;
@@ -47,14 +53,14 @@ extern ibool rw_lock_debug_waiters; /* This is set to TRUE, if
there may be waiters for the event */
#endif /* UNIV_SYNC_DEBUG */
-extern ulint rw_s_system_call_count;
-extern ulint rw_s_spin_wait_count;
-extern ulint rw_s_exit_count;
-extern ulint rw_s_os_wait_count;
-extern ulint rw_x_system_call_count;
-extern ulint rw_x_spin_wait_count;
-extern ulint rw_x_os_wait_count;
-extern ulint rw_x_exit_count;
+extern ib_longlong rw_s_spin_wait_count;
+extern ib_longlong rw_s_spin_round_count;
+extern ib_longlong rw_s_exit_count;
+extern ib_longlong rw_s_os_wait_count;
+extern ib_longlong rw_x_spin_wait_count;
+extern ib_longlong rw_x_spin_round_count;
+extern ib_longlong rw_x_os_wait_count;
+extern ib_longlong rw_x_exit_count;
/**********************************************************************
Creates, or rather, initializes an rw-lock object in a specified memory
@@ -127,8 +133,8 @@ corresponding function. */
NOTE! The following macros should be used in rw s-locking, not the
corresponding function. */
-#define rw_lock_s_lock_nowait(M) rw_lock_s_lock_func_nowait(\
- (M), __FILE__, __LINE__)
+#define rw_lock_s_lock_nowait(M, F, L) rw_lock_s_lock_low(\
+ (M), 0, (F), (L))
/**********************************************************************
NOTE! Use the corresponding macro, not directly this function, except if
you supply the file name and line number. Lock an rw-lock in shared mode
@@ -146,18 +152,6 @@ rw_lock_s_lock_func(
const char* file_name,/* in: file name where lock requested */
ulint line); /* in: line where requested */
/**********************************************************************
-NOTE! Use the corresponding macro, not directly this function, except if
-you supply the file name and line number. Lock an rw-lock in shared mode
-for the current thread if the lock can be acquired immediately. */
-UNIV_INLINE
-ibool
-rw_lock_s_lock_func_nowait(
-/*=======================*/
- /* out: TRUE if success */
- rw_lock_t* lock, /* in: pointer to rw-lock */
- const char* file_name,/* in: file name where lock requested */
- ulint line); /* in: line where requested */
-/**********************************************************************
NOTE! Use the corresponding macro, not directly this function! Lock an
rw-lock in exclusive mode for the current thread if the lock can be
obtained immediately. */
@@ -341,6 +335,23 @@ ulint
rw_lock_get_reader_count(
/*=====================*/
rw_lock_t* lock);
+/**********************************************************************
+Decrements lock_word the specified amount if it is greater than 0.
+This is used by both s_lock and x_lock operations. */
+UNIV_INLINE
+ibool
+rw_lock_lock_word_decr(
+ /* out: TRUE if decr occurs */
+ rw_lock_t* lock, /* in: rw-lock */
+ ulint amount); /* in: amount to decrement */
+/**********************************************************************
+Increments lock_word the specified amount and returns new value. */
+UNIV_INLINE
+lint
+rw_lock_lock_word_incr(
+ /* out: lock->lock_word after increment */
+ rw_lock_t* lock, /* in: rw-lock */
+ ulint amount); /* in: amount of increment */
#ifdef UNIV_SYNC_DEBUG
/**********************************************************************
Checks if the thread has locked the rw-lock in the specified mode, with
@@ -417,44 +428,28 @@ Do not use its fields directly! The structure used in the spin lock
implementation of a read-write lock. Several threads may have a shared lock
simultaneously in this lock, but only one writer may have an exclusive lock,
in which case no shared locks are allowed. To prevent starving of a writer
-blocked by readers, a writer may queue for the lock by setting the writer
-field. Then no new readers are allowed in. */
+blocked by readers, a writer may queue for x-lock by decrementing lock_word:
+no new readers will be let in while the thread waits for readers to exit. */
struct rw_lock_struct {
- os_event_t event; /* Used by sync0arr.c for thread queueing */
-
-#ifdef __WIN__
- os_event_t wait_ex_event; /* This windows specific event is
- used by the thread which has set the
- lock state to RW_LOCK_WAIT_EX. The
- rw_lock design guarantees that this
- thread will be the next one to proceed
- once the current the event gets
- signalled. See LEMMA 2 in sync0sync.c */
-#endif
-
- ulint reader_count; /* Number of readers who have locked this
- lock in the shared mode */
- ulint writer; /* This field is set to RW_LOCK_EX if there
- is a writer owning the lock (in exclusive
- mode), RW_LOCK_WAIT_EX if a writer is
- queueing for the lock, and
- RW_LOCK_NOT_LOCKED, otherwise. */
- os_thread_id_t writer_thread;
- /* Thread id of a possible writer thread */
- ulint writer_count; /* Number of times the same thread has
- recursively locked the lock in the exclusive
- mode */
- mutex_t mutex; /* The mutex protecting rw_lock_struct */
- ulint pass; /* Default value 0. This is set to some
+ volatile lint lock_word;
+ /* Holds the state of the lock. */
+ volatile ulint waiters;/* 1: there are waiters */
+ volatile ulint pass; /* Default value 0. This is set to some
value != 0 given by the caller of an x-lock
operation, if the x-lock is to be passed to
another thread to unlock (which happens in
asynchronous i/o). */
- ulint waiters; /* This ulint is set to 1 if there are
- waiters (readers or writers) in the global
- wait array, waiting for this rw_lock.
- Otherwise, == 0. */
+ volatile os_thread_id_t writer_thread;
+ /* Thread id of writer thread */
+ os_event_t event; /* Used by sync0arr.c for thread queueing */
+ os_event_t wait_ex_event;
+ /* Event for next-writer to wait on. A thread
+ must decrement lock_word before waiting. */
+#ifndef HAVE_GCC_ATOMIC_BUILTINS
+ mutex_t mutex; /* The mutex protecting rw_lock_struct */
+#endif /* HAVE_GCC_ATOMIC_BUILTINS */
+
UT_LIST_NODE_T(rw_lock_t) list;
/* All allocated rw locks are put into a
list */
@@ -464,7 +459,9 @@ struct rw_lock_struct {
info list of the lock */
ulint level; /* Level in the global latching order. */
#endif /* UNIV_SYNC_DEBUG */
+ ulint count_os_wait; /* Count of os_waits. May not be accurate */
const char* cfile_name;/* File name where lock created */
+ /* last s-lock file/line is not guaranteed to be correct */
const char* last_s_file_name;/* File name where last s-locked */
const char* last_x_file_name;/* File name where last x-locked */
ibool writer_is_wait_ex;
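
The new lock_word encoding declared above packs the whole lock state into one signed word. As a worked standalone example (not patch code), the sketch below decodes a few sample values with the same arithmetic that rw_lock_get_writer, rw_lock_get_reader_count and rw_lock_get_x_lock_count use:

/* Standalone sketch: decode sample lock_word values per the new scheme. */
#include <stdio.h>

#define X_LOCK_DECR 0x00100000

static void
decode(long lock_word)
{
	if (lock_word == X_LOCK_DECR) {
		printf("%9ld: unlocked\n", lock_word);
	} else if (lock_word > 0) {
		printf("%9ld: %ld reader(s), no waiting writer\n",
		       lock_word, (long) X_LOCK_DECR - lock_word);
	} else if (lock_word == 0) {
		printf("%9ld: write locked once\n", lock_word);
	} else if (lock_word > -X_LOCK_DECR) {
		printf("%9ld: %ld reader(s), one waiting writer\n",
		       lock_word, -lock_word);
	} else {
		printf("%9ld: write locked recursively, depth %ld\n",
		       lock_word, (-lock_word) / X_LOCK_DECR + 1);
	}
}

int
main(void)
{
	decode(X_LOCK_DECR);		/* unlocked */
	decode(X_LOCK_DECR - 3);	/* 3 readers */
	decode(0);			/* one x-lock */
	decode(-2);			/* 2 readers, writer waiting */
	decode(-2L * X_LOCK_DECR);	/* recursive x-lock, depth 3 */
	return(0);
}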
diff --git a/storage/innobase/include/sync0rw.ic b/storage/innobase/include/sync0rw.ic
index eea639f26f4..75b696e3915 100644
--- a/storage/innobase/include/sync0rw.ic
+++ b/storage/innobase/include/sync0rw.ic
@@ -62,40 +62,48 @@ rw_lock_set_waiters(
{
lock->waiters = flag;
}
+
+/**********************************************************************
+Returns the write-status of the lock. This accessor made more sense with
+the old rw_lock implementation; the status is now derived from lock_word. */
UNIV_INLINE
ulint
rw_lock_get_writer(
/*===============*/
rw_lock_t* lock)
{
- return(lock->writer);
-}
-UNIV_INLINE
-void
-rw_lock_set_writer(
-/*===============*/
- rw_lock_t* lock,
- ulint flag)
-{
- lock->writer = flag;
+ lint lock_word = lock->lock_word;
+ if(lock_word > 0) {
+ /* return NOT_LOCKED in s-lock state, like the writer
+ member of the old lock implementation. */
+ return RW_LOCK_NOT_LOCKED;
+ } else if (((-lock_word) % X_LOCK_DECR) == 0) {
+ return RW_LOCK_EX;
+ } else {
+ ut_ad(lock_word > -X_LOCK_DECR);
+ return RW_LOCK_WAIT_EX;
+ }
}
+
UNIV_INLINE
ulint
rw_lock_get_reader_count(
/*=====================*/
rw_lock_t* lock)
{
- return(lock->reader_count);
-}
-UNIV_INLINE
-void
-rw_lock_set_reader_count(
-/*=====================*/
- rw_lock_t* lock,
- ulint count)
-{
- lock->reader_count = count;
+ lint lock_word = lock->lock_word;
+ if(lock_word > 0) {
+ /* s-locked, no x-waiters */
+ return(X_LOCK_DECR - lock_word);
+ } else if (lock_word < 0 && lock_word > -X_LOCK_DECR) {
+ /* s-locked, with x-waiters */
+ return (ulint)(-lock_word);
+ }
+ return 0;
}
+
+#ifndef HAVE_GCC_ATOMIC_BUILTINS
UNIV_INLINE
mutex_t*
rw_lock_get_mutex(
@@ -104,6 +112,7 @@ rw_lock_get_mutex(
{
return(&(lock->mutex));
}
+#endif
/**********************************************************************
Returns the value of writer_count for the lock. Does not reserve the lock
@@ -115,7 +124,87 @@ rw_lock_get_x_lock_count(
/* out: value of writer_count */
rw_lock_t* lock) /* in: rw-lock */
{
- return(lock->writer_count);
+ lint lock_copy = lock->lock_word;
+ /* If there is a reader, lock_word is not divisible by X_LOCK_DECR */
+ if(lock_copy > 0 || (-lock_copy) % X_LOCK_DECR != 0) {
+ return 0;
+ }
+ return ((-lock_copy) / X_LOCK_DECR) + 1;
+}
+
+/**********************************************************************
+Two different implementations for decrementing the lock_word of a rw_lock:
+one for systems supporting atomic operations, one for others. This does
+not support recursive x-locks: they should be handled by the caller and
+need not be atomic since they are performed by the current lock holder.
+Returns true if the decrement was made, false if not. */
+UNIV_INLINE
+ibool
+rw_lock_lock_word_decr(
+ /* out: TRUE if decr occurs */
+ rw_lock_t* lock, /* in: rw-lock */
+ ulint amount) /* in: amount of decrement */
+{
+
+#ifdef HAVE_GCC_ATOMIC_BUILTINS
+
+ lint local_lock_word = lock->lock_word;
+ while (local_lock_word > 0) {
+ if(os_compare_and_swap(&(lock->lock_word),
+ local_lock_word,
+ local_lock_word - amount)) {
+ return TRUE;
+ }
+ local_lock_word = lock->lock_word;
+ }
+ return(FALSE);
+
+#else /* HAVE_GCC_ATOMIC_BUILTINS */
+
+ ibool success = FALSE;
+ mutex_enter(&(lock->mutex));
+ if(lock->lock_word > 0) {
+ lock->lock_word -= amount;
+ success = TRUE;
+ }
+ mutex_exit(&(lock->mutex));
+ return success;
+
+#endif /* HAVE_GCC_ATOMIC_BUILTINS */
+
+}
+
+/**********************************************************************
+Two different implementations for incrementing the lock_word of a rw_lock:
+one for systems supporting atomic operations, one for others.
+Returns the value of lock_word after increment. */
+UNIV_INLINE
+lint
+rw_lock_lock_word_incr(
+ /* out: lock->lock_word after increment */
+ rw_lock_t* lock, /* in: rw-lock */
+ ulint amount) /* in: amount of increment */
+{
+
+#ifdef HAVE_GCC_ATOMIC_BUILTINS
+
+ return(os_atomic_increment(&(lock->lock_word), amount));
+
+#else /* HAVE_GCC_ATOMIC_BUILTINS */
+
+ lint local_lock_word;
+
+ mutex_enter(&(lock->mutex));
+
+ lock->lock_word += amount;
+ local_lock_word = lock->lock_word;
+
+ mutex_exit(&(lock->mutex));
+
+ return local_lock_word;
+
+#endif /* HAVE_GCC_ATOMIC_BUILTINS */
+
}
/**********************************************************************
@@ -133,25 +222,21 @@ rw_lock_s_lock_low(
const char* file_name, /* in: file name where lock requested */
ulint line) /* in: line where requested */
{
- ut_ad(mutex_own(rw_lock_get_mutex(lock)));
-
- /* Check if the writer field is free */
-
- if (UNIV_LIKELY(lock->writer == RW_LOCK_NOT_LOCKED)) {
- /* Set the shared lock by incrementing the reader count */
- lock->reader_count++;
+ /* TODO: study performance of UNIV_LIKELY branch prediction hints. */
+ if (!rw_lock_lock_word_decr(lock, 1)) {
+ /* Locking did not succeed */
+ return(FALSE);
+ }
#ifdef UNIV_SYNC_DEBUG
- rw_lock_add_debug_info(lock, pass, RW_LOCK_SHARED, file_name,
- line);
+ rw_lock_add_debug_info(lock, pass, RW_LOCK_SHARED, file_name, line);
#endif
- lock->last_s_file_name = file_name;
- lock->last_s_line = line;
-
- return(TRUE); /* locking succeeded */
- }
+ /* These debugging values are not set safely: they may be incorrect
+ or even refer to a line that is invalid for the file name. */
+ lock->last_s_file_name = file_name;
+ lock->last_s_line = line;
- return(FALSE); /* locking did not succeed */
+ return(TRUE); /* locking succeeded */
}
/**********************************************************************
@@ -166,11 +251,10 @@ rw_lock_s_lock_direct(
const char* file_name, /* in: file name where requested */
ulint line) /* in: line where lock requested */
{
- ut_ad(lock->writer == RW_LOCK_NOT_LOCKED);
- ut_ad(rw_lock_get_reader_count(lock) == 0);
+ ut_ad(lock->lock_word == X_LOCK_DECR);
- /* Set the shared lock by incrementing the reader count */
- lock->reader_count++;
+ /* Indicate there is a new reader by decrementing lock_word */
+ lock->lock_word--;
lock->last_s_file_name = file_name;
lock->last_s_line = line;
@@ -193,12 +277,10 @@ rw_lock_x_lock_direct(
ulint line) /* in: line where lock requested */
{
ut_ad(rw_lock_validate(lock));
- ut_ad(rw_lock_get_reader_count(lock) == 0);
- ut_ad(rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED);
+ ut_ad(lock->lock_word == X_LOCK_DECR);
- rw_lock_set_writer(lock, RW_LOCK_EX);
+ lock->lock_word -= X_LOCK_DECR;
lock->writer_thread = os_thread_get_curr_id();
- lock->writer_count++;
lock->pass = 0;
lock->last_x_file_name = file_name;
@@ -240,15 +322,12 @@ rw_lock_s_lock_func(
ut_ad(!rw_lock_own(lock, RW_LOCK_SHARED)); /* see NOTE above */
#endif /* UNIV_SYNC_DEBUG */
- mutex_enter(rw_lock_get_mutex(lock));
-
- if (UNIV_LIKELY(rw_lock_s_lock_low(lock, pass, file_name, line))) {
- mutex_exit(rw_lock_get_mutex(lock));
+ /* TODO: study performance of UNIV_LIKELY branch prediction hints. */
+ if (rw_lock_s_lock_low(lock, pass, file_name, line)) {
return; /* Success */
} else {
/* Did not succeed, try spin wait */
- mutex_exit(rw_lock_get_mutex(lock));
rw_lock_s_lock_spin(lock, pass, file_name, line);
@@ -258,86 +337,66 @@ rw_lock_s_lock_func(
/**********************************************************************
NOTE! Use the corresponding macro, not directly this function! Lock an
-rw-lock in shared mode for the current thread if the lock can be acquired
-immediately. */
+rw-lock in exclusive mode for the current thread if the lock can be
+obtained immediately. */
UNIV_INLINE
ibool
-rw_lock_s_lock_func_nowait(
+rw_lock_x_lock_func_nowait(
/*=======================*/
/* out: TRUE if success */
rw_lock_t* lock, /* in: pointer to rw-lock */
const char* file_name,/* in: file name where lock requested */
ulint line) /* in: line where requested */
{
- ibool success = FALSE;
-
- mutex_enter(rw_lock_get_mutex(lock));
+ os_thread_id_t curr_thread = os_thread_get_curr_id();
- if (lock->writer == RW_LOCK_NOT_LOCKED) {
- /* Set the shared lock by incrementing the reader count */
- lock->reader_count++;
+ ibool success;
-#ifdef UNIV_SYNC_DEBUG
- rw_lock_add_debug_info(lock, 0, RW_LOCK_SHARED, file_name,
- line);
-#endif
-
- lock->last_s_file_name = file_name;
- lock->last_s_line = line;
+#ifdef HAVE_GCC_ATOMIC_BUILTINS
+ success = os_compare_and_swap(&(lock->lock_word), X_LOCK_DECR, 0);
+#else
+ success = FALSE;
+ mutex_enter(&(lock->mutex));
+ if(lock->lock_word == X_LOCK_DECR) {
+ lock->lock_word = 0;
success = TRUE;
}
+ mutex_exit(&(lock->mutex));
- mutex_exit(rw_lock_get_mutex(lock));
-
- return(success);
-}
-
-/**********************************************************************
-NOTE! Use the corresponding macro, not directly this function! Lock an
-rw-lock in exclusive mode for the current thread if the lock can be
-obtained immediately. */
-UNIV_INLINE
-ibool
-rw_lock_x_lock_func_nowait(
-/*=======================*/
- /* out: TRUE if success */
- rw_lock_t* lock, /* in: pointer to rw-lock */
- const char* file_name,/* in: file name where lock requested */
- ulint line) /* in: line where requested */
-{
- ibool success = FALSE;
- os_thread_id_t curr_thread = os_thread_get_curr_id();
- mutex_enter(rw_lock_get_mutex(lock));
-
- if (UNIV_UNLIKELY(rw_lock_get_reader_count(lock) != 0)) {
- } else if (UNIV_LIKELY(rw_lock_get_writer(lock)
- == RW_LOCK_NOT_LOCKED)) {
- rw_lock_set_writer(lock, RW_LOCK_EX);
+#endif
+ if(success) {
lock->writer_thread = curr_thread;
lock->pass = 0;
-relock:
- lock->writer_count++;
-#ifdef UNIV_SYNC_DEBUG
- rw_lock_add_debug_info(lock, 0, RW_LOCK_EX, file_name, line);
-#endif
+ } else if (!(lock->pass) &&
+ os_thread_eq(lock->writer_thread, curr_thread)) {
+ /* Must verify pass first: otherwise another thread could
+ call move_ownership after we have verified that our thread_id
+ matches, suddenly allowing recursive x-locks even though
+ move_ownership has since changed the writer_thread. */
- lock->last_x_file_name = file_name;
- lock->last_x_line = line;
+ /* Relock: this lock_word modification is safe since no other
+ threads can modify (lock, unlock, or reserve) lock_word while
+ there is an exclusive writer and this is the writer thread. */
+ lock->lock_word -= X_LOCK_DECR;
- success = TRUE;
- } else if (rw_lock_get_writer(lock) == RW_LOCK_EX
- && lock->pass == 0
- && os_thread_eq(lock->writer_thread, curr_thread)) {
- goto relock;
+ ut_ad(((-lock->lock_word) % X_LOCK_DECR) == 0);
+
+ } else {
+ /* Failure */
+ return(FALSE);
}
+#ifdef UNIV_SYNC_DEBUG
+ rw_lock_add_debug_info(lock, 0, RW_LOCK_EX, file_name, line);
+#endif
- mutex_exit(rw_lock_get_mutex(lock));
+ lock->last_x_file_name = file_name;
+ lock->last_x_line = line;
ut_ad(rw_lock_validate(lock));
- return(success);
+ return(TRUE);
}
/**********************************************************************
@@ -353,39 +412,21 @@ rw_lock_s_unlock_func(
#endif
)
{
- mutex_t* mutex = &(lock->mutex);
- ibool sg = FALSE;
-
- /* Acquire the mutex protecting the rw-lock fields */
- mutex_enter(mutex);
-
- /* Reset the shared lock by decrementing the reader count */
-
- ut_a(lock->reader_count > 0);
- lock->reader_count--;
+ ut_ad((lock->lock_word % X_LOCK_DECR) != 0);
#ifdef UNIV_SYNC_DEBUG
rw_lock_remove_debug_info(lock, pass, RW_LOCK_SHARED);
#endif
- /* If there may be waiters and this was the last s-lock,
- signal the object */
-
- if (UNIV_UNLIKELY(lock->waiters)
- && lock->reader_count == 0) {
- sg = TRUE;
+ /* Increment lock_word to indicate 1 less reader */
+ if(rw_lock_lock_word_incr(lock, 1) == 0) {
- rw_lock_set_waiters(lock, 0);
- }
-
- mutex_exit(mutex);
-
- if (UNIV_UNLIKELY(sg)) {
-#ifdef __WIN__
+ /* wait_ex waiter exists. It may not be asleep, but we signal
+ anyway. We do not wake other waiters, because they cannot
+ exist without the wait_ex waiter, and the wait_ex waiter goes first. */
os_event_set(lock->wait_ex_event);
-#endif
- os_event_set(lock->event);
sync_array_object_signalled(sync_primary_wait_array);
+
}
ut_ad(rw_lock_validate(lock));
@@ -404,16 +445,15 @@ rw_lock_s_unlock_direct(
/*====================*/
rw_lock_t* lock) /* in: rw-lock */
{
- /* Reset the shared lock by decrementing the reader count */
-
- ut_ad(lock->reader_count > 0);
-
- lock->reader_count--;
+ ut_ad(lock->lock_word < X_LOCK_DECR);
#ifdef UNIV_SYNC_DEBUG
rw_lock_remove_debug_info(lock, 0, RW_LOCK_SHARED);
#endif
+ /* Decrease reader count by incrementing lock_word */
+ lock->lock_word++;
+
ut_ad(!lock->waiters);
ut_ad(rw_lock_validate(lock));
#ifdef UNIV_SYNC_PERF_STAT
@@ -434,42 +474,32 @@ rw_lock_x_unlock_func(
#endif
)
{
- ibool sg = FALSE;
+ ut_ad((lock->lock_word % X_LOCK_DECR) == 0);
- /* Acquire the mutex protecting the rw-lock fields */
- mutex_enter(&(lock->mutex));
-
- /* Reset the exclusive lock if this thread no longer has an x-mode
- lock */
-
- ut_ad(lock->writer_count > 0);
-
- lock->writer_count--;
-
- if (lock->writer_count == 0) {
- rw_lock_set_writer(lock, RW_LOCK_NOT_LOCKED);
- }
+ /* Must reset writer_thread while we still have the lock.
+ If we are not the last unlocker, we correct it later in the function,
+ which is harmless since we still hold the lock. */
+ /* TODO: are there any risks of a thread id == -1 on any platform? */
+ os_thread_id_t local_writer_thread = lock->writer_thread;
+ lock->writer_thread = -1;
#ifdef UNIV_SYNC_DEBUG
rw_lock_remove_debug_info(lock, pass, RW_LOCK_EX);
#endif
- /* If there may be waiters, signal the lock */
- if (UNIV_UNLIKELY(lock->waiters)
- && lock->writer_count == 0) {
-
- sg = TRUE;
- rw_lock_set_waiters(lock, 0);
- }
-
- mutex_exit(&(lock->mutex));
+ if(rw_lock_lock_word_incr(lock, X_LOCK_DECR) == X_LOCK_DECR) {
+ /* Lock is now free. May have to signal read/write waiters.
+ We do not need to signal wait_ex waiters, since they cannot
+ exist when there is a writer. */
+ if(lock->waiters) {
+ rw_lock_set_waiters(lock, 0);
+ os_event_set(lock->event);
+ sync_array_object_signalled(sync_primary_wait_array);
+ }
- if (UNIV_UNLIKELY(sg)) {
-#ifdef __WIN__
- os_event_set(lock->wait_ex_event);
-#endif
- os_event_set(lock->event);
- sync_array_object_signalled(sync_primary_wait_array);
+ } else {
+ /* We still hold x-lock, so we correct writer_thread. */
+ lock->writer_thread = local_writer_thread;
}
ut_ad(rw_lock_validate(lock));
@@ -491,18 +521,14 @@ rw_lock_x_unlock_direct(
/* Reset the exclusive lock if this thread no longer has an x-mode
lock */
- ut_ad(lock->writer_count > 0);
-
- lock->writer_count--;
-
- if (lock->writer_count == 0) {
- rw_lock_set_writer(lock, RW_LOCK_NOT_LOCKED);
- }
+ ut_ad((lock->lock_word % X_LOCK_DECR) == 0);
#ifdef UNIV_SYNC_DEBUG
rw_lock_remove_debug_info(lock, 0, RW_LOCK_EX);
#endif
+ lock->lock_word += X_LOCK_DECR;
+
ut_ad(!lock->waiters);
ut_ad(rw_lock_validate(lock));
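
To see the decrement/increment pair above in isolation, here is a minimal standalone sketch (assuming the GCC builtins and omitting all spin, wait and debug logic) of the s-lock fast path against a bare lock_word:

/* Standalone sketch of the s-lock fast path: CAS lock_word down by 1
while it is positive; increment by 1 to release. */
#include <stdio.h>

#define X_LOCK_DECR 0x00100000

static volatile long	lock_word = X_LOCK_DECR;

static int
s_lock_nowait(void)
{
	long	local = lock_word;

	while (local > 0) {	/* no writer and no writer reservation */
		if (__sync_bool_compare_and_swap(&lock_word,
						 local, local - 1)) {
			return(1);	/* got the s-lock */
		}
		local = lock_word;	/* lost a race: reload, retry */
	}

	return(0);	/* a writer holds or has reserved the lock */
}

static void
s_unlock(void)
{
	(void) __sync_add_and_fetch(&lock_word, 1);
}

int
main(void)
{
	if (s_lock_nowait()) {
		printf("s-locked, lock_word = %ld\n", (long) lock_word);
		s_unlock();
	}
	printf("released, lock_word = %ld\n", (long) lock_word);
	return(0);
}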
diff --git a/storage/innobase/include/sync0sync.h b/storage/innobase/include/sync0sync.h
index 6a61330f97e..cfc1c8d2c40 100644
--- a/storage/innobase/include/sync0sync.h
+++ b/storage/innobase/include/sync0sync.h
@@ -252,7 +252,7 @@ mutex_n_reserved(void);
NOT to be used outside this module except in debugging! Gets the value
of the lock word. */
UNIV_INLINE
-ulint
+byte
mutex_get_lock_word(
/*================*/
const mutex_t* mutex); /* in: mutex */
@@ -471,9 +471,13 @@ implementation of a mutual exclusion semaphore. */
struct mutex_struct {
os_event_t event; /* Used by sync0arr.c for the wait queue */
- ulint lock_word; /* This ulint is the target of the atomic
- test-and-set instruction in Win32 */
-#if !defined(_WIN32) || !defined(UNIV_CAN_USE_X86_ASSEMBLER)
+
+ byte lock_word; /* This byte is the target of the atomic
+ test-and-set instruction in Win32 and
+ x86 32/64 with GCC 4.1.0 or later version */
+#if defined(_WIN32) && defined(UNIV_CAN_USE_X86_ASSEMBLER)
+#elif defined(HAVE_GCC_ATOMIC_BUILTINS)
+#else
os_fast_mutex_t
os_fast_mutex; /* In other systems we use this OS mutex
in place of lock_word */
@@ -526,8 +530,7 @@ to 20 microseconds. */
/* The number of system calls made in this module. Intended for performance
monitoring. */
-extern ulint mutex_system_call_count;
-extern ulint mutex_exit_count;
+extern ib_longlong mutex_exit_count;
#ifdef UNIV_SYNC_DEBUG
/* Latching order checks start when this is set TRUE */
diff --git a/storage/innobase/include/sync0sync.ic b/storage/innobase/include/sync0sync.ic
index 908797f9729..ffe8794eca5 100644
--- a/storage/innobase/include/sync0sync.ic
+++ b/storage/innobase/include/sync0sync.ic
@@ -6,16 +6,6 @@ Mutex, the basic synchronization primitive
Created 9/5/1995 Heikki Tuuri
*******************************************************/
-#if defined(not_defined) && defined(__GNUC__) && defined(UNIV_INTEL_X86)
-/* %z0: Use the size of operand %0 which in our case is *m to determine
-instruction size, it should end up as xchgl. "1" in the input constraint,
-says that "in" has to go in the same place as "out".*/
-#define TAS(m, in, out) \
- asm volatile ("xchg%z0 %2, %0" \
- : "=g" (*(m)), "=r" (out) \
- : "1" (in)) /* Note: "1" here refers to "=r" (out) */
-#endif
-
/**********************************************************************
Sets the waiters field in a mutex. */
@@ -59,7 +49,7 @@ mutex_signal_object(
Performs an atomic test-and-set instruction to the lock_word field of a
mutex. */
UNIV_INLINE
-ulint
+byte
mutex_test_and_set(
/*===============*/
/* out: the previous value of lock_word: 0 or
@@ -67,18 +57,18 @@ mutex_test_and_set(
mutex_t* mutex) /* in: mutex */
{
#if defined(_WIN32) && defined(UNIV_CAN_USE_X86_ASSEMBLER)
- ulint res;
- ulint* lw; /* assembler code is used to ensure that
+ byte res;
+ byte* lw; /* assembler code is used to ensure that
lock_word is loaded from memory */
ut_ad(mutex);
- ut_ad(sizeof(ulint) == 4);
+ ut_ad(sizeof(byte) == 1);
lw = &(mutex->lock_word);
__asm MOV ECX, lw
__asm MOV EDX, 1
- __asm XCHG EDX, DWORD PTR [ECX]
- __asm MOV res, EDX
+ __asm XCHG DL, BYTE PTR [ECX]
+ __asm MOV res, DL
/* The fence below would prevent this thread from
reading the data structure protected by the mutex
@@ -98,12 +88,8 @@ mutex_test_and_set(
/* mutex_fence(); */
return(res);
-#elif defined(not_defined) && defined(__GNUC__) && defined(UNIV_INTEL_X86)
- ulint res;
-
- TAS(&mutex->lock_word, 1, res);
-
- return(res);
+#elif defined(HAVE_GCC_ATOMIC_BUILTINS)
+ return __sync_lock_test_and_set(&(mutex->lock_word), 1);
#else
ibool ret;
@@ -117,7 +103,7 @@ mutex_test_and_set(
mutex->lock_word = 1;
}
- return(ret);
+ return((byte)ret);
#endif
}
@@ -131,7 +117,7 @@ mutex_reset_lock_word(
mutex_t* mutex) /* in: mutex */
{
#if defined(_WIN32) && defined(UNIV_CAN_USE_X86_ASSEMBLER)
- ulint* lw; /* assembler code is used to ensure that
+ byte* lw; /* assembler code is used to ensure that
lock_word is loaded from memory */
ut_ad(mutex);
@@ -139,11 +125,12 @@ mutex_reset_lock_word(
__asm MOV EDX, 0
__asm MOV ECX, lw
- __asm XCHG EDX, DWORD PTR [ECX]
-#elif defined(not_defined) && defined(__GNUC__) && defined(UNIV_INTEL_X86)
- ulint res;
-
- TAS(&mutex->lock_word, 0, res);
+ __asm XCHG DL, BYTE PTR [ECX]
+#elif defined(HAVE_GCC_ATOMIC_BUILTINS)
+ /* In theory __sync_lock_release should be used to release the lock.
+ Unfortunately, it does not work properly alone. The workaround is
+ that the more conservative __sync_lock_test_and_set is used instead. */
+ __sync_lock_test_and_set(&(mutex->lock_word), 0);
#else
mutex->lock_word = 0;
@@ -154,12 +141,12 @@ mutex_reset_lock_word(
/**********************************************************************
Gets the value of the lock word. */
UNIV_INLINE
-ulint
+byte
mutex_get_lock_word(
/*================*/
const mutex_t* mutex) /* in: mutex */
{
- const volatile ulint* ptr; /* declared volatile to ensure that
+ const volatile byte* ptr; /* declared volatile to ensure that
lock_word is loaded from memory */
ut_ad(mutex);
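
The byte-wide lock_word together with __sync_lock_test_and_set above amounts to a classic test-and-set spinlock. A standalone sketch follows (the real InnoDB mutex adds spin rounds, the sync wait array and an event; this is only the core):

/* Standalone sketch of byte test-and-set locking as used above. */
#include <stdio.h>

typedef unsigned char byte;

static volatile byte	lock_word = 0;

static void
spin_lock(void)
{
	/* returns the previous value and acts as an acquire barrier:
	0 means we took the lock, 1 means another thread holds it */
	while (__sync_lock_test_and_set(&lock_word, 1) != 0) {
		/* spin; the real code delays and eventually waits */
	}
}

static void
spin_unlock(void)
{
	/* mirror the patch: release with a test-and-set of 0, since it
	found __sync_lock_release alone to be insufficient */
	(void) __sync_lock_test_and_set(&lock_word, 0);
}

int
main(void)
{
	spin_lock();
	printf("locked, lock_word = %d\n", (int) lock_word);
	spin_unlock();
	printf("unlocked, lock_word = %d\n", (int) lock_word);
	return(0);
}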
diff --git a/storage/innobase/include/univ.i b/storage/innobase/include/univ.i
index 8ab62e655ff..672a340ebca 100644
--- a/storage/innobase/include/univ.i
+++ b/storage/innobase/include/univ.i
@@ -116,6 +116,9 @@ by one. */
#define UNIV_SET_MEM_TO_ZERO
#endif
+/* Use malloc instead of innodb additional memory pool (great with tcmalloc) */
+#define UNIV_DISABLE_MEM_POOL
+
/*
#define UNIV_SQL_DEBUG
#define UNIV_LOG_DEBUG
diff --git a/storage/innobase/mem/mem0pool.c b/storage/innobase/mem/mem0pool.c
index 27da86a0309..d3b31747690 100644
--- a/storage/innobase/mem/mem0pool.c
+++ b/storage/innobase/mem/mem0pool.c
@@ -329,6 +329,9 @@ mem_area_alloc(
minus MEM_AREA_EXTRA_SIZE */
mem_pool_t* pool) /* in: memory pool */
{
+#ifdef UNIV_DISABLE_MEM_POOL
+ return malloc(size);
+#else /* UNIV_DISABLE_MEM_POOL */
mem_area_t* area;
ulint n;
ibool ret;
@@ -407,6 +410,7 @@ mem_area_alloc(
ut_2_exp(n) - MEM_AREA_EXTRA_SIZE);
return((void*)(MEM_AREA_EXTRA_SIZE + ((byte*)area)));
+#endif /* UNIV_DISABLE_MEM_POOL */
}
/************************************************************************
@@ -459,6 +463,9 @@ mem_area_free(
buffer */
mem_pool_t* pool) /* in: memory pool */
{
+#ifdef UNIV_DISABLE_MEM_POOL
+ free(ptr);
+#else /* UNIV_DISABLE_MEM_POOL */
mem_area_t* area;
mem_area_t* buddy;
void* new_ptr;
@@ -570,6 +577,7 @@ mem_area_free(
mutex_exit(&(pool->mutex));
ut_ad(mem_pool_validate(pool));
+#endif /* UNIV_DISABLE_MEM_POOL */
}
/************************************************************************
diff --git a/storage/innobase/row/row0sel.c b/storage/innobase/row/row0sel.c
index 6ff135e4f5a..05721014078 100644
--- a/storage/innobase/row/row0sel.c
+++ b/storage/innobase/row/row0sel.c
@@ -1248,7 +1248,7 @@ table_loop:
rw_lock_s_lock(&btr_search_latch);
search_latch_locked = TRUE;
- } else if (btr_search_latch.writer_is_wait_ex) {
+ } else if (rw_lock_get_writer(&btr_search_latch) == RW_LOCK_WAIT_EX) {
/* There is an x-latch request waiting: release the
s-latch for a moment; as an s-latch here is often
@@ -3327,7 +3327,7 @@ row_search_for_mysql(
/* PHASE 0: Release a possible s-latch we are holding on the
adaptive hash index latch if there is someone waiting behind */
- if (UNIV_UNLIKELY(btr_search_latch.writer != RW_LOCK_NOT_LOCKED)
+ if (UNIV_UNLIKELY(rw_lock_get_writer(&btr_search_latch) != RW_LOCK_NOT_LOCKED)
&& trx->has_search_latch) {
/* There is an x-latch request on the adaptive hash index:
diff --git a/storage/innobase/srv/srv0srv.c b/storage/innobase/srv/srv0srv.c
index 773b5d583e0..85efbf0ae6a 100644
--- a/storage/innobase/srv/srv0srv.c
+++ b/storage/innobase/srv/srv0srv.c
@@ -1834,6 +1834,16 @@ srv_export_innodb_status(void)
export_vars.innodb_buffer_pool_pages_misc = buf_pool->max_size
- UT_LIST_GET_LEN(buf_pool->LRU)
- UT_LIST_GET_LEN(buf_pool->free);
+#ifdef HAVE_GCC_ATOMIC_BUILTINS
+ export_vars.innodb_have_atomic_builtins = 1;
+#else
+ export_vars.innodb_have_atomic_builtins = 0;
+#endif
+#ifdef UNIV_DISABLE_MEM_POOL
+ export_vars.innodb_heap_enabled = 0;
+#else
+ export_vars.innodb_heap_enabled = 1;
+#endif
export_vars.innodb_page_size = UNIV_PAGE_SIZE;
export_vars.innodb_log_waits = srv_log_waits;
export_vars.innodb_os_log_written = srv_os_log_written;
diff --git a/storage/innobase/srv/srv0start.c b/storage/innobase/srv/srv0start.c
index 979d882307a..0c7ca29eb04 100644
--- a/storage/innobase/srv/srv0start.c
+++ b/storage/innobase/srv/srv0start.c
@@ -1062,6 +1062,16 @@ innobase_start_or_create_for_mysql(void)
return(DB_ERROR);
}
+#ifdef UNIV_DISABLE_MEM_POOL
+ fprintf(stderr,
+ "InnoDB: The InnoDB memory heap has been disabled.\n");
+#endif
+
+#ifdef HAVE_GCC_ATOMIC_BUILTINS
+ fprintf(stderr,
+ "InnoDB: Mutex and rw_lock use GCC atomic builtins.\n");
+#endif
+
/* Since InnoDB does not currently clean up all its internal data
structures in MySQL Embedded Server Library server_end(), we
print an error message if someone tries to start up InnoDB a
diff --git a/storage/innobase/sync/sync0arr.c b/storage/innobase/sync/sync0arr.c
index 154593a9035..ee6e901ab81 100644
--- a/storage/innobase/sync/sync0arr.c
+++ b/storage/innobase/sync/sync0arr.c
@@ -295,28 +295,25 @@ sync_array_validate(
}
/***********************************************************************
-Puts the cell event in reset state. */
+Returns the event that the thread owning the cell waits for. */
static
-ib_longlong
-sync_cell_event_reset(
-/*==================*/
- /* out: value of signal_count
- at the time of reset. */
- ulint type, /* in: lock type mutex/rw_lock */
- void* object) /* in: the rw_lock/mutex object */
+os_event_t
+sync_cell_get_event(
+/*================*/
+ sync_cell_t* cell) /* in: non-empty sync array cell */
{
+ ulint type = cell->request_type;
+
if (type == SYNC_MUTEX) {
- return(os_event_reset(((mutex_t *) object)->event));
-#ifdef __WIN__
+ return(((mutex_t *) cell->wait_object)->event);
} else if (type == RW_LOCK_WAIT_EX) {
- return(os_event_reset(
- ((rw_lock_t *) object)->wait_ex_event));
-#endif
- } else {
- return(os_event_reset(((rw_lock_t *) object)->event));
+ return(((rw_lock_t *) cell->wait_object)->wait_ex_event);
+ } else { /* RW_LOCK_SHARED and RW_LOCK_EX wait on the same event */
+ return(((rw_lock_t *) cell->wait_object)->event);
}
}
+
/**********************************************************************
Reserves a wait array cell for waiting for an object.
The event of the cell is reset to nonsignalled state. */
@@ -332,6 +329,7 @@ sync_array_reserve_cell(
ulint* index) /* out: index of the reserved cell */
{
sync_cell_t* cell;
+ os_event_t event;
ulint i;
ut_a(object);
@@ -370,8 +368,8 @@ sync_array_reserve_cell(
/* Make sure the event is reset and also store
the value of signal_count at which the event
was reset. */
- cell->signal_count = sync_cell_event_reset(type,
- object);
+ event = sync_cell_get_event(cell);
+ cell->signal_count = os_event_reset(event);
cell->reservation_time = time(NULL);
@@ -411,19 +409,7 @@ sync_array_wait_event(
ut_a(!cell->waiting);
ut_ad(os_thread_get_curr_id() == cell->thread);
- if (cell->request_type == SYNC_MUTEX) {
- event = ((mutex_t*) cell->wait_object)->event;
-#ifdef __WIN__
- /* On windows if the thread about to wait is the one which
- has set the state of the rw_lock to RW_LOCK_WAIT_EX, then
- it waits on a special event i.e.: wait_ex_event. */
- } else if (cell->request_type == RW_LOCK_WAIT_EX) {
- event = ((rw_lock_t*) cell->wait_object)->wait_ex_event;
-#endif
- } else {
- event = ((rw_lock_t*) cell->wait_object)->event;
- }
-
+ event = sync_cell_get_event(cell);
cell->waiting = TRUE;
#ifdef UNIV_SYNC_DEBUG
@@ -462,6 +448,7 @@ sync_array_cell_print(
mutex_t* mutex;
rw_lock_t* rwlock;
ulint type;
+ ulint writer;
type = cell->request_type;
@@ -491,9 +478,7 @@ sync_array_cell_print(
(ulong) mutex->waiters);
} else if (type == RW_LOCK_EX
-#ifdef __WIN__
|| type == RW_LOCK_WAIT_EX
-#endif
|| type == RW_LOCK_SHARED) {
fputs(type == RW_LOCK_EX ? "X-lock on" : "S-lock on", file);
@@ -504,22 +489,25 @@ sync_array_cell_print(
" RW-latch at %p created in file %s line %lu\n",
(void*) rwlock, rwlock->cfile_name,
(ulong) rwlock->cline);
- if (rwlock->writer != RW_LOCK_NOT_LOCKED) {
+ writer = rw_lock_get_writer(rwlock);
+ if (writer != RW_LOCK_NOT_LOCKED) {
fprintf(file,
"a writer (thread id %lu) has"
" reserved it in mode %s",
(ulong) os_thread_pf(rwlock->writer_thread),
- rwlock->writer == RW_LOCK_EX
+ writer == RW_LOCK_EX
? " exclusive\n"
: " wait exclusive\n");
}
fprintf(file,
- "number of readers %lu, waiters flag %lu\n"
+ "number of readers %lu, waiters flag %lu, "
+ "lock_word: %ld\n"
"Last time read locked in file %s line %lu\n"
"Last time write locked in file %s line %lu\n",
- (ulong) rwlock->reader_count,
+ (ulong) rw_lock_get_reader_count(rwlock),
(ulong) rwlock->waiters,
+ rwlock->lock_word,
rwlock->last_s_file_name,
(ulong) rwlock->last_s_line,
rwlock->last_x_file_name,
@@ -778,28 +766,30 @@ sync_arr_cell_can_wake_up(
return(TRUE);
}
- } else if (cell->request_type == RW_LOCK_EX
- || cell->request_type == RW_LOCK_WAIT_EX) {
+ } else if (cell->request_type == RW_LOCK_EX) {
lock = cell->wait_object;
- if (rw_lock_get_reader_count(lock) == 0
- && rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED) {
+ /* X_LOCK_DECR is the unlocked state */
+ if (lock->lock_word == X_LOCK_DECR) {
return(TRUE);
}
- if (rw_lock_get_reader_count(lock) == 0
- && rw_lock_get_writer(lock) == RW_LOCK_WAIT_EX
- && os_thread_eq(lock->writer_thread, cell->thread)) {
+ } else if (cell->request_type == RW_LOCK_WAIT_EX) {
+
+ lock = cell->wait_object;
+
+ /* lock_word == 0 means all readers have left */
+ if (lock->lock_word == 0) {
return(TRUE);
}
-
} else if (cell->request_type == RW_LOCK_SHARED) {
lock = cell->wait_object;
- if (rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED) {
+ /* lock_word > 0 means no writer or reserved writer */
+ if (lock->lock_word > 0) {
return(TRUE);
}
@@ -844,11 +834,15 @@ sync_array_object_signalled(
/*========================*/
sync_array_t* arr) /* in: wait array */
{
+#ifdef HAVE_GCC_ATOMIC_BUILTINS
+ __sync_fetch_and_add(&(arr->sg_count),1);
+#else
sync_array_enter(arr);
arr->sg_count++;
sync_array_exit(arr);
+#endif
}
/**************************************************************************
@@ -868,6 +862,7 @@ sync_arr_wake_threads_if_sema_free(void)
sync_cell_t* cell;
ulint count;
ulint i;
+ os_event_t event;
sync_array_enter(arr);
@@ -877,36 +872,20 @@ sync_arr_wake_threads_if_sema_free(void)
while (count < arr->n_reserved) {
cell = sync_array_get_nth_cell(arr, i);
+ i++;
- if (cell->wait_object != NULL) {
-
+ if (cell->wait_object == NULL) {
+ continue;
+ }
count++;
if (sync_arr_cell_can_wake_up(cell)) {
- if (cell->request_type == SYNC_MUTEX) {
- mutex_t* mutex;
+ event = sync_cell_get_event(cell);
- mutex = cell->wait_object;
- os_event_set(mutex->event);
-#ifdef __WIN__
- } else if (cell->request_type
- == RW_LOCK_WAIT_EX) {
- rw_lock_t* lock;
-
- lock = cell->wait_object;
- os_event_set(lock->wait_ex_event);
-#endif
- } else {
- rw_lock_t* lock;
-
- lock = cell->wait_object;
- os_event_set(lock->event);
- }
- }
+ os_event_set(event);
}
- i++;
}
sync_array_exit(arr);
@@ -1026,4 +1005,3 @@ sync_array_print_info(
sync_array_exit(arr);
}
-
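
The reordered wait protocol above (record the event's signal count with os_event_reset, publish yourself as a waiter, re-check the lock word, then sleep) hinges on events carrying a signal count. A simplified pthread-based stand-in (an assumption for illustration; the real os_event_t also keeps a signalled flag) shows why a signal sent after the reset cannot be missed:

/* Standalone pthread sketch of signal-count event semantics; build with
gcc -o event_demo event_demo.c -lpthread */
#include <pthread.h>
#include <stdio.h>

typedef struct {
	pthread_mutex_t	mutex;
	pthread_cond_t	cond;
	long		signal_count;
} event_t;

static long
event_reset(event_t* ev)	/* returns the current signal count */
{
	long	count;

	pthread_mutex_lock(&ev->mutex);
	count = ev->signal_count;
	pthread_mutex_unlock(&ev->mutex);
	return(count);
}

static void
event_set(event_t* ev)		/* advance the count and wake sleepers */
{
	pthread_mutex_lock(&ev->mutex);
	ev->signal_count++;
	pthread_cond_broadcast(&ev->cond);
	pthread_mutex_unlock(&ev->mutex);
}

static void
event_wait(event_t* ev, long count)	/* sleep only while count is stale */
{
	pthread_mutex_lock(&ev->mutex);
	while (ev->signal_count == count) {
		pthread_cond_wait(&ev->cond, &ev->mutex);
	}
	pthread_mutex_unlock(&ev->mutex);
}

int
main(void)
{
	event_t	ev = { PTHREAD_MUTEX_INITIALIZER,
		       PTHREAD_COND_INITIALIZER, 0 };
	long	count = event_reset(&ev);

	event_set(&ev);		/* signal arrives after the reset... */
	event_wait(&ev, count);	/* ...so this wait returns immediately */
	printf("no missed wake-up\n");
	return(0);
}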
diff --git a/storage/innobase/sync/sync0rw.c b/storage/innobase/sync/sync0rw.c
index 367f019ce55..6c9cb076cbe 100644
--- a/storage/innobase/sync/sync0rw.c
+++ b/storage/innobase/sync/sync0rw.c
@@ -15,35 +15,111 @@ Created 9/11/1995 Heikki Tuuri
#include "mem0mem.h"
#include "srv0srv.h"
-/* number of system calls made during shared latching */
-ulint rw_s_system_call_count = 0;
+/*
+ IMPLEMENTATION OF THE RW_LOCK
+ =============================
+The status of a rw_lock is held in lock_word. The initial value of lock_word is
+X_LOCK_DECR. lock_word is decremented by 1 for each s-lock and by X_LOCK_DECR
+for each x-lock. This describes the lock state for each value of lock_word:
+
+lock_word == X_LOCK_DECR: Unlocked.
+0 < lock_word < X_LOCK_DECR: Read locked, no waiting writers.
+ (X_LOCK_DECR - lock_word) is the
+ number of readers that hold the lock.
+lock_word == 0: Write locked
+-X_LOCK_DECR < lock_word < 0: Read locked, with a waiting writer.
+ (-lock_word) is the number of readers
+ that hold the lock.
+lock_word <= -X_LOCK_DECR: Recursively write locked. lock_word has been
+ decremented by X_LOCK_DECR once for each lock,
+ so the number of locks is:
+ ((-lock_word) / X_LOCK_DECR) + 1
+When lock_word <= -X_LOCK_DECR, we also know that lock_word % X_LOCK_DECR == 0:
+other values of lock_word are invalid.
+
+The lock_word is always read and updated atomically and consistently, so that
+it always represents the state of the lock, and the state of the lock changes
+with a single atomic operation. This lock_word holds all of the information
+that a thread needs in order to determine if it is eligible to gain the lock
+or if it must spin or sleep. The one exception to this is that writer_thread
+must be verified before recursive write locks: to solve this scenario, we make
+writer_thread readable by all threads, but only writeable by the x-lock holder.
+
+The other members of the lock obey the following rules to remain consistent:
+
+pass: This is only set to 1 to prevent recursive x-locks. It must
+ be set as specified by x_lock caller after the lock_word
+ indicates that the thread holds the lock, but before that
+ thread resumes execution. It must be reset to 0 during the
+ final x_unlock, but before the lock_word status is updated.
+ When an x_lock or move_ownership call wishes to change
+ pass, it must first update the writer_thread appropriately.
+writer_thread:	Must be set to the writer's thread_id after the lock_word
+ indicates that the thread holds the lock, but before that
+ thread resumes execution. It must be reset to -1 during the
+ final x_unlock, but before the lock_word status is updated.
+ This ensures that when the lock_word indicates that an x_lock
+ is held, the only legitimate values for writer_thread are -1
+ (x_lock function hasn't completed) or the writer's thread_id.
+waiters: May be set to 1 anytime, but to avoid unnecessary wake-up
+ signals, it should only be set to 1 when there are threads
+ waiting on event. Must be 1 when a writer starts waiting to
+ ensure the current x-locking thread sends a wake-up signal
+ during unlock. May only be reset to 0 immediately before a
+ wake-up signal is sent to event.
+event: Threads wait on event for read or writer lock when another
+ thread has an x-lock or an x-lock reservation (wait_ex). A
+ thread may only wait on event after performing the following
+ actions in order:
+ (1) Record the counter value of event (with os_event_reset).
+ (2) Set waiters to 1.
+ (3) Verify lock_word <= 0.
+ (1) must come before (2) to ensure signal is not missed.
+ (2) must come before (3) to ensure a signal is sent.
+ These restrictions force the above ordering.
+ Immediately before sending the wake-up signal, we should:
+ (1) Verify lock_word == X_LOCK_DECR (unlocked)
+ (2) Reset waiters to 0.
+wait_ex_event: A thread may only wait on the wait_ex_event after it has
+ performed the following actions in order:
+ (1) Decrement lock_word by X_LOCK_DECR.
+ (2) Record counter value of wait_ex_event (os_event_reset,
+ called from sync_array_reserve_cell).
+ (3) Verify that lock_word < 0.
+ (1) must come first to ensure that no other thread becomes a reader
+ or the next writer, and to notify the unlocker that a signal must be sent.
+ (2) must come before (3) to ensure the signal is not missed.
+ These restrictions force the above ordering.
+ Immediately before sending the wake-up signal, we should:
+ Verify lock_word == 0 (waiting thread holds x_lock)
+*/
+
/* number of spin waits on rw-latches,
resulted during shared (read) locks */
-ulint rw_s_spin_wait_count = 0;
+ib_longlong rw_s_spin_wait_count = 0;
+ib_longlong rw_s_spin_round_count = 0;
/* number of OS waits on rw-latches,
resulted during shared (read) locks */
-ulint rw_s_os_wait_count = 0;
+ib_longlong rw_s_os_wait_count = 0;
/* number of unlocks (that unlock shared locks),
set only when UNIV_SYNC_PERF_STAT is defined */
-ulint rw_s_exit_count = 0;
-
-/* number of system calls made during exclusive latching */
-ulint rw_x_system_call_count = 0;
+ib_longlong rw_s_exit_count = 0;
/* number of spin waits on rw-latches,
resulted during exclusive (write) locks */
-ulint rw_x_spin_wait_count = 0;
+ib_longlong rw_x_spin_wait_count = 0;
+ib_longlong rw_x_spin_round_count = 0;
/* number of OS waits on rw-latches,
resulted during exclusive (write) locks */
-ulint rw_x_os_wait_count = 0;
+ib_longlong rw_x_os_wait_count = 0;
/* number of unlocks (that unlock exclusive locks),
set only when UNIV_SYNC_PERF_STAT is defined */
-ulint rw_x_exit_count = 0;
+ib_longlong rw_x_exit_count = 0;
/* The global list of rw-locks */
rw_lock_list_t rw_lock_list;
@@ -119,6 +195,7 @@ rw_lock_create_func(
/* If this is the very first time a synchronization object is
created, then the following call initializes the sync system. */
+#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_create(rw_lock_get_mutex(lock), SYNC_NO_ORDER_CHECK);
lock->mutex.cfile_name = cfile_name;
@@ -129,12 +206,12 @@ rw_lock_create_func(
lock->mutex.mutex_type = 1;
#endif /* UNIV_DEBUG && !UNIV_HOTBACKUP */
- rw_lock_set_waiters(lock, 0);
- rw_lock_set_writer(lock, RW_LOCK_NOT_LOCKED);
- lock->writer_count = 0;
- rw_lock_set_reader_count(lock, 0);
+#endif /* HAVE_GCC_ATOMIC_BUILTINS */
- lock->writer_is_wait_ex = FALSE;
+ lock->lock_word = X_LOCK_DECR;
+ rw_lock_set_waiters(lock, 0);
+ lock->writer_thread = -1;
+ lock->pass = 0;
#ifdef UNIV_SYNC_DEBUG
UT_LIST_INIT(lock->debug_list);
@@ -147,15 +224,13 @@ rw_lock_create_func(
lock->cfile_name = cfile_name;
lock->cline = (unsigned int) cline;
+ lock->count_os_wait = 0;
lock->last_s_file_name = "not yet reserved";
lock->last_x_file_name = "not yet reserved";
lock->last_s_line = 0;
lock->last_x_line = 0;
lock->event = os_event_create(NULL);
-
-#ifdef __WIN__
lock->wait_ex_event = os_event_create(NULL);
-#endif
mutex_enter(&rw_lock_list_mutex);
@@ -180,20 +255,19 @@ rw_lock_free(
rw_lock_t* lock) /* in: rw-lock */
{
ut_ad(rw_lock_validate(lock));
- ut_a(rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED);
+ ut_a(lock->lock_word == X_LOCK_DECR);
ut_a(rw_lock_get_waiters(lock) == 0);
- ut_a(rw_lock_get_reader_count(lock) == 0);
lock->magic_n = 0;
+#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_free(rw_lock_get_mutex(lock));
+#endif /* HAVE_GCC_ATOMIC_BUILTINS */
mutex_enter(&rw_lock_list_mutex);
os_event_free(lock->event);
-#ifdef __WIN__
os_event_free(lock->wait_ex_event);
-#endif
if (UT_LIST_GET_PREV(list, lock)) {
ut_a(UT_LIST_GET_PREV(list, lock)->magic_n == RW_LOCK_MAGIC_N);
@@ -219,19 +293,12 @@ rw_lock_validate(
{
ut_a(lock);
- mutex_enter(rw_lock_get_mutex(lock));
+ ulint waiters = rw_lock_get_waiters(lock);
+ lint lock_word = lock->lock_word;
ut_a(lock->magic_n == RW_LOCK_MAGIC_N);
- ut_a((rw_lock_get_reader_count(lock) == 0)
- || (rw_lock_get_writer(lock) != RW_LOCK_EX));
- ut_a((rw_lock_get_writer(lock) == RW_LOCK_EX)
- || (rw_lock_get_writer(lock) == RW_LOCK_WAIT_EX)
- || (rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED));
- ut_a((rw_lock_get_waiters(lock) == 0)
- || (rw_lock_get_waiters(lock) == 1));
- ut_a((lock->writer != RW_LOCK_EX) || (lock->writer_count > 0));
-
- mutex_exit(rw_lock_get_mutex(lock));
+ ut_a(waiters == 0 || waiters == 1);
+ ut_a(lock_word > -X_LOCK_DECR || (-lock_word) % X_LOCK_DECR == 0);
return(TRUE);
}
@@ -253,18 +320,15 @@ rw_lock_s_lock_spin(
ulint line) /* in: line where requested */
{
ulint index; /* index of the reserved wait cell */
- ulint i; /* spin round count */
+ ulint i = 0; /* spin round count */
ut_ad(rw_lock_validate(lock));
+ rw_s_spin_wait_count++; /* Count calls to this function */
lock_loop:
- rw_s_spin_wait_count++;
/* Spin waiting for the writer field to become free */
- i = 0;
-
- while (rw_lock_get_writer(lock) != RW_LOCK_NOT_LOCKED
- && i < SYNC_SPIN_ROUNDS) {
+ while (i < SYNC_SPIN_ROUNDS && lock->lock_word <= 0) {
if (srv_spin_wait_delay) {
ut_delay(ut_rnd_interval(0, srv_spin_wait_delay));
}
@@ -285,28 +349,32 @@ lock_loop:
lock->cfile_name, (ulong) lock->cline, (ulong) i);
}
- mutex_enter(rw_lock_get_mutex(lock));
-
/* We try once again to obtain the lock */
-
if (TRUE == rw_lock_s_lock_low(lock, pass, file_name, line)) {
- mutex_exit(rw_lock_get_mutex(lock));
+ rw_s_spin_round_count += i;
return; /* Success */
} else {
- /* If we get here, locking did not succeed, we may
- suspend the thread to wait in the wait array */
- rw_s_system_call_count++;
+ if (i < SYNC_SPIN_ROUNDS) {
+ goto lock_loop;
+ }
+
+ rw_s_spin_round_count += i;
sync_array_reserve_cell(sync_primary_wait_array,
lock, RW_LOCK_SHARED,
file_name, line,
&index);
+ /* Set waiters before checking lock_word to ensure wake-up
+ signal is sent. This may lead to some unnecessary signals. */
rw_lock_set_waiters(lock, 1);
- mutex_exit(rw_lock_get_mutex(lock));
+ if (TRUE == rw_lock_s_lock_low(lock, pass, file_name, line)) {
+ sync_array_free_cell(sync_primary_wait_array, index);
+ return; /* Success */
+ }
if (srv_print_latch_waits) {
fprintf(stderr,
@@ -317,11 +385,13 @@ lock_loop:
(ulong) lock->cline);
}
- rw_s_system_call_count++;
+ /* these stats may not be accurate */
+ lock->count_os_wait++;
rw_s_os_wait_count++;
sync_array_wait_event(sync_primary_wait_array, index);
+ i = 0;
goto lock_loop;
}
}
@@ -343,113 +413,149 @@ rw_lock_x_lock_move_ownership(
{
ut_ad(rw_lock_is_locked(lock, RW_LOCK_EX));
+#ifdef HAVE_GCC_ATOMIC_BUILTINS
+ os_thread_id_t local_writer_thread = lock->writer_thread;
+ os_thread_id_t new_writer_thread = os_thread_get_curr_id();
+ while (TRUE) {
+ if (local_writer_thread != -1) {
+ if(os_compare_and_swap(
+ &(lock->writer_thread),
+ local_writer_thread,
+ new_writer_thread)) {
+ break;
+ }
+ }
+ local_writer_thread = lock->writer_thread;
+ }
+ lock->pass = 0;
+#else /* HAVE_GCC_ATOMIC_BUILTINS */
mutex_enter(&(lock->mutex));
-
lock->writer_thread = os_thread_get_curr_id();
-
lock->pass = 0;
-
mutex_exit(&(lock->mutex));
+#endif /* HAVE_GCC_ATOMIC_BUILTINS */
}
/**********************************************************************
-Low-level function for acquiring an exclusive lock. */
+Function for the next writer to call. Waits for readers to exit.
+The caller must have already decremented lock_word by X_LOCK_DECR. */
UNIV_INLINE
-ulint
-rw_lock_x_lock_low(
-/*===============*/
- /* out: RW_LOCK_NOT_LOCKED if did
- not succeed, RW_LOCK_EX if success,
- RW_LOCK_WAIT_EX, if got wait reservation */
+void
+rw_lock_x_lock_wait(
+/*================*/
rw_lock_t* lock, /* in: pointer to rw-lock */
+#ifdef UNIV_SYNC_DEBUG
ulint pass, /* in: pass value; != 0, if the lock will
be passed to another thread to unlock */
+#endif
const char* file_name,/* in: file name where lock requested */
ulint line) /* in: line where requested */
{
- ut_ad(mutex_own(rw_lock_get_mutex(lock)));
-
- if (rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED) {
+ ulint index;
+ ulint i = 0;
- if (rw_lock_get_reader_count(lock) == 0) {
+ ut_ad(lock->lock_word <= 0);
- rw_lock_set_writer(lock, RW_LOCK_EX);
- lock->writer_thread = os_thread_get_curr_id();
- lock->writer_count++;
- lock->pass = pass;
+ while (lock->lock_word < 0) {
+ if (srv_spin_wait_delay) {
+ ut_delay(ut_rnd_interval(0, srv_spin_wait_delay));
+ }
+		if (i < SYNC_SPIN_ROUNDS) {
+ i++;
+ continue;
+ }
-#ifdef UNIV_SYNC_DEBUG
- rw_lock_add_debug_info(lock, pass, RW_LOCK_EX,
- file_name, line);
-#endif
- lock->last_x_file_name = file_name;
- lock->last_x_line = (unsigned int) line;
+		/* If there is still a reader, then go to sleep. */
+ rw_x_spin_round_count += i;
+ i = 0;
+ sync_array_reserve_cell(sync_primary_wait_array,
+ lock,
+ RW_LOCK_WAIT_EX,
+ file_name, line,
+ &index);
+		/* Check lock_word to ensure the wake-up isn't missed. */
+		if (lock->lock_word < 0) {
- /* Locking succeeded, we may return */
- return(RW_LOCK_EX);
- } else {
- /* There are readers, we have to wait */
- rw_lock_set_writer(lock, RW_LOCK_WAIT_EX);
- lock->writer_thread = os_thread_get_curr_id();
- lock->pass = pass;
- lock->writer_is_wait_ex = TRUE;
+			/* These statistics may not be accurate. */
+ lock->count_os_wait++;
+ rw_x_os_wait_count++;
+ /* Add debug info as it is needed to detect possible
+ deadlock. We must add info for WAIT_EX thread for
+ deadlock detection to work properly. */
#ifdef UNIV_SYNC_DEBUG
rw_lock_add_debug_info(lock, pass, RW_LOCK_WAIT_EX,
file_name, line);
#endif
- return(RW_LOCK_WAIT_EX);
- }
-
- } else if ((rw_lock_get_writer(lock) == RW_LOCK_WAIT_EX)
- && os_thread_eq(lock->writer_thread,
- os_thread_get_curr_id())) {
-
- if (rw_lock_get_reader_count(lock) == 0) {
-
- rw_lock_set_writer(lock, RW_LOCK_EX);
- lock->writer_count++;
- lock->pass = pass;
- lock->writer_is_wait_ex = FALSE;
-
+ sync_array_wait_event(sync_primary_wait_array,
+ index);
#ifdef UNIV_SYNC_DEBUG
- rw_lock_remove_debug_info(lock, pass, RW_LOCK_WAIT_EX);
- rw_lock_add_debug_info(lock, pass, RW_LOCK_EX,
- file_name, line);
+ rw_lock_remove_debug_info(lock, pass,
+ RW_LOCK_WAIT_EX);
#endif
-
- lock->last_x_file_name = file_name;
- lock->last_x_line = (unsigned int) line;
-
- /* Locking succeeded, we may return */
- return(RW_LOCK_EX);
+			/* It is possible to be woken while lock_word
+			is still < 0: the while-loop check above must
+			pass before we may proceed. */
+ } else {
+ sync_array_free_cell(sync_primary_wait_array,
+ index);
}
+ }
+ rw_x_spin_round_count += i;
+}
- return(RW_LOCK_WAIT_EX);
-
- } else if ((rw_lock_get_writer(lock) == RW_LOCK_EX)
- && os_thread_eq(lock->writer_thread,
- os_thread_get_curr_id())
- && (lock->pass == 0)
- && (pass == 0)) {
+/**********************************************************************
+Low-level function for acquiring an exclusive lock. */
+UNIV_INLINE
+ibool
+rw_lock_x_lock_low(
+/*===============*/
+				/* out: FALSE if did not succeed,
+				TRUE if success */
+ rw_lock_t* lock, /* in: pointer to rw-lock */
+ ulint pass, /* in: pass value; != 0, if the lock will
+ be passed to another thread to unlock */
+ const char* file_name,/* in: file name where lock requested */
+ ulint line) /* in: line where requested */
+{
+ os_thread_id_t curr_thread = os_thread_get_curr_id();
+ ut_ad(curr_thread != -1); /* We use -1 as the unlocked value. */
- lock->writer_count++;
+	if (rw_lock_lock_word_decr(lock, X_LOCK_DECR)) {
+ ut_ad(lock->writer_thread == -1);
+ /* Decrement occurred: we are writer or next-writer. */
+ lock->writer_thread = curr_thread;
+ lock->pass = pass;
+ rw_lock_x_lock_wait(lock,
#ifdef UNIV_SYNC_DEBUG
- rw_lock_add_debug_info(lock, pass, RW_LOCK_EX, file_name,
- line);
+ pass,
#endif
+ file_name, line);
- lock->last_x_file_name = file_name;
- lock->last_x_line = (unsigned int) line;
-
- /* Locking succeeded, we may return */
- return(RW_LOCK_EX);
+ } else {
+		/* Decrement failed: relock or failed lock */
+		/* We must verify pass first: otherwise another thread
+		could call move_ownership right after we have verified
+		that our thread_id matches, and we would then take a
+		recursive lock even though move_ownership has since
+		changed the writer thread. */
+		if (!pass && !(lock->pass)
+		    && os_thread_eq(lock->writer_thread, curr_thread)) {
+ /* Relock */
+ lock->lock_word -= X_LOCK_DECR;
+ } else {
+ /* Another thread locked before us */
+ return(FALSE);
+ }
}
+#ifdef UNIV_SYNC_DEBUG
+ rw_lock_add_debug_info(lock, pass, RW_LOCK_EX,
+ file_name, line);
+#endif
+ lock->last_x_file_name = file_name;
+ lock->last_x_line = (unsigned int) line;
- /* Locking did not succeed */
- return(RW_LOCK_NOT_LOCKED);
+ return(TRUE);
}
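
rw_lock_x_lock_low() above rests entirely on rw_lock_lock_word_decr(),
which lives in sync0rw.ic. A rough paraphrase of that primitive for the
atomic-builtins build (a sketch, not a verbatim copy): the decrement
succeeds only while lock_word is still positive, so an S-lock (amount 1)
and an X-lock (amount X_LOCK_DECR) share one code path.

	static int lock_word_decr(volatile long* lock_word, long amount)
	{
		long	local = *lock_word;

		while (local > 0) {
			if (__sync_bool_compare_and_swap(lock_word, local,
							 local - amount)) {
				/* Caller is reader or next-writer. */
				return 1;
			}
			/* CAS lost a race: reload and retry. */
			local = *lock_word;
		}

		return 0;	/* a writer already holds or awaits it */
	}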
/**********************************************************************
@@ -472,47 +578,30 @@ rw_lock_x_lock_func(
ulint line) /* in: line where requested */
{
ulint index; /* index of the reserved wait cell */
- ulint state; /* lock state acquired */
ulint i; /* spin round count */
+ ibool spinning = FALSE;
ut_ad(rw_lock_validate(lock));
-lock_loop:
- /* Acquire the mutex protecting the rw-lock fields */
- mutex_enter_fast(&(lock->mutex));
-
- state = rw_lock_x_lock_low(lock, pass, file_name, line);
+ i = 0;
- mutex_exit(&(lock->mutex));
+lock_loop:
- if (state == RW_LOCK_EX) {
+ if (rw_lock_x_lock_low(lock, pass, file_name, line)) {
+ rw_x_spin_round_count += i;
return; /* Locking succeeded */
- } else if (state == RW_LOCK_NOT_LOCKED) {
-
- /* Spin waiting for the writer field to become free */
- i = 0;
-
- while (rw_lock_get_writer(lock) != RW_LOCK_NOT_LOCKED
- && i < SYNC_SPIN_ROUNDS) {
- if (srv_spin_wait_delay) {
- ut_delay(ut_rnd_interval(0,
- srv_spin_wait_delay));
- }
+ } else {
- i++;
- }
- if (i == SYNC_SPIN_ROUNDS) {
- os_thread_yield();
+ if (!spinning) {
+ spinning = TRUE;
+ rw_x_spin_wait_count++;
}
- } else if (state == RW_LOCK_WAIT_EX) {
- /* Spin waiting for the reader count field to become zero */
- i = 0;
-
- while (rw_lock_get_reader_count(lock) != 0
- && i < SYNC_SPIN_ROUNDS) {
+ /* Spin waiting for the lock_word to become free */
+ while (i < SYNC_SPIN_ROUNDS
+ && lock->lock_word <= 0) {
if (srv_spin_wait_delay) {
ut_delay(ut_rnd_interval(0,
srv_spin_wait_delay));
@@ -522,12 +611,13 @@ lock_loop:
}
if (i == SYNC_SPIN_ROUNDS) {
os_thread_yield();
+ } else {
+ goto lock_loop;
}
- } else {
- i = 0; /* Eliminate a compiler warning */
- ut_error;
}
+ rw_x_spin_round_count += i;
+
if (srv_print_latch_waits) {
fprintf(stderr,
"Thread %lu spin wait rw-x-lock at %p"
@@ -536,39 +626,20 @@ lock_loop:
lock->cfile_name, (ulong) lock->cline, (ulong) i);
}
- rw_x_spin_wait_count++;
-
- /* We try once again to obtain the lock. Acquire the mutex protecting
- the rw-lock fields */
-
- mutex_enter(rw_lock_get_mutex(lock));
-
- state = rw_lock_x_lock_low(lock, pass, file_name, line);
-
- if (state == RW_LOCK_EX) {
- mutex_exit(rw_lock_get_mutex(lock));
-
- return; /* Locking succeeded */
- }
-
- rw_x_system_call_count++;
-
sync_array_reserve_cell(sync_primary_wait_array,
lock,
-#ifdef __WIN__
- /* On windows RW_LOCK_WAIT_EX signifies
- that this thread should wait on the
- special wait_ex_event. */
- (state == RW_LOCK_WAIT_EX)
- ? RW_LOCK_WAIT_EX :
-#endif
RW_LOCK_EX,
file_name, line,
&index);
+ /* Waiters must be set before checking lock_word, to ensure signal
+ is sent. This could lead to a few unnecessary wake-up signals. */
rw_lock_set_waiters(lock, 1);
- mutex_exit(rw_lock_get_mutex(lock));
+ if (rw_lock_x_lock_low(lock, pass, file_name, line)) {
+ sync_array_free_cell(sync_primary_wait_array, index);
+ return; /* Locking succeeded */
+ }
if (srv_print_latch_waits) {
fprintf(stderr,
@@ -578,11 +649,13 @@ lock_loop:
lock->cfile_name, (ulong) lock->cline);
}
- rw_x_system_call_count++;
+	/* These statistics may not be accurate. */
+ lock->count_os_wait++;
rw_x_os_wait_count++;
sync_array_wait_event(sync_primary_wait_array, index);
+ i = 0;
goto lock_loop;
}
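
Taken together, the acquisition path now spins with randomized backoff,
yields once the spin budget is exhausted, and only then blocks in the
wait array. A compilable sketch of that shape; try_x_lock(), busy_delay()
and block_in_wait_array() are hypothetical stand-ins for
rw_lock_x_lock_low(), ut_delay() and the sync_array_* sequence:

	#include <sched.h>
	#include <stdlib.h>

	#define SPIN_ROUNDS	30	/* mirrors SYNC_SPIN_ROUNDS */

	extern int	try_x_lock(volatile long* lock_word);
	extern void	busy_delay(unsigned iterations);
	extern void	block_in_wait_array(void);

	static void x_lock_slow(volatile long* lock_word,
				unsigned delay_bound)
	{
		unsigned	i = 0;

		for (;;) {
			if (try_x_lock(lock_word)) {
				return;
			}
			while (i < SPIN_ROUNDS && *lock_word <= 0) {
				if (delay_bound) {
					/* Random backoff desynchronizes
					the spinners' cache-line traffic. */
					busy_delay(rand()
						   % (delay_bound + 1));
				}
				i++;
			}
			if (i < SPIN_ROUNDS) {
				continue;	/* word freed up: retry */
			}
			sched_yield();		/* spun out: yield CPU */
			block_in_wait_array();	/* set waiters, re-check,
						then sleep */
			i = 0;
		}
	}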
@@ -730,7 +803,7 @@ rw_lock_own(
ut_ad(lock);
ut_ad(rw_lock_validate(lock));
- mutex_enter(&(lock->mutex));
+ rw_lock_debug_mutex_enter();
info = UT_LIST_GET_FIRST(lock->debug_list);
@@ -740,7 +813,7 @@ rw_lock_own(
&& (info->pass == 0)
&& (info->lock_type == lock_type)) {
- mutex_exit(&(lock->mutex));
+ rw_lock_debug_mutex_exit();
/* Found! */
return(TRUE);
@@ -748,7 +821,7 @@ rw_lock_own(
info = UT_LIST_GET_NEXT(list, info);
}
- mutex_exit(&(lock->mutex));
+ rw_lock_debug_mutex_exit();
return(FALSE);
}
@@ -770,22 +843,18 @@ rw_lock_is_locked(
ut_ad(lock);
ut_ad(rw_lock_validate(lock));
- mutex_enter(&(lock->mutex));
-
if (lock_type == RW_LOCK_SHARED) {
- if (lock->reader_count > 0) {
+ if (rw_lock_get_reader_count(lock) > 0) {
ret = TRUE;
}
} else if (lock_type == RW_LOCK_EX) {
- if (lock->writer == RW_LOCK_EX) {
+ if (rw_lock_get_writer(lock) == RW_LOCK_EX) {
ret = TRUE;
}
} else {
ut_error;
}
- mutex_exit(&(lock->mutex));
-
return(ret);
}
@@ -814,11 +883,10 @@ rw_lock_list_print_info(
count++;
+#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_enter(&(lock->mutex));
-
- if ((rw_lock_get_writer(lock) != RW_LOCK_NOT_LOCKED)
- || (rw_lock_get_reader_count(lock) != 0)
- || (rw_lock_get_waiters(lock) != 0)) {
+#endif
+ if (lock->lock_word != X_LOCK_DECR) {
fprintf(file, "RW-LOCK: %p ", (void*) lock);
@@ -834,8 +902,10 @@ rw_lock_list_print_info(
info = UT_LIST_GET_NEXT(list, info);
}
}
-
+#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_exit(&(lock->mutex));
+#endif
+
lock = UT_LIST_GET_NEXT(list, lock);
}
@@ -858,9 +928,10 @@ rw_lock_print(
"RW-LATCH INFO\n"
"RW-LATCH: %p ", (void*) lock);
- if ((rw_lock_get_writer(lock) != RW_LOCK_NOT_LOCKED)
- || (rw_lock_get_reader_count(lock) != 0)
- || (rw_lock_get_waiters(lock) != 0)) {
+#ifndef HAVE_GCC_ATOMIC_BUILTINS
+ mutex_enter(&(lock->mutex));
+#endif
+ if (lock->lock_word != X_LOCK_DECR) {
if (rw_lock_get_waiters(lock)) {
fputs(" Waiters for the lock exist\n", stderr);
@@ -874,6 +945,9 @@ rw_lock_print(
info = UT_LIST_GET_NEXT(list, info);
}
}
+#ifndef HAVE_GCC_ATOMIC_BUILTINS
+ mutex_exit(&(lock->mutex));
+#endif
}
/*************************************************************************
@@ -922,14 +996,11 @@ rw_lock_n_locked(void)
lock = UT_LIST_GET_FIRST(rw_lock_list);
while (lock != NULL) {
- mutex_enter(rw_lock_get_mutex(lock));
- if ((rw_lock_get_writer(lock) != RW_LOCK_NOT_LOCKED)
- || (rw_lock_get_reader_count(lock) != 0)) {
+ if (lock->lock_word != X_LOCK_DECR) {
count++;
}
- mutex_exit(rw_lock_get_mutex(lock));
lock = UT_LIST_GET_NEXT(list, lock);
}
diff --git a/storage/innobase/sync/sync0sync.c b/storage/innobase/sync/sync0sync.c
index 944fd2a97fc..4176143d679 100644
--- a/storage/innobase/sync/sync0sync.c
+++ b/storage/innobase/sync/sync0sync.c
@@ -138,18 +138,13 @@ Therefore, this thread is guaranteed to catch the os_set_event()
signalled unconditionally at the release of the lock.
Q.E.D. */
-/* The number of system calls made in this module. Intended for performance
-monitoring. */
-
-ulint mutex_system_call_count = 0;
-
/* Number of spin waits on mutexes: for performance monitoring */
/* round=one iteration of a spin loop */
-ulint mutex_spin_round_count = 0;
-ulint mutex_spin_wait_count = 0;
-ulint mutex_os_wait_count = 0;
-ulint mutex_exit_count = 0;
+ib_longlong mutex_spin_round_count = 0;
+ib_longlong mutex_spin_wait_count = 0;
+ib_longlong mutex_os_wait_count = 0;
+ib_longlong mutex_exit_count = 0;
/* The global array of wait cells for implementation of the database's own
mutexes and read-write locks */
@@ -243,6 +238,8 @@ mutex_create_func(
{
#if defined(_WIN32) && defined(UNIV_CAN_USE_X86_ASSEMBLER)
mutex_reset_lock_word(mutex);
+#elif defined(HAVE_GCC_ATOMIC_BUILTINS)
+ mutex_reset_lock_word(mutex);
#else
os_fast_mutex_init(&(mutex->os_fast_mutex));
mutex->lock_word = 0;
@@ -333,7 +330,9 @@ mutex_free(
os_event_free(mutex->event);
-#if !defined(_WIN32) || !defined(UNIV_CAN_USE_X86_ASSEMBLER)
+#if defined(_WIN32) && defined(UNIV_CAN_USE_X86_ASSEMBLER)
+	/* Nothing to free: the lock word needs no destruction. */
+#elif defined(HAVE_GCC_ATOMIC_BUILTINS)
+	/* Nothing to free here either. */
+#else
os_fast_mutex_free(&(mutex->os_fast_mutex));
#endif
/* If we free the mutex protecting the mutex list (freeing is
@@ -450,6 +449,12 @@ mutex_spin_wait(
#endif /* UNIV_DEBUG && !UNIV_HOTBACKUP */
ut_ad(mutex);
+	/* Count the number of calls to mutex_spin_wait. This update is
+	not thread safe, but we do not mind if the count is not exact.
+	The counter was moved out of the ifdef below because we are
+	willing to pay the cost of counting in all builds: the data is
+	valuable. */
+ mutex_spin_wait_count++;
+
mutex_loop:
i = 0;
@@ -462,7 +467,6 @@ mutex_loop:
spin_loop:
#if defined UNIV_DEBUG && !defined UNIV_HOTBACKUP
- mutex_spin_wait_count++;
mutex->count_spin_loop++;
#endif /* UNIV_DEBUG && !UNIV_HOTBACKUP */
@@ -527,8 +531,6 @@ spin_loop:
sync_array_reserve_cell(sync_primary_wait_array, mutex,
SYNC_MUTEX, file_name, line, &index);
- mutex_system_call_count++;
-
/* The memory order of the array reservation and the change in the
waiters field is important: when we suspend a thread, we first
reserve the cell and then set waiters field to 1. When threads are
@@ -575,7 +577,6 @@ spin_loop:
mutex->cfile_name, (ulong) mutex->cline, (ulong) i);
#endif
- mutex_system_call_count++;
mutex_os_wait_count++;
#ifndef UNIV_HOTBACKUP
@@ -1377,21 +1378,31 @@ sync_print_wait_info(
FILE* file) /* in: file where to print */
{
#ifdef UNIV_SYNC_DEBUG
- fprintf(file, "Mutex exits %lu, rws exits %lu, rwx exits %lu\n",
+ fprintf(file, "Mutex exits %llu, rws exits %llu, rwx exits %llu\n",
mutex_exit_count, rw_s_exit_count, rw_x_exit_count);
#endif
fprintf(file,
- "Mutex spin waits %lu, rounds %lu, OS waits %lu\n"
- "RW-shared spins %lu, OS waits %lu;"
- " RW-excl spins %lu, OS waits %lu\n",
- (ulong) mutex_spin_wait_count,
- (ulong) mutex_spin_round_count,
- (ulong) mutex_os_wait_count,
- (ulong) rw_s_spin_wait_count,
- (ulong) rw_s_os_wait_count,
- (ulong) rw_x_spin_wait_count,
- (ulong) rw_x_os_wait_count);
+ "Mutex spin waits %llu, rounds %llu, OS waits %llu\n"
+ "RW-shared spins %llu, OS waits %llu;"
+ " RW-excl spins %llu, OS waits %llu\n",
+ mutex_spin_wait_count,
+ mutex_spin_round_count,
+ mutex_os_wait_count,
+ rw_s_spin_wait_count,
+ rw_s_os_wait_count,
+ rw_x_spin_wait_count,
+ rw_x_os_wait_count);
+
+ fprintf(file,
+ "Spin rounds per wait: %.2f mutex, %.2f RW-shared, "
+ "%.2f RW-excl\n",
+ (double) mutex_spin_round_count /
+ (mutex_spin_wait_count ? mutex_spin_wait_count : 1),
+ (double) rw_s_spin_round_count /
+ (rw_s_spin_wait_count ? rw_s_spin_wait_count : 1),
+ (double) rw_x_spin_round_count /
+ (rw_x_spin_wait_count ? rw_x_spin_wait_count : 1));
}
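
The new averages line avoids division by zero by substituting 1 for an
empty wait count; since the matching rounds counter is then also zero,
the ratio prints as a clean 0.00. The same idiom in isolation:

	#include <stdio.h>

	static double rounds_per_wait(long long rounds, long long waits)
	{
		/* With zero waits, rounds is also zero: 0 / 1 == 0.00. */
		return (double) rounds / (double) (waits ? waits : 1);
	}

	int main(void)
	{
		printf("%.2f\n", rounds_per_wait(0, 0));      /* 0.00 */
		printf("%.2f\n", rounds_per_wait(1500, 100)); /* 15.00 */
		return 0;
	}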
/***********************************************************************