summaryrefslogtreecommitdiff
path: root/storage/xtradb/lock/lock0lock.c
diff options
context:
space:
mode:
Diffstat (limited to 'storage/xtradb/lock/lock0lock.c')
-rw-r--r--storage/xtradb/lock/lock0lock.c445
1 files changed, 287 insertions, 158 deletions
diff --git a/storage/xtradb/lock/lock0lock.c b/storage/xtradb/lock/lock0lock.c
index 59394f13766..7ec4a53e0ea 100644
--- a/storage/xtradb/lock/lock0lock.c
+++ b/storage/xtradb/lock/lock0lock.c
@@ -1,6 +1,6 @@
/*****************************************************************************
-Copyright (c) 1996, 2009, Innobase Oy. All Rights Reserved.
+Copyright (c) 1996, 2010, Innobase Oy. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -376,6 +376,7 @@ UNIV_INTERN FILE* lock_latest_err_file;
/* Flags for recursive deadlock search */
#define LOCK_VICTIM_IS_START 1
#define LOCK_VICTIM_IS_OTHER 2
+#define LOCK_EXCEED_MAX_DEPTH 3
/********************************************************************//**
Checks if a lock request results in a deadlock.
@@ -394,24 +395,25 @@ Looks recursively for a deadlock.
deadlock and we chose 'start' as the victim, LOCK_VICTIM_IS_OTHER if a
deadlock was found and we chose some other trx as a victim: we must do
the search again in this last case because there may be another
-deadlock! */
+deadlock!
+LOCK_EXCEED_MAX_DEPTH if the lock search exceeds max steps or max depth. */
static
ulint
lock_deadlock_recursive(
/*====================*/
trx_t* start, /*!< in: recursion starting point */
trx_t* trx, /*!< in: a transaction waiting for a lock */
- lock_t* wait_lock, /*!< in: the lock trx is waiting to be granted */
+ lock_t* wait_lock, /*!< in: lock that is waiting to be granted */
ulint* cost, /*!< in/out: number of calculation steps thus
far: if this exceeds LOCK_MAX_N_STEPS_...
- we return LOCK_VICTIM_IS_START */
+ we return LOCK_EXCEED_MAX_DEPTH */
ulint depth); /*!< in: recursion depth: if this exceeds
LOCK_MAX_DEPTH_IN_DEADLOCK_CHECK, we
- return LOCK_VICTIM_IS_START */
+ return LOCK_EXCEED_MAX_DEPTH */
/*********************************************************************//**
Gets the nth bit of a record lock.
-@return TRUE if bit set */
+@return TRUE if bit set also if i == ULINT_UNDEFINED return FALSE*/
UNIV_INLINE
ibool
lock_rec_get_nth_bit(
@@ -1222,7 +1224,7 @@ lock_rec_get_first_on_page(
/*********************************************************************//**
Gets the next explicit lock request on a record.
-@return next lock, NULL if none exists */
+@return next lock, NULL if none exists or if heap_no == ULINT_UNDEFINED */
UNIV_INLINE
lock_t*
lock_rec_get_next(
@@ -1731,11 +1733,11 @@ lock_rec_create(
Enqueues a waiting request for a lock which cannot be granted immediately.
Checks for deadlocks.
@return DB_LOCK_WAIT, DB_DEADLOCK, or DB_QUE_THR_SUSPENDED, or
-DB_SUCCESS; DB_SUCCESS means that there was a deadlock, but another
-transaction was chosen as a victim, and we got the lock immediately:
-no need to wait then */
+DB_SUCCESS_LOCKED_REC; DB_SUCCESS_LOCKED_REC means that
+there was a deadlock, but another transaction was chosen as a victim,
+and we got the lock immediately: no need to wait then */
static
-ulint
+enum db_err
lock_rec_enqueue_waiting(
/*=====================*/
ulint type_mode,/*!< in: lock mode this
@@ -1809,7 +1811,7 @@ lock_rec_enqueue_waiting(
if (trx->wait_lock == NULL) {
- return(DB_SUCCESS);
+ return(DB_SUCCESS_LOCKED_REC);
}
trx->que_state = TRX_QUE_LOCK_WAIT;
@@ -1929,6 +1931,16 @@ somebody_waits:
return(lock_rec_create(type_mode, block, heap_no, index, trx));
}
+/** Record locking request status */
+enum lock_rec_req_status {
+ /** Failed to acquire a lock */
+ LOCK_REC_FAIL,
+ /** Succeeded in acquiring a lock (implicit or already acquired) */
+ LOCK_REC_SUCCESS,
+ /** Explicitly created a new lock */
+ LOCK_REC_SUCCESS_CREATED
+};
+
/*********************************************************************//**
This is a fast routine for locking a record in the most common cases:
there are no explicit locks on the page, or there is just one lock, owned
@@ -1936,9 +1948,9 @@ by this transaction, and of the right type_mode. This is a low-level function
which does NOT look at implicit locks! Checks lock compatibility within
explicit locks. This function sets a normal next-key lock, or in the case of
a page supremum record, a gap type lock.
-@return TRUE if locking succeeded */
+@return whether the locking succeeded */
UNIV_INLINE
-ibool
+enum lock_rec_req_status
lock_rec_lock_fast(
/*===============*/
ibool impl, /*!< in: if TRUE, no lock is set
@@ -1977,19 +1989,19 @@ lock_rec_lock_fast(
lock_rec_create(mode, block, heap_no, index, trx);
}
- return(TRUE);
+ return(LOCK_REC_SUCCESS_CREATED);
}
if (lock_rec_get_next_on_page(lock)) {
- return(FALSE);
+ return(LOCK_REC_FAIL);
}
if (lock->trx != trx
|| lock->type_mode != (mode | LOCK_REC)
|| lock_rec_get_n_bits(lock) <= heap_no) {
- return(FALSE);
+ return(LOCK_REC_FAIL);
}
if (!impl) {
@@ -1998,10 +2010,11 @@ lock_rec_lock_fast(
if (!lock_rec_get_nth_bit(lock, heap_no)) {
lock_rec_set_nth_bit(lock, heap_no);
+ return(LOCK_REC_SUCCESS_CREATED);
}
}
- return(TRUE);
+ return(LOCK_REC_SUCCESS);
}
/*********************************************************************//**
@@ -2009,9 +2022,10 @@ This is the general, and slower, routine for locking a record. This is a
low-level function which does NOT look at implicit locks! Checks lock
compatibility within explicit locks. This function sets a normal next-key
lock, or in the case of a page supremum record, a gap type lock.
-@return DB_SUCCESS, DB_LOCK_WAIT, or error code */
+@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, DB_DEADLOCK,
+or DB_QUE_THR_SUSPENDED */
static
-ulint
+enum db_err
lock_rec_lock_slow(
/*===============*/
ibool impl, /*!< in: if TRUE, no lock is set
@@ -2028,7 +2042,6 @@ lock_rec_lock_slow(
que_thr_t* thr) /*!< in: query thread */
{
trx_t* trx;
- ulint err;
ut_ad(mutex_own(&kernel_mutex));
ut_ad((LOCK_MODE_MASK & mode) != LOCK_S
@@ -2047,27 +2060,23 @@ lock_rec_lock_slow(
/* The trx already has a strong enough lock on rec: do
nothing */
- err = DB_SUCCESS;
} else if (lock_rec_other_has_conflicting(mode, block, heap_no, trx)) {
/* If another transaction has a non-gap conflicting request in
the queue, as this transaction does not have a lock strong
enough already granted on the record, we have to wait. */
- err = lock_rec_enqueue_waiting(mode, block, heap_no,
- index, thr);
- } else {
- if (!impl) {
- /* Set the requested lock on the record */
-
- lock_rec_add_to_queue(LOCK_REC | mode, block,
- heap_no, index, trx);
- }
+ return(lock_rec_enqueue_waiting(mode, block, heap_no,
+ index, thr));
+ } else if (!impl) {
+ /* Set the requested lock on the record */
- err = DB_SUCCESS;
+ lock_rec_add_to_queue(LOCK_REC | mode, block,
+ heap_no, index, trx);
+ return(DB_SUCCESS_LOCKED_REC);
}
- return(err);
+ return(DB_SUCCESS);
}
/*********************************************************************//**
@@ -2076,9 +2085,10 @@ possible, enqueues a waiting lock request. This is a low-level function
which does NOT look at implicit locks! Checks lock compatibility within
explicit locks. This function sets a normal next-key lock, or in the case
of a page supremum record, a gap type lock.
-@return DB_SUCCESS, DB_LOCK_WAIT, or error code */
+@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, DB_DEADLOCK,
+or DB_QUE_THR_SUSPENDED */
static
-ulint
+enum db_err
lock_rec_lock(
/*==========*/
ibool impl, /*!< in: if TRUE, no lock is set
@@ -2094,8 +2104,6 @@ lock_rec_lock(
dict_index_t* index, /*!< in: index of record */
que_thr_t* thr) /*!< in: query thread */
{
- ulint err;
-
ut_ad(mutex_own(&kernel_mutex));
ut_ad((LOCK_MODE_MASK & mode) != LOCK_S
|| lock_table_has(thr_get_trx(thr), index->table, LOCK_IS));
@@ -2107,18 +2115,20 @@ lock_rec_lock(
|| mode - (LOCK_MODE_MASK & mode) == LOCK_REC_NOT_GAP
|| mode - (LOCK_MODE_MASK & mode) == 0);
- if (lock_rec_lock_fast(impl, mode, block, heap_no, index, thr)) {
-
- /* We try a simplified and faster subroutine for the most
- common cases */
-
- err = DB_SUCCESS;
- } else {
- err = lock_rec_lock_slow(impl, mode, block,
- heap_no, index, thr);
+ /* We try a simplified and faster subroutine for the most
+ common cases */
+ switch (lock_rec_lock_fast(impl, mode, block, heap_no, index, thr)) {
+ case LOCK_REC_SUCCESS:
+ return(DB_SUCCESS);
+ case LOCK_REC_SUCCESS_CREATED:
+ return(DB_SUCCESS_LOCKED_REC);
+ case LOCK_REC_FAIL:
+ return(lock_rec_lock_slow(impl, mode, block,
+ heap_no, index, thr));
}
- return(err);
+ ut_error;
+ return(DB_ERROR);
}
/*********************************************************************//**
@@ -2404,7 +2414,7 @@ lock_rec_inherit_to_gap(
if (!lock_rec_get_insert_intention(lock)
&& !((srv_locks_unsafe_for_binlog
|| lock->trx->isolation_level
- == TRX_ISO_READ_COMMITTED)
+ <= TRX_ISO_READ_COMMITTED)
&& lock_get_mode(lock) == LOCK_X)) {
lock_rec_add_to_queue(LOCK_REC | LOCK_GAP
@@ -3267,8 +3277,6 @@ lock_deadlock_occurs(
lock_t* lock, /*!< in: lock the transaction is requesting */
trx_t* trx) /*!< in: transaction */
{
- dict_table_t* table;
- dict_index_t* index;
trx_t* mark_trx;
ulint ret;
ulint cost = 0;
@@ -3290,31 +3298,51 @@ retry:
ret = lock_deadlock_recursive(trx, trx, lock, &cost, 0);
- if (ret == LOCK_VICTIM_IS_OTHER) {
+ switch (ret) {
+ case LOCK_VICTIM_IS_OTHER:
/* We chose some other trx as a victim: retry if there still
is a deadlock */
-
goto retry;
- }
- if (UNIV_UNLIKELY(ret == LOCK_VICTIM_IS_START)) {
- if (lock_get_type_low(lock) & LOCK_TABLE) {
- table = lock->un_member.tab_lock.table;
- index = NULL;
+ case LOCK_EXCEED_MAX_DEPTH:
+ /* If the lock search exceeds the max step
+ or the max depth, the current trx will be
+ the victim. Print its information. */
+ rewind(lock_latest_err_file);
+ ut_print_timestamp(lock_latest_err_file);
+
+ fputs("TOO DEEP OR LONG SEARCH IN THE LOCK TABLE"
+ " WAITS-FOR GRAPH, WE WILL ROLL BACK"
+ " FOLLOWING TRANSACTION \n",
+ lock_latest_err_file);
+
+ fputs("\n*** TRANSACTION:\n", lock_latest_err_file);
+ trx_print(lock_latest_err_file, trx, 3000);
+
+ fputs("*** WAITING FOR THIS LOCK TO BE GRANTED:\n",
+ lock_latest_err_file);
+
+ if (lock_get_type(lock) == LOCK_REC) {
+ lock_rec_print(lock_latest_err_file, lock);
} else {
- index = lock->index;
- table = index->table;
+ lock_table_print(lock_latest_err_file, lock);
}
+ break;
- lock_deadlock_found = TRUE;
-
+ case LOCK_VICTIM_IS_START:
+ srv_n_lock_deadlock_count++;
fputs("*** WE ROLL BACK TRANSACTION (2)\n",
lock_latest_err_file);
+ break;
- return(TRUE);
+ default:
+ /* No deadlock detected*/
+ return(FALSE);
}
- return(FALSE);
+ lock_deadlock_found = TRUE;
+
+ return(TRUE);
}
/********************************************************************//**
@@ -3323,25 +3351,26 @@ Looks recursively for a deadlock.
deadlock and we chose 'start' as the victim, LOCK_VICTIM_IS_OTHER if a
deadlock was found and we chose some other trx as a victim: we must do
the search again in this last case because there may be another
-deadlock! */
+deadlock!
+LOCK_EXCEED_MAX_DEPTH if the lock search exceeds max steps or max depth. */
static
ulint
lock_deadlock_recursive(
/*====================*/
trx_t* start, /*!< in: recursion starting point */
trx_t* trx, /*!< in: a transaction waiting for a lock */
- lock_t* wait_lock, /*!< in: the lock trx is waiting to be granted */
+ lock_t* wait_lock, /*!< in: lock that is waiting to be granted */
ulint* cost, /*!< in/out: number of calculation steps thus
far: if this exceeds LOCK_MAX_N_STEPS_...
- we return LOCK_VICTIM_IS_START */
+ we return LOCK_EXCEED_MAX_DEPTH */
ulint depth) /*!< in: recursion depth: if this exceeds
LOCK_MAX_DEPTH_IN_DEADLOCK_CHECK, we
- return LOCK_VICTIM_IS_START */
+ return LOCK_EXCEED_MAX_DEPTH */
{
+ ulint ret;
lock_t* lock;
- ulint bit_no = ULINT_UNDEFINED;
trx_t* lock_trx;
- ulint ret;
+ ulint heap_no = ULINT_UNDEFINED;
ut_a(trx);
ut_a(start);
@@ -3357,27 +3386,44 @@ lock_deadlock_recursive(
*cost = *cost + 1;
- lock = wait_lock;
-
if (lock_get_type_low(wait_lock) == LOCK_REC) {
+ ulint space;
+ ulint page_no;
- bit_no = lock_rec_find_set_bit(wait_lock);
+ heap_no = lock_rec_find_set_bit(wait_lock);
+ ut_a(heap_no != ULINT_UNDEFINED);
+
+ space = wait_lock->un_member.rec_lock.space;
+ page_no = wait_lock->un_member.rec_lock.page_no;
+
+ lock = lock_rec_get_first_on_page_addr(space, page_no);
+
+ /* Position the iterator on the first matching record lock. */
+ while (lock != NULL
+ && lock != wait_lock
+ && !lock_rec_get_nth_bit(lock, heap_no)) {
+
+ lock = lock_rec_get_next_on_page(lock);
+ }
- ut_a(bit_no != ULINT_UNDEFINED);
+ if (lock == wait_lock) {
+ lock = NULL;
+ }
+
+ ut_ad(lock == NULL || lock_rec_get_nth_bit(lock, heap_no));
+
+ } else {
+ lock = wait_lock;
}
/* Look at the locks ahead of wait_lock in the lock queue */
for (;;) {
- if (lock_get_type_low(lock) & LOCK_TABLE) {
-
- lock = UT_LIST_GET_PREV(un_member.tab_lock.locks,
- lock);
- } else {
- ut_ad(lock_get_type_low(lock) == LOCK_REC);
- ut_a(bit_no != ULINT_UNDEFINED);
+ /* Get previous table lock. */
+ if (heap_no == ULINT_UNDEFINED) {
- lock = (lock_t*) lock_rec_get_prev(lock, bit_no);
+ lock = UT_LIST_GET_PREV(
+ un_member.tab_lock.locks, lock);
}
if (lock == NULL) {
@@ -3395,7 +3441,7 @@ lock_deadlock_recursive(
lock_trx = lock->trx;
- if (lock_trx == start || too_far) {
+ if (lock_trx == start) {
/* We came back to the recursion starting
point: a deadlock detected; or we have
@@ -3442,19 +3488,10 @@ lock_deadlock_recursive(
}
#ifdef UNIV_DEBUG
if (lock_print_waits) {
- fputs("Deadlock detected"
- " or too long search\n",
+ fputs("Deadlock detected\n",
stderr);
}
#endif /* UNIV_DEBUG */
- if (too_far) {
-
- fputs("TOO DEEP OR LONG SEARCH"
- " IN THE LOCK TABLE"
- " WAITS-FOR GRAPH\n", ef);
-
- return(LOCK_VICTIM_IS_START);
- }
if (trx_weight_cmp(wait_lock->trx,
start) >= 0) {
@@ -3490,6 +3527,21 @@ lock_deadlock_recursive(
return(LOCK_VICTIM_IS_OTHER);
}
+ if (too_far) {
+
+#ifdef UNIV_DEBUG
+ if (lock_print_waits) {
+ fputs("Deadlock search exceeds"
+ " max steps or depth.\n",
+ stderr);
+ }
+#endif /* UNIV_DEBUG */
+ /* The information about transaction/lock
+ to be rolled back is available in the top
+ level. Do not print anything here. */
+ return(LOCK_EXCEED_MAX_DEPTH);
+ }
+
if (lock_trx->que_state == TRX_QUE_LOCK_WAIT) {
/* Another trx ahead has requested lock in an
@@ -3499,12 +3551,28 @@ lock_deadlock_recursive(
ret = lock_deadlock_recursive(
start, lock_trx,
lock_trx->wait_lock, cost, depth + 1);
+
if (ret != 0) {
return(ret);
}
}
}
+ /* Get the next record lock to check. */
+ if (heap_no != ULINT_UNDEFINED) {
+
+ ut_a(lock != NULL);
+
+ do {
+ lock = lock_rec_get_next_on_page(lock);
+ } while (lock != NULL
+ && lock != wait_lock
+ && !lock_rec_get_nth_bit(lock, heap_no));
+
+ if (lock == wait_lock) {
+ lock = NULL;
+ }
+ }
}/* end of the 'for (;;)'-loop */
}
@@ -3706,9 +3774,10 @@ lock_table_enqueue_waiting(
/*********************************************************************//**
Checks if other transactions have an incompatible mode lock request in
-the lock queue. */
+the lock queue.
+@return lock or NULL */
UNIV_INLINE
-ibool
+lock_t*
lock_table_other_has_incompatible(
/*==============================*/
trx_t* trx, /*!< in: transaction, or NULL if all
@@ -3730,13 +3799,13 @@ lock_table_other_has_incompatible(
&& (!lock_mode_compatible(lock_get_mode(lock), mode))
&& (wait || !(lock_get_wait(lock)))) {
- return(TRUE);
+ return(lock);
}
lock = UT_LIST_GET_PREV(un_member.tab_lock.locks, lock);
}
- return(FALSE);
+ return(NULL);
}
/*********************************************************************//**
@@ -3887,8 +3956,8 @@ lock_rec_unlock(
const rec_t* rec, /*!< in: record */
enum lock_mode lock_mode)/*!< in: LOCK_S or LOCK_X */
{
+ lock_t* first_lock;
lock_t* lock;
- lock_t* release_lock = NULL;
ulint heap_no;
ut_ad(trx && rec);
@@ -3898,48 +3967,40 @@ lock_rec_unlock(
mutex_enter(&kernel_mutex);
- lock = lock_rec_get_first(block, heap_no);
+ first_lock = lock_rec_get_first(block, heap_no);
/* Find the last lock with the same lock_mode and transaction
from the record. */
- while (lock != NULL) {
+ for (lock = first_lock; lock != NULL;
+ lock = lock_rec_get_next(heap_no, lock)) {
if (lock->trx == trx && lock_get_mode(lock) == lock_mode) {
- release_lock = lock;
ut_a(!lock_get_wait(lock));
+ lock_rec_reset_nth_bit(lock, heap_no);
+ goto released;
}
-
- lock = lock_rec_get_next(heap_no, lock);
}
- /* If a record lock is found, release the record lock */
-
- if (UNIV_LIKELY(release_lock != NULL)) {
- lock_rec_reset_nth_bit(release_lock, heap_no);
- } else {
- mutex_exit(&kernel_mutex);
- ut_print_timestamp(stderr);
- fprintf(stderr,
- " InnoDB: Error: unlock row could not"
- " find a %lu mode lock on the record\n",
- (ulong) lock_mode);
+ mutex_exit(&kernel_mutex);
+ ut_print_timestamp(stderr);
+ fprintf(stderr,
+ " InnoDB: Error: unlock row could not"
+ " find a %lu mode lock on the record\n",
+ (ulong) lock_mode);
- return;
- }
+ return;
+released:
/* Check if we can now grant waiting lock requests */
- lock = lock_rec_get_first(block, heap_no);
-
- while (lock != NULL) {
+ for (lock = first_lock; lock != NULL;
+ lock = lock_rec_get_next(heap_no, lock)) {
if (lock_get_wait(lock)
&& !lock_rec_has_to_wait_in_queue(lock)) {
/* Grant the lock */
lock_grant(lock);
}
-
- lock = lock_rec_get_next(heap_no, lock);
}
mutex_exit(&kernel_mutex);
@@ -4260,31 +4321,34 @@ lock_rec_print(
putc('\n', file);
if ( srv_show_verbose_locks ) {
- block = buf_page_try_get(space, page_no, &mtr);
+ block = buf_page_try_get(space, page_no, &mtr);
+
+ for (i = 0; i < lock_rec_get_n_bits(lock); ++i) {
+
+ if (!lock_rec_get_nth_bit(lock, i)) {
+ continue;
+ }
+
+ fprintf(file, "Record lock, heap no %lu", (ulong) i);
+
if (block) {
- for (i = 0; i < lock_rec_get_n_bits(lock); i++) {
+ const rec_t* rec;
- if (lock_rec_get_nth_bit(lock, i)) {
+ rec = page_find_rec_with_heap_no(
+ buf_block_get_frame(block), i);
- const rec_t* rec
- = page_find_rec_with_heap_no(
- buf_block_get_frame(block), i);
- offsets = rec_get_offsets(
- rec, lock->index, offsets,
- ULINT_UNDEFINED, &heap);
+ offsets = rec_get_offsets(
+ rec, lock->index, offsets,
+ ULINT_UNDEFINED, &heap);
- fprintf(file, "Record lock, heap no %lu ",
- (ulong) i);
- rec_print_new(file, rec, offsets);
- putc('\n', file);
- }
- }
- } else {
- for (i = 0; i < lock_rec_get_n_bits(lock); i++) {
- fprintf(file, "Record lock, heap no %lu\n", (ulong) i);
- }
+ putc(' ', file);
+ rec_print_new(file, rec, offsets);
}
+
+ putc('\n', file);
}
+ }
+
mtr_commit(&mtr);
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
@@ -4329,14 +4393,26 @@ lock_get_n_rec_locks(void)
#endif /* PRINT_NUM_OF_LOCK_STRUCTS */
/*********************************************************************//**
-Prints info of locks for all transactions. */
+Prints info of locks for all transactions.
+@return FALSE if not able to obtain kernel mutex
+and exits without printing info */
UNIV_INTERN
-void
+ibool
lock_print_info_summary(
/*====================*/
- FILE* file) /*!< in: file where to print */
+ FILE* file, /*!< in: file where to print */
+ ibool nowait) /*!< in: whether to wait for the kernel mutex */
{
- lock_mutex_enter_kernel();
+ /* if nowait is FALSE, wait on the kernel mutex,
+ otherwise return immediately if fail to obtain the
+ mutex. */
+ if (!nowait) {
+ lock_mutex_enter_kernel();
+ } else if (mutex_enter_nowait(&kernel_mutex)) {
+ fputs("FAIL TO OBTAIN KERNEL MUTEX, "
+ "SKIP LOCK INFO PRINTING\n", file);
+ return(FALSE);
+ }
if (lock_deadlock_found) {
fputs("------------------------\n"
@@ -4368,6 +4444,7 @@ lock_print_info_summary(
"Total number of lock structs in row lock hash table %lu\n",
(ulong) lock_get_n_rec_locks());
#endif /* PRINT_NUM_OF_LOCK_STRUCTS */
+ return(TRUE);
}
/*********************************************************************//**
@@ -4648,6 +4725,7 @@ lock_rec_queue_validate(
ut_a(lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP,
block, heap_no, impl_trx));
}
+#if 0
} else {
/* The kernel mutex may get released temporarily in the
@@ -4658,6 +4736,27 @@ lock_rec_queue_validate(
(fil_space_t::latch), the following check WILL break
latching order and may cause a deadlock of threads. */
+ /* NOTE: This is a bogus check that would fail in the
+ following case: Our transaction is updating a
+ row. After it has updated the clustered index record,
+ it goes to a secondary index record and finds someone
+ else holding an explicit S- or X-lock on that
+ secondary index record, presumably from a locking
+ read. Our transaction cannot update the secondary
+ index immediately, but places a waiting X-lock request
+ on the secondary index record. There is nothing
+ illegal in this. The assertion is simply too strong. */
+
+ /* From the locking point of view, each secondary
+ index is a separate table. A lock that is held on
+ secondary index rec does not give any rights to modify
+ or read the clustered index rec. Therefore, we can
+ think of the sec index as a separate 'table' from the
+ clust index 'table'. Conversely, a transaction that
+ has acquired a lock on and modified a clustered index
+ record may need to wait for a lock on the
+ corresponding record in a secondary index. */
+
impl_trx = lock_sec_rec_some_has_impl_off_kernel(
rec, index, offsets);
@@ -4668,6 +4767,7 @@ lock_rec_queue_validate(
ut_a(lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP,
block, heap_no, impl_trx));
}
+#endif
}
lock = lock_rec_get_first(block, heap_no);
@@ -4765,6 +4865,13 @@ loop:
|| lock->trx->conc_state == TRX_PREPARED
|| lock->trx->conc_state == TRX_COMMITTED_IN_MEMORY);
+# ifdef UNIV_SYNC_DEBUG
+ /* Only validate the record queues when this thread is not
+ holding a space->latch. Deadlocks are possible due to
+ latching order violation when UNIV_DEBUG is defined while
+ UNIV_SYNC_DEBUG is not. */
+ if (!sync_thread_levels_contains(SYNC_FSP))
+# endif /* UNIV_SYNC_DEBUG */
for (i = nth_bit; i < lock_rec_get_n_bits(lock); i++) {
if (i == 1 || lock_rec_get_nth_bit(lock, i)) {
@@ -4930,7 +5037,7 @@ lock_rec_insert_check_and_lock(
}
trx = thr_get_trx(thr);
- next_rec = page_rec_get_next((rec_t*) rec);
+ next_rec = page_rec_get_next_const(rec);
next_rec_heap_no = page_rec_get_heap_no(next_rec);
lock_mutex_enter_kernel();
@@ -4988,7 +5095,14 @@ lock_rec_insert_check_and_lock(
lock_mutex_exit_kernel();
- if ((err == DB_SUCCESS) && !dict_index_is_clust(index)) {
+ switch (err) {
+ case DB_SUCCESS_LOCKED_REC:
+ err = DB_SUCCESS;
+ /* fall through */
+ case DB_SUCCESS:
+ if (dict_index_is_clust(index)) {
+ break;
+ }
/* Update the page max trx id field */
page_update_max_trx_id(block,
buf_block_get_page_zip(block),
@@ -5111,6 +5225,10 @@ lock_clust_rec_modify_check_and_lock(
ut_ad(lock_rec_queue_validate(block, rec, index, offsets));
+ if (UNIV_UNLIKELY(err == DB_SUCCESS_LOCKED_REC)) {
+ err = DB_SUCCESS;
+ }
+
return(err);
}
@@ -5177,22 +5295,27 @@ lock_sec_rec_modify_check_and_lock(
}
#endif /* UNIV_DEBUG */
- if (err == DB_SUCCESS) {
+ if (err == DB_SUCCESS || err == DB_SUCCESS_LOCKED_REC) {
/* Update the page max trx id field */
+ /* It might not be necessary to do this if
+ err == DB_SUCCESS (no new lock created),
+ but it should not cost too much performance. */
page_update_max_trx_id(block,
buf_block_get_page_zip(block),
thr_get_trx(thr)->id, mtr);
+ err = DB_SUCCESS;
}
return(err);
}
/*********************************************************************//**
-Like the counterpart for a clustered index below, but now we read a
+Like lock_clust_rec_read_check_and_lock(), but reads a
secondary index record.
-@return DB_SUCCESS, DB_LOCK_WAIT, DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */
+@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, DB_DEADLOCK,
+or DB_QUE_THR_SUSPENDED */
UNIV_INTERN
-ulint
+enum db_err
lock_sec_rec_read_check_and_lock(
/*=============================*/
ulint flags, /*!< in: if BTR_NO_LOCKING_FLAG
@@ -5213,8 +5336,8 @@ lock_sec_rec_read_check_and_lock(
LOCK_REC_NOT_GAP */
que_thr_t* thr) /*!< in: query thread */
{
- ulint err;
- ulint heap_no;
+ enum db_err err;
+ ulint heap_no;
ut_ad(!dict_index_is_clust(index));
ut_ad(block->frame == page_align(rec));
@@ -5265,9 +5388,10 @@ if the query thread should anyway be suspended for some reason; if not, then
puts the transaction and the query thread to the lock wait state and inserts a
waiting request for a record lock to the lock queue. Sets the requested mode
lock on the record.
-@return DB_SUCCESS, DB_LOCK_WAIT, DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */
+@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, DB_DEADLOCK,
+or DB_QUE_THR_SUSPENDED */
UNIV_INTERN
-ulint
+enum db_err
lock_clust_rec_read_check_and_lock(
/*===============================*/
ulint flags, /*!< in: if BTR_NO_LOCKING_FLAG
@@ -5288,8 +5412,8 @@ lock_clust_rec_read_check_and_lock(
LOCK_REC_NOT_GAP */
que_thr_t* thr) /*!< in: query thread */
{
- ulint err;
- ulint heap_no;
+ enum db_err err;
+ ulint heap_no;
ut_ad(dict_index_is_clust(index));
ut_ad(block->frame == page_align(rec));
@@ -5360,17 +5484,22 @@ lock_clust_rec_read_check_and_lock_alt(
mem_heap_t* tmp_heap = NULL;
ulint offsets_[REC_OFFS_NORMAL_SIZE];
ulint* offsets = offsets_;
- ulint ret;
+ ulint err;
rec_offs_init(offsets_);
offsets = rec_get_offsets(rec, index, offsets,
ULINT_UNDEFINED, &tmp_heap);
- ret = lock_clust_rec_read_check_and_lock(flags, block, rec, index,
+ err = lock_clust_rec_read_check_and_lock(flags, block, rec, index,
offsets, mode, gap_mode, thr);
if (tmp_heap) {
mem_heap_free(tmp_heap);
}
- return(ret);
+
+ if (UNIV_UNLIKELY(err == DB_SUCCESS_LOCKED_REC)) {
+ err = DB_SUCCESS;
+ }
+
+ return(err);
}
/*******************************************************************//**