summaryrefslogtreecommitdiff
path: root/storage/innobase/sync
diff options
context:
space:
mode:
authorMikael Ronstrom <mikael@mysql.com>2008-10-15 20:54:18 +0200
committerMikael Ronstrom <mikael@mysql.com>2008-10-15 20:54:18 +0200
commit1732095b7dc4026a0bb25d6b612d15c3f3ff28b7 (patch)
tree0e544d3d36c291cd92de5c60cecaeaba75e39f4c /storage/innobase/sync
parent9daa56fd5ce3ccd33c32b5a505ac1d2b2c437460 (diff)
downloadmariadb-git-1732095b7dc4026a0bb25d6b612d15c3f3ff28b7.tar.gz
Google SMP patch
Diffstat (limited to 'storage/innobase/sync')
-rw-r--r--storage/innobase/sync/sync0arr.c114
-rw-r--r--storage/innobase/sync/sync0rw.c471
-rw-r--r--storage/innobase/sync/sync0sync.c61
3 files changed, 353 insertions, 293 deletions
diff --git a/storage/innobase/sync/sync0arr.c b/storage/innobase/sync/sync0arr.c
index 154593a9035..ee6e901ab81 100644
--- a/storage/innobase/sync/sync0arr.c
+++ b/storage/innobase/sync/sync0arr.c
@@ -295,28 +295,25 @@ sync_array_validate(
}
/***********************************************************************
-Puts the cell event in reset state. */
+Returns the event that the thread owning the cell waits for. */
static
-ib_longlong
-sync_cell_event_reset(
-/*==================*/
- /* out: value of signal_count
- at the time of reset. */
- ulint type, /* in: lock type mutex/rw_lock */
- void* object) /* in: the rw_lock/mutex object */
+os_event_t
+sync_cell_get_event(
+/*================*/
+ sync_cell_t* cell) /* in: non-empty sync array cell */
{
+ ulint type = cell->request_type;
+
if (type == SYNC_MUTEX) {
- return(os_event_reset(((mutex_t *) object)->event));
-#ifdef __WIN__
+ return(((mutex_t *) cell->wait_object)->event);
} else if (type == RW_LOCK_WAIT_EX) {
- return(os_event_reset(
- ((rw_lock_t *) object)->wait_ex_event));
-#endif
- } else {
- return(os_event_reset(((rw_lock_t *) object)->event));
+ return(((rw_lock_t *) cell->wait_object)->wait_ex_event);
+ } else { /* RW_LOCK_SHARED and RW_LOCK_EX wait on the same event */
+ return(((rw_lock_t *) cell->wait_object)->event);
}
}
+
/**********************************************************************
Reserves a wait array cell for waiting for an object.
The event of the cell is reset to nonsignalled state. */
@@ -332,6 +329,7 @@ sync_array_reserve_cell(
ulint* index) /* out: index of the reserved cell */
{
sync_cell_t* cell;
+ os_event_t event;
ulint i;
ut_a(object);
@@ -370,8 +368,8 @@ sync_array_reserve_cell(
/* Make sure the event is reset and also store
the value of signal_count at which the event
was reset. */
- cell->signal_count = sync_cell_event_reset(type,
- object);
+ event = sync_cell_get_event(cell);
+ cell->signal_count = os_event_reset(event);
cell->reservation_time = time(NULL);
@@ -411,19 +409,7 @@ sync_array_wait_event(
ut_a(!cell->waiting);
ut_ad(os_thread_get_curr_id() == cell->thread);
- if (cell->request_type == SYNC_MUTEX) {
- event = ((mutex_t*) cell->wait_object)->event;
-#ifdef __WIN__
- /* On windows if the thread about to wait is the one which
- has set the state of the rw_lock to RW_LOCK_WAIT_EX, then
- it waits on a special event i.e.: wait_ex_event. */
- } else if (cell->request_type == RW_LOCK_WAIT_EX) {
- event = ((rw_lock_t*) cell->wait_object)->wait_ex_event;
-#endif
- } else {
- event = ((rw_lock_t*) cell->wait_object)->event;
- }
-
+ event = sync_cell_get_event(cell);
cell->waiting = TRUE;
#ifdef UNIV_SYNC_DEBUG
@@ -462,6 +448,7 @@ sync_array_cell_print(
mutex_t* mutex;
rw_lock_t* rwlock;
ulint type;
+ ulint writer;
type = cell->request_type;
@@ -491,9 +478,7 @@ sync_array_cell_print(
(ulong) mutex->waiters);
} else if (type == RW_LOCK_EX
-#ifdef __WIN__
|| type == RW_LOCK_WAIT_EX
-#endif
|| type == RW_LOCK_SHARED) {
fputs(type == RW_LOCK_EX ? "X-lock on" : "S-lock on", file);
@@ -504,22 +489,25 @@ sync_array_cell_print(
" RW-latch at %p created in file %s line %lu\n",
(void*) rwlock, rwlock->cfile_name,
(ulong) rwlock->cline);
- if (rwlock->writer != RW_LOCK_NOT_LOCKED) {
+ writer = rw_lock_get_writer(rwlock);
+ if (writer != RW_LOCK_NOT_LOCKED) {
fprintf(file,
"a writer (thread id %lu) has"
" reserved it in mode %s",
(ulong) os_thread_pf(rwlock->writer_thread),
- rwlock->writer == RW_LOCK_EX
+ writer == RW_LOCK_EX
? " exclusive\n"
: " wait exclusive\n");
}
fprintf(file,
- "number of readers %lu, waiters flag %lu\n"
+ "number of readers %lu, waiters flag %lu, "
+ "lock_word: %ld\n"
"Last time read locked in file %s line %lu\n"
"Last time write locked in file %s line %lu\n",
- (ulong) rwlock->reader_count,
+ (ulong) rw_lock_get_reader_count(rwlock),
(ulong) rwlock->waiters,
+ rwlock->lock_word,
rwlock->last_s_file_name,
(ulong) rwlock->last_s_line,
rwlock->last_x_file_name,
@@ -778,28 +766,30 @@ sync_arr_cell_can_wake_up(
return(TRUE);
}
- } else if (cell->request_type == RW_LOCK_EX
- || cell->request_type == RW_LOCK_WAIT_EX) {
+ } else if (cell->request_type == RW_LOCK_EX) {
lock = cell->wait_object;
- if (rw_lock_get_reader_count(lock) == 0
- && rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED) {
+ /* X_LOCK_DECR is the unlocked state */
+ if (lock->lock_word == X_LOCK_DECR) {
return(TRUE);
}
- if (rw_lock_get_reader_count(lock) == 0
- && rw_lock_get_writer(lock) == RW_LOCK_WAIT_EX
- && os_thread_eq(lock->writer_thread, cell->thread)) {
+ } else if (cell->request_type == RW_LOCK_WAIT_EX) {
+
+ lock = cell->wait_object;
+
+ /* lock_word == 0 means all readers have left */
+ if (lock->lock_word == 0) {
return(TRUE);
}
-
} else if (cell->request_type == RW_LOCK_SHARED) {
lock = cell->wait_object;
- if (rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED) {
+ /* lock_word > 0 means no writer or reserved writer */
+ if (lock->lock_word > 0) {
return(TRUE);
}
@@ -844,11 +834,15 @@ sync_array_object_signalled(
/*========================*/
sync_array_t* arr) /* in: wait array */
{
+#ifdef HAVE_GCC_ATOMIC_BUILTINS
+ __sync_fetch_and_add(&(arr->sg_count),1);
+#else
sync_array_enter(arr);
arr->sg_count++;
sync_array_exit(arr);
+#endif
}
/**************************************************************************
@@ -868,6 +862,7 @@ sync_arr_wake_threads_if_sema_free(void)
sync_cell_t* cell;
ulint count;
ulint i;
+ os_event_t event;
sync_array_enter(arr);
@@ -877,36 +872,20 @@ sync_arr_wake_threads_if_sema_free(void)
while (count < arr->n_reserved) {
cell = sync_array_get_nth_cell(arr, i);
+ i++;
- if (cell->wait_object != NULL) {
-
+ if (cell->wait_object == NULL) {
+ continue;
+ }
count++;
if (sync_arr_cell_can_wake_up(cell)) {
- if (cell->request_type == SYNC_MUTEX) {
- mutex_t* mutex;
+ event = sync_cell_get_event(cell);
- mutex = cell->wait_object;
- os_event_set(mutex->event);
-#ifdef __WIN__
- } else if (cell->request_type
- == RW_LOCK_WAIT_EX) {
- rw_lock_t* lock;
-
- lock = cell->wait_object;
- os_event_set(lock->wait_ex_event);
-#endif
- } else {
- rw_lock_t* lock;
-
- lock = cell->wait_object;
- os_event_set(lock->event);
- }
- }
+ os_event_set(event);
}
- i++;
}
sync_array_exit(arr);
@@ -1026,4 +1005,3 @@ sync_array_print_info(
sync_array_exit(arr);
}
-
diff --git a/storage/innobase/sync/sync0rw.c b/storage/innobase/sync/sync0rw.c
index 367f019ce55..6c9cb076cbe 100644
--- a/storage/innobase/sync/sync0rw.c
+++ b/storage/innobase/sync/sync0rw.c
@@ -15,35 +15,111 @@ Created 9/11/1995 Heikki Tuuri
#include "mem0mem.h"
#include "srv0srv.h"
-/* number of system calls made during shared latching */
-ulint rw_s_system_call_count = 0;
+/*
+ IMPLEMENTATION OF THE RW_LOCK
+ =============================
+The status of a rw_lock is held in lock_word. The initial value of lock_word is
+X_LOCK_DECR. lock_word is decremented by 1 for each s-lock and by X_LOCK_DECR
+for each x-lock. This describes the lock state for each value of lock_word:
+
+lock_word == X_LOCK_DECR: Unlocked.
+0 < lock_word < X_LOCK_DECR: Read locked, no waiting writers.
+ (X_LOCK_DECR - lock_word) is the
+ number of readers that hold the lock.
+lock_word == 0: Write locked
+-X_LOCK_DECR < lock_word < 0: Read locked, with a waiting writer.
+ (-lock_word) is the number of readers
+ that hold the lock.
+lock_word <= -X_LOCK_DECR: Recursively write locked. lock_word has been
+ decremented by X_LOCK_DECR once for each lock,
+ so the number of locks is:
+ ((-lock_word) / X_LOCK_DECR) + 1
+When lock_word <= -X_LOCK_DECR, we also know that lock_word % X_LOCK_DECR == 0:
+other values of lock_word are invalid.
+
+The lock_word is always read and updated atomically and consistently, so that
+it always represents the state of the lock, and the state of the lock changes
+with a single atomic operation. This lock_word holds all of the information
+that a thread needs in order to determine if it is eligible to gain the lock
+or if it must spin or sleep. The one exception to this is that writer_thread
+must be verified before recursive write locks: to solve this scenario, we make
+writer_thread readable by all threads, but only writeable by the x-lock holder.
+
+The other members of the lock obey the following rules to remain consistent:
+
+pass: This is only set to 1 to prevent recursive x-locks. It must
+ be set as specified by x_lock caller after the lock_word
+ indicates that the thread holds the lock, but before that
+ thread resumes execution. It must be reset to 0 during the
+ final x_unlock, but before the lock_word status is updated.
+ When an x_lock or move_ownership call wishes to change
+ pass, it must first update the writer_thread appropriately.
+writer_thread: Must be set to the writer's thread_id after the lock_word
+ indicates that the thread holds the lock, but before that
+ thread resumes execution. It must be reset to -1 during the
+ final x_unlock, but before the lock_word status is updated.
+ This ensures that when the lock_word indicates that an x_lock
+ is held, the only legitimate values for writer_thread are -1
+ (x_lock function hasn't completed) or the writer's thread_id.
+waiters: May be set to 1 anytime, but to avoid unnecessary wake-up
+ signals, it should only be set to 1 when there are threads
+ waiting on event. Must be 1 when a writer starts waiting to
+ ensure the current x-locking thread sends a wake-up signal
+ during unlock. May only be reset to 0 immediately before a
+ wake-up signal is sent to event.
+event: Threads wait on event for read or writer lock when another
+ thread has an x-lock or an x-lock reservation (wait_ex). A
+ thread may only wait on event after performing the following
+ actions in order:
+ (1) Record the counter value of event (with os_event_reset).
+ (2) Set waiters to 1.
+ (3) Verify lock_word <= 0.
+ (1) must come before (2) to ensure signal is not missed.
+ (2) must come before (3) to ensure a signal is sent.
+ These restrictions force the above ordering.
+ Immediately before sending the wake-up signal, we should:
+ (1) Verify lock_word == X_LOCK_DECR (unlocked)
+ (2) Reset waiters to 0.
+wait_ex_event: A thread may only wait on the wait_ex_event after it has
+ performed the following actions in order:
+ (1) Decrement lock_word by X_LOCK_DECR.
+ (2) Record counter value of wait_ex_event (os_event_reset,
+ called from sync_array_reserve_cell).
+ (3) Verify that lock_word < 0.
+ (1) must come first to ensure no other threads become readers
+ or next writer, and notifies unlocker that signal must be sent.
+ (2) must come before (3) to ensure the signal is not missed.
+ These restrictions force the above ordering.
+ Immediately before sending the wake-up signal, we should:
+ Verify lock_word == 0 (waiting thread holds x_lock)
+*/
+
/* number of spin waits on rw-latches,
resulted during shared (read) locks */
-ulint rw_s_spin_wait_count = 0;
+ib_longlong rw_s_spin_wait_count = 0;
+ib_longlong rw_s_spin_round_count = 0;
/* number of OS waits on rw-latches,
resulted during shared (read) locks */
-ulint rw_s_os_wait_count = 0;
+ib_longlong rw_s_os_wait_count = 0;
/* number of unlocks (that unlock shared locks),
set only when UNIV_SYNC_PERF_STAT is defined */
-ulint rw_s_exit_count = 0;
-
-/* number of system calls made during exclusive latching */
-ulint rw_x_system_call_count = 0;
+ib_longlong rw_s_exit_count = 0;
/* number of spin waits on rw-latches,
resulted during exclusive (write) locks */
-ulint rw_x_spin_wait_count = 0;
+ib_longlong rw_x_spin_wait_count = 0;
+ib_longlong rw_x_spin_round_count = 0;
/* number of OS waits on rw-latches,
resulted during exclusive (write) locks */
-ulint rw_x_os_wait_count = 0;
+ib_longlong rw_x_os_wait_count = 0;
/* number of unlocks (that unlock exclusive locks),
set only when UNIV_SYNC_PERF_STAT is defined */
-ulint rw_x_exit_count = 0;
+ib_longlong rw_x_exit_count = 0;
/* The global list of rw-locks */
rw_lock_list_t rw_lock_list;
@@ -119,6 +195,7 @@ rw_lock_create_func(
/* If this is the very first time a synchronization object is
created, then the following call initializes the sync system. */
+#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_create(rw_lock_get_mutex(lock), SYNC_NO_ORDER_CHECK);
lock->mutex.cfile_name = cfile_name;
@@ -129,12 +206,12 @@ rw_lock_create_func(
lock->mutex.mutex_type = 1;
#endif /* UNIV_DEBUG && !UNIV_HOTBACKUP */
- rw_lock_set_waiters(lock, 0);
- rw_lock_set_writer(lock, RW_LOCK_NOT_LOCKED);
- lock->writer_count = 0;
- rw_lock_set_reader_count(lock, 0);
+#endif /* HAVE_GCC_ATOMIC_BUILTINS */
- lock->writer_is_wait_ex = FALSE;
+ lock->lock_word = X_LOCK_DECR;
+ rw_lock_set_waiters(lock, 0);
+ lock->writer_thread = -1;
+ lock->pass = 0;
#ifdef UNIV_SYNC_DEBUG
UT_LIST_INIT(lock->debug_list);
@@ -147,15 +224,13 @@ rw_lock_create_func(
lock->cfile_name = cfile_name;
lock->cline = (unsigned int) cline;
+ lock->count_os_wait = 0;
lock->last_s_file_name = "not yet reserved";
lock->last_x_file_name = "not yet reserved";
lock->last_s_line = 0;
lock->last_x_line = 0;
lock->event = os_event_create(NULL);
-
-#ifdef __WIN__
lock->wait_ex_event = os_event_create(NULL);
-#endif
mutex_enter(&rw_lock_list_mutex);
@@ -180,20 +255,19 @@ rw_lock_free(
rw_lock_t* lock) /* in: rw-lock */
{
ut_ad(rw_lock_validate(lock));
- ut_a(rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED);
+ ut_a(lock->lock_word == X_LOCK_DECR);
ut_a(rw_lock_get_waiters(lock) == 0);
- ut_a(rw_lock_get_reader_count(lock) == 0);
lock->magic_n = 0;
+#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_free(rw_lock_get_mutex(lock));
+#endif /* HAVE_GCC_ATOMIC_BUILTINS */
mutex_enter(&rw_lock_list_mutex);
os_event_free(lock->event);
-#ifdef __WIN__
os_event_free(lock->wait_ex_event);
-#endif
if (UT_LIST_GET_PREV(list, lock)) {
ut_a(UT_LIST_GET_PREV(list, lock)->magic_n == RW_LOCK_MAGIC_N);
@@ -219,19 +293,12 @@ rw_lock_validate(
{
ut_a(lock);
- mutex_enter(rw_lock_get_mutex(lock));
+ ulint waiters = rw_lock_get_waiters(lock);
+ lint lock_word = lock->lock_word;
ut_a(lock->magic_n == RW_LOCK_MAGIC_N);
- ut_a((rw_lock_get_reader_count(lock) == 0)
- || (rw_lock_get_writer(lock) != RW_LOCK_EX));
- ut_a((rw_lock_get_writer(lock) == RW_LOCK_EX)
- || (rw_lock_get_writer(lock) == RW_LOCK_WAIT_EX)
- || (rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED));
- ut_a((rw_lock_get_waiters(lock) == 0)
- || (rw_lock_get_waiters(lock) == 1));
- ut_a((lock->writer != RW_LOCK_EX) || (lock->writer_count > 0));
-
- mutex_exit(rw_lock_get_mutex(lock));
+ ut_a(waiters == 0 || waiters == 1);
+ ut_a(lock_word > -X_LOCK_DECR ||(-lock_word) % X_LOCK_DECR == 0);
return(TRUE);
}
@@ -253,18 +320,15 @@ rw_lock_s_lock_spin(
ulint line) /* in: line where requested */
{
ulint index; /* index of the reserved wait cell */
- ulint i; /* spin round count */
+ ulint i = 0; /* spin round count */
ut_ad(rw_lock_validate(lock));
+ rw_s_spin_wait_count++; /* Count calls to this function */
lock_loop:
- rw_s_spin_wait_count++;
/* Spin waiting for the writer field to become free */
- i = 0;
-
- while (rw_lock_get_writer(lock) != RW_LOCK_NOT_LOCKED
- && i < SYNC_SPIN_ROUNDS) {
+ while (i < SYNC_SPIN_ROUNDS && lock->lock_word <= 0) {
if (srv_spin_wait_delay) {
ut_delay(ut_rnd_interval(0, srv_spin_wait_delay));
}
@@ -285,28 +349,32 @@ lock_loop:
lock->cfile_name, (ulong) lock->cline, (ulong) i);
}
- mutex_enter(rw_lock_get_mutex(lock));
-
/* We try once again to obtain the lock */
-
if (TRUE == rw_lock_s_lock_low(lock, pass, file_name, line)) {
- mutex_exit(rw_lock_get_mutex(lock));
+ rw_s_spin_round_count += i;
return; /* Success */
} else {
- /* If we get here, locking did not succeed, we may
- suspend the thread to wait in the wait array */
- rw_s_system_call_count++;
+ if (i < SYNC_SPIN_ROUNDS) {
+ goto lock_loop;
+ }
+
+ rw_s_spin_round_count += i;
sync_array_reserve_cell(sync_primary_wait_array,
lock, RW_LOCK_SHARED,
file_name, line,
&index);
+ /* Set waiters before checking lock_word to ensure wake-up
+ signal is sent. This may lead to some unnecessary signals. */
rw_lock_set_waiters(lock, 1);
- mutex_exit(rw_lock_get_mutex(lock));
+ if (TRUE == rw_lock_s_lock_low(lock, pass, file_name, line)) {
+ sync_array_free_cell(sync_primary_wait_array, index);
+ return; /* Success */
+ }
if (srv_print_latch_waits) {
fprintf(stderr,
@@ -317,11 +385,13 @@ lock_loop:
(ulong) lock->cline);
}
- rw_s_system_call_count++;
+ /* these stats may not be accurate */
+ lock->count_os_wait++;
rw_s_os_wait_count++;
sync_array_wait_event(sync_primary_wait_array, index);
+ i = 0;
goto lock_loop;
}
}
@@ -343,113 +413,149 @@ rw_lock_x_lock_move_ownership(
{
ut_ad(rw_lock_is_locked(lock, RW_LOCK_EX));
+#ifdef HAVE_GCC_ATOMIC_BUILTINS
+ os_thread_id_t local_writer_thread = lock->writer_thread;
+ os_thread_id_t new_writer_thread = os_thread_get_curr_id();
+ while (TRUE) {
+ if (local_writer_thread != -1) {
+ if(os_compare_and_swap(
+ &(lock->writer_thread),
+ local_writer_thread,
+ new_writer_thread)) {
+ break;
+ }
+ }
+ local_writer_thread = lock->writer_thread;
+ }
+ lock->pass = 0;
+#else /* HAVE_GCC_ATOMIC_BUILTINS */
mutex_enter(&(lock->mutex));
-
lock->writer_thread = os_thread_get_curr_id();
-
lock->pass = 0;
-
mutex_exit(&(lock->mutex));
+#endif /* HAVE_GCC_ATOMIC_BUILTINS */
}
/**********************************************************************
-Low-level function for acquiring an exclusive lock. */
+Function for the next writer to call. Waits for readers to exit.
+The caller must have already decremented lock_word by X_LOCK_DECR.*/
UNIV_INLINE
-ulint
-rw_lock_x_lock_low(
-/*===============*/
- /* out: RW_LOCK_NOT_LOCKED if did
- not succeed, RW_LOCK_EX if success,
- RW_LOCK_WAIT_EX, if got wait reservation */
+void
+rw_lock_x_lock_wait(
+/*================*/
rw_lock_t* lock, /* in: pointer to rw-lock */
+#ifdef UNIV_SYNC_DEBUG
ulint pass, /* in: pass value; != 0, if the lock will
be passed to another thread to unlock */
+#endif
const char* file_name,/* in: file name where lock requested */
ulint line) /* in: line where requested */
{
- ut_ad(mutex_own(rw_lock_get_mutex(lock)));
-
- if (rw_lock_get_writer(lock) == RW_LOCK_NOT_LOCKED) {
+ ulint index;
+ ulint i = 0;
- if (rw_lock_get_reader_count(lock) == 0) {
+ ut_ad(lock->lock_word <= 0);
- rw_lock_set_writer(lock, RW_LOCK_EX);
- lock->writer_thread = os_thread_get_curr_id();
- lock->writer_count++;
- lock->pass = pass;
+ while (lock->lock_word < 0) {
+ if (srv_spin_wait_delay) {
+ ut_delay(ut_rnd_interval(0, srv_spin_wait_delay));
+ }
+ if(i < SYNC_SPIN_ROUNDS) {
+ i++;
+ continue;
+ }
-#ifdef UNIV_SYNC_DEBUG
- rw_lock_add_debug_info(lock, pass, RW_LOCK_EX,
- file_name, line);
-#endif
- lock->last_x_file_name = file_name;
- lock->last_x_line = (unsigned int) line;
+ /* If there is still a reader, then go to sleep.*/
+ rw_x_spin_round_count += i;
+ i = 0;
+ sync_array_reserve_cell(sync_primary_wait_array,
+ lock,
+ RW_LOCK_WAIT_EX,
+ file_name, line,
+ &index);
+ /* Check lock_word to ensure wake-up isn't missed.*/
+ if(lock->lock_word < 0) {
- /* Locking succeeded, we may return */
- return(RW_LOCK_EX);
- } else {
- /* There are readers, we have to wait */
- rw_lock_set_writer(lock, RW_LOCK_WAIT_EX);
- lock->writer_thread = os_thread_get_curr_id();
- lock->pass = pass;
- lock->writer_is_wait_ex = TRUE;
+ /* these stats may not be accurate */
+ lock->count_os_wait++;
+ rw_x_os_wait_count++;
+ /* Add debug info as it is needed to detect possible
+ deadlock. We must add info for WAIT_EX thread for
+ deadlock detection to work properly. */
#ifdef UNIV_SYNC_DEBUG
rw_lock_add_debug_info(lock, pass, RW_LOCK_WAIT_EX,
file_name, line);
#endif
- return(RW_LOCK_WAIT_EX);
- }
-
- } else if ((rw_lock_get_writer(lock) == RW_LOCK_WAIT_EX)
- && os_thread_eq(lock->writer_thread,
- os_thread_get_curr_id())) {
-
- if (rw_lock_get_reader_count(lock) == 0) {
-
- rw_lock_set_writer(lock, RW_LOCK_EX);
- lock->writer_count++;
- lock->pass = pass;
- lock->writer_is_wait_ex = FALSE;
-
+ sync_array_wait_event(sync_primary_wait_array,
+ index);
#ifdef UNIV_SYNC_DEBUG
- rw_lock_remove_debug_info(lock, pass, RW_LOCK_WAIT_EX);
- rw_lock_add_debug_info(lock, pass, RW_LOCK_EX,
- file_name, line);
+ rw_lock_remove_debug_info(lock, pass,
+ RW_LOCK_WAIT_EX);
#endif
-
- lock->last_x_file_name = file_name;
- lock->last_x_line = (unsigned int) line;
-
- /* Locking succeeded, we may return */
- return(RW_LOCK_EX);
+ /* It is possible to wake when lock_word < 0.
+ We must pass the while-loop check to proceed.*/
+ } else {
+ sync_array_free_cell(sync_primary_wait_array,
+ index);
}
+ }
+ rw_x_spin_round_count += i;
+}
- return(RW_LOCK_WAIT_EX);
-
- } else if ((rw_lock_get_writer(lock) == RW_LOCK_EX)
- && os_thread_eq(lock->writer_thread,
- os_thread_get_curr_id())
- && (lock->pass == 0)
- && (pass == 0)) {
+/**********************************************************************
+Low-level function for acquiring an exclusive lock. */
+UNIV_INLINE
+ibool
+rw_lock_x_lock_low(
+/*===============*/
+ /* out: RW_LOCK_NOT_LOCKED if did
+ not succeed, RW_LOCK_EX if success. */
+ rw_lock_t* lock, /* in: pointer to rw-lock */
+ ulint pass, /* in: pass value; != 0, if the lock will
+ be passed to another thread to unlock */
+ const char* file_name,/* in: file name where lock requested */
+ ulint line) /* in: line where requested */
+{
+ os_thread_id_t curr_thread = os_thread_get_curr_id();
+ ut_ad(curr_thread != -1); /* We use -1 as the unlocked value. */
- lock->writer_count++;
+ if(rw_lock_lock_word_decr(lock, X_LOCK_DECR)) {
+ ut_ad(lock->writer_thread == -1);
+ /* Decrement occurred: we are writer or next-writer. */
+ lock->writer_thread = curr_thread;
+ lock->pass = pass;
+ rw_lock_x_lock_wait(lock,
#ifdef UNIV_SYNC_DEBUG
- rw_lock_add_debug_info(lock, pass, RW_LOCK_EX, file_name,
- line);
+ pass,
#endif
+ file_name, line);
- lock->last_x_file_name = file_name;
- lock->last_x_line = (unsigned int) line;
-
- /* Locking succeeded, we may return */
- return(RW_LOCK_EX);
+ } else {
+ /* Decrement failed: relock or failed lock */
+ /* Must verify pass first: otherwise another thread could
+ call move_ownership after we have verified that our
+ thread_id matches, suddenly allowing recursive locks
+ even though move_ownership has since changed it. */
+ if(!pass && !(lock->pass) &&
+ os_thread_eq(lock->writer_thread, curr_thread)) {
+ /* Relock */
+ lock->lock_word -= X_LOCK_DECR;
+ } else {
+ /* Another thread locked before us */
+ return(FALSE);
+ }
}
+#ifdef UNIV_SYNC_DEBUG
+ rw_lock_add_debug_info(lock, pass, RW_LOCK_EX,
+ file_name, line);
+#endif
+ lock->last_x_file_name = file_name;
+ lock->last_x_line = (unsigned int) line;
- /* Locking did not succeed */
- return(RW_LOCK_NOT_LOCKED);
+ return(TRUE);
}
/**********************************************************************
@@ -472,47 +578,30 @@ rw_lock_x_lock_func(
ulint line) /* in: line where requested */
{
ulint index; /* index of the reserved wait cell */
- ulint state; /* lock state acquired */
ulint i; /* spin round count */
+ ibool spinning = FALSE;
ut_ad(rw_lock_validate(lock));
-lock_loop:
- /* Acquire the mutex protecting the rw-lock fields */
- mutex_enter_fast(&(lock->mutex));
-
- state = rw_lock_x_lock_low(lock, pass, file_name, line);
+ i = 0;
- mutex_exit(&(lock->mutex));
+lock_loop:
- if (state == RW_LOCK_EX) {
+ if (rw_lock_x_lock_low(lock, pass, file_name, line)) {
+ rw_x_spin_round_count += i;
return; /* Locking succeeded */
- } else if (state == RW_LOCK_NOT_LOCKED) {
-
- /* Spin waiting for the writer field to become free */
- i = 0;
-
- while (rw_lock_get_writer(lock) != RW_LOCK_NOT_LOCKED
- && i < SYNC_SPIN_ROUNDS) {
- if (srv_spin_wait_delay) {
- ut_delay(ut_rnd_interval(0,
- srv_spin_wait_delay));
- }
+ } else {
- i++;
- }
- if (i == SYNC_SPIN_ROUNDS) {
- os_thread_yield();
+ if (!spinning) {
+ spinning = TRUE;
+ rw_x_spin_wait_count++;
}
- } else if (state == RW_LOCK_WAIT_EX) {
- /* Spin waiting for the reader count field to become zero */
- i = 0;
-
- while (rw_lock_get_reader_count(lock) != 0
- && i < SYNC_SPIN_ROUNDS) {
+ /* Spin waiting for the lock_word to become free */
+ while (i < SYNC_SPIN_ROUNDS
+ && lock->lock_word <= 0) {
if (srv_spin_wait_delay) {
ut_delay(ut_rnd_interval(0,
srv_spin_wait_delay));
@@ -522,12 +611,13 @@ lock_loop:
}
if (i == SYNC_SPIN_ROUNDS) {
os_thread_yield();
+ } else {
+ goto lock_loop;
}
- } else {
- i = 0; /* Eliminate a compiler warning */
- ut_error;
}
+ rw_x_spin_round_count += i;
+
if (srv_print_latch_waits) {
fprintf(stderr,
"Thread %lu spin wait rw-x-lock at %p"
@@ -536,39 +626,20 @@ lock_loop:
lock->cfile_name, (ulong) lock->cline, (ulong) i);
}
- rw_x_spin_wait_count++;
-
- /* We try once again to obtain the lock. Acquire the mutex protecting
- the rw-lock fields */
-
- mutex_enter(rw_lock_get_mutex(lock));
-
- state = rw_lock_x_lock_low(lock, pass, file_name, line);
-
- if (state == RW_LOCK_EX) {
- mutex_exit(rw_lock_get_mutex(lock));
-
- return; /* Locking succeeded */
- }
-
- rw_x_system_call_count++;
-
sync_array_reserve_cell(sync_primary_wait_array,
lock,
-#ifdef __WIN__
- /* On windows RW_LOCK_WAIT_EX signifies
- that this thread should wait on the
- special wait_ex_event. */
- (state == RW_LOCK_WAIT_EX)
- ? RW_LOCK_WAIT_EX :
-#endif
RW_LOCK_EX,
file_name, line,
&index);
+ /* Waiters must be set before checking lock_word, to ensure signal
+ is sent. This could lead to a few unnecessary wake-up signals. */
rw_lock_set_waiters(lock, 1);
- mutex_exit(rw_lock_get_mutex(lock));
+ if (rw_lock_x_lock_low(lock, pass, file_name, line)) {
+ sync_array_free_cell(sync_primary_wait_array, index);
+ return; /* Locking succeeded */
+ }
if (srv_print_latch_waits) {
fprintf(stderr,
@@ -578,11 +649,13 @@ lock_loop:
lock->cfile_name, (ulong) lock->cline);
}
- rw_x_system_call_count++;
+ /* these stats may not be accurate */
+ lock->count_os_wait++;
rw_x_os_wait_count++;
sync_array_wait_event(sync_primary_wait_array, index);
+ i = 0;
goto lock_loop;
}
@@ -730,7 +803,7 @@ rw_lock_own(
ut_ad(lock);
ut_ad(rw_lock_validate(lock));
- mutex_enter(&(lock->mutex));
+ rw_lock_debug_mutex_enter();
info = UT_LIST_GET_FIRST(lock->debug_list);
@@ -740,7 +813,7 @@ rw_lock_own(
&& (info->pass == 0)
&& (info->lock_type == lock_type)) {
- mutex_exit(&(lock->mutex));
+ rw_lock_debug_mutex_exit();
/* Found! */
return(TRUE);
@@ -748,7 +821,7 @@ rw_lock_own(
info = UT_LIST_GET_NEXT(list, info);
}
- mutex_exit(&(lock->mutex));
+ rw_lock_debug_mutex_exit();
return(FALSE);
}
@@ -770,22 +843,18 @@ rw_lock_is_locked(
ut_ad(lock);
ut_ad(rw_lock_validate(lock));
- mutex_enter(&(lock->mutex));
-
if (lock_type == RW_LOCK_SHARED) {
- if (lock->reader_count > 0) {
+ if (rw_lock_get_reader_count(lock) > 0) {
ret = TRUE;
}
} else if (lock_type == RW_LOCK_EX) {
- if (lock->writer == RW_LOCK_EX) {
+ if (rw_lock_get_writer(lock) == RW_LOCK_EX) {
ret = TRUE;
}
} else {
ut_error;
}
- mutex_exit(&(lock->mutex));
-
return(ret);
}
@@ -814,11 +883,10 @@ rw_lock_list_print_info(
count++;
+#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_enter(&(lock->mutex));
-
- if ((rw_lock_get_writer(lock) != RW_LOCK_NOT_LOCKED)
- || (rw_lock_get_reader_count(lock) != 0)
- || (rw_lock_get_waiters(lock) != 0)) {
+#endif
+ if (lock->lock_word != X_LOCK_DECR) {
fprintf(file, "RW-LOCK: %p ", (void*) lock);
@@ -834,8 +902,10 @@ rw_lock_list_print_info(
info = UT_LIST_GET_NEXT(list, info);
}
}
-
+#ifndef HAVE_GCC_ATOMIC_BUILTINS
mutex_exit(&(lock->mutex));
+#endif
+
lock = UT_LIST_GET_NEXT(list, lock);
}
@@ -858,9 +928,10 @@ rw_lock_print(
"RW-LATCH INFO\n"
"RW-LATCH: %p ", (void*) lock);
- if ((rw_lock_get_writer(lock) != RW_LOCK_NOT_LOCKED)
- || (rw_lock_get_reader_count(lock) != 0)
- || (rw_lock_get_waiters(lock) != 0)) {
+#ifndef HAVE_GCC_ATOMIC_BUILTINS
+ mutex_enter(&(lock->mutex));
+#endif
+ if (lock->lock_word != X_LOCK_DECR) {
if (rw_lock_get_waiters(lock)) {
fputs(" Waiters for the lock exist\n", stderr);
@@ -874,6 +945,9 @@ rw_lock_print(
info = UT_LIST_GET_NEXT(list, info);
}
}
+#ifndef HAVE_GCC_ATOMIC_BUILTINS
+ mutex_exit(&(lock->mutex));
+#endif
}
/*************************************************************************
@@ -922,14 +996,11 @@ rw_lock_n_locked(void)
lock = UT_LIST_GET_FIRST(rw_lock_list);
while (lock != NULL) {
- mutex_enter(rw_lock_get_mutex(lock));
- if ((rw_lock_get_writer(lock) != RW_LOCK_NOT_LOCKED)
- || (rw_lock_get_reader_count(lock) != 0)) {
+ if (lock->lock_word != X_LOCK_DECR) {
count++;
}
- mutex_exit(rw_lock_get_mutex(lock));
lock = UT_LIST_GET_NEXT(list, lock);
}
diff --git a/storage/innobase/sync/sync0sync.c b/storage/innobase/sync/sync0sync.c
index 944fd2a97fc..4176143d679 100644
--- a/storage/innobase/sync/sync0sync.c
+++ b/storage/innobase/sync/sync0sync.c
@@ -138,18 +138,13 @@ Therefore, this thread is guaranteed to catch the os_set_event()
signalled unconditionally at the release of the lock.
Q.E.D. */
-/* The number of system calls made in this module. Intended for performance
-monitoring. */
-
-ulint mutex_system_call_count = 0;
-
/* Number of spin waits on mutexes: for performance monitoring */
/* round=one iteration of a spin loop */
-ulint mutex_spin_round_count = 0;
-ulint mutex_spin_wait_count = 0;
-ulint mutex_os_wait_count = 0;
-ulint mutex_exit_count = 0;
+ib_longlong mutex_spin_round_count = 0;
+ib_longlong mutex_spin_wait_count = 0;
+ib_longlong mutex_os_wait_count = 0;
+ib_longlong mutex_exit_count = 0;
/* The global array of wait cells for implementation of the database's own
mutexes and read-write locks */
@@ -243,6 +238,8 @@ mutex_create_func(
{
#if defined(_WIN32) && defined(UNIV_CAN_USE_X86_ASSEMBLER)
mutex_reset_lock_word(mutex);
+#elif defined(HAVE_GCC_ATOMIC_BUILTINS)
+ mutex_reset_lock_word(mutex);
#else
os_fast_mutex_init(&(mutex->os_fast_mutex));
mutex->lock_word = 0;
@@ -333,7 +330,9 @@ mutex_free(
os_event_free(mutex->event);
-#if !defined(_WIN32) || !defined(UNIV_CAN_USE_X86_ASSEMBLER)
+#if defined(_WIN32) && defined(UNIV_CAN_USE_X86_ASSEMBLER)
+#elif defined(HAVE_GCC_ATOMIC_BUILTINS)
+#else
os_fast_mutex_free(&(mutex->os_fast_mutex));
#endif
/* If we free the mutex protecting the mutex list (freeing is
@@ -450,6 +449,12 @@ mutex_spin_wait(
#endif /* UNIV_DEBUG && !UNIV_HOTBACKUP */
ut_ad(mutex);
+ /* This update is not thread safe, but we don't mind if the count
+ isn't exact. Moved out of ifdef that follows because we are willing
+ to sacrifice the cost of counting this as the data is valuable.
+ Count the number of calls to mutex_spin_wait. */
+ mutex_spin_wait_count++;
+
mutex_loop:
i = 0;
@@ -462,7 +467,6 @@ mutex_loop:
spin_loop:
#if defined UNIV_DEBUG && !defined UNIV_HOTBACKUP
- mutex_spin_wait_count++;
mutex->count_spin_loop++;
#endif /* UNIV_DEBUG && !UNIV_HOTBACKUP */
@@ -527,8 +531,6 @@ spin_loop:
sync_array_reserve_cell(sync_primary_wait_array, mutex,
SYNC_MUTEX, file_name, line, &index);
- mutex_system_call_count++;
-
/* The memory order of the array reservation and the change in the
waiters field is important: when we suspend a thread, we first
reserve the cell and then set waiters field to 1. When threads are
@@ -575,7 +577,6 @@ spin_loop:
mutex->cfile_name, (ulong) mutex->cline, (ulong) i);
#endif
- mutex_system_call_count++;
mutex_os_wait_count++;
#ifndef UNIV_HOTBACKUP
@@ -1377,21 +1378,31 @@ sync_print_wait_info(
FILE* file) /* in: file where to print */
{
#ifdef UNIV_SYNC_DEBUG
- fprintf(file, "Mutex exits %lu, rws exits %lu, rwx exits %lu\n",
+ fprintf(file, "Mutex exits %llu, rws exits %llu, rwx exits %llu\n",
mutex_exit_count, rw_s_exit_count, rw_x_exit_count);
#endif
fprintf(file,
- "Mutex spin waits %lu, rounds %lu, OS waits %lu\n"
- "RW-shared spins %lu, OS waits %lu;"
- " RW-excl spins %lu, OS waits %lu\n",
- (ulong) mutex_spin_wait_count,
- (ulong) mutex_spin_round_count,
- (ulong) mutex_os_wait_count,
- (ulong) rw_s_spin_wait_count,
- (ulong) rw_s_os_wait_count,
- (ulong) rw_x_spin_wait_count,
- (ulong) rw_x_os_wait_count);
+ "Mutex spin waits %llu, rounds %llu, OS waits %llu\n"
+ "RW-shared spins %llu, OS waits %llu;"
+ " RW-excl spins %llu, OS waits %llu\n",
+ mutex_spin_wait_count,
+ mutex_spin_round_count,
+ mutex_os_wait_count,
+ rw_s_spin_wait_count,
+ rw_s_os_wait_count,
+ rw_x_spin_wait_count,
+ rw_x_os_wait_count);
+
+ fprintf(file,
+ "Spin rounds per wait: %.2f mutex, %.2f RW-shared, "
+ "%.2f RW-excl\n",
+ (double) mutex_spin_round_count /
+ (mutex_spin_wait_count ? mutex_spin_wait_count : 1),
+ (double) rw_s_spin_round_count /
+ (rw_s_spin_wait_count ? rw_s_spin_wait_count : 1),
+ (double) rw_x_spin_round_count /
+ (rw_x_spin_wait_count ? rw_x_spin_wait_count : 1));
}
/***********************************************************************