diff options
Diffstat (limited to 'storage/xtradb/lock/lock0lock.c')
-rw-r--r-- | storage/xtradb/lock/lock0lock.c | 445 |
1 files changed, 287 insertions, 158 deletions
diff --git a/storage/xtradb/lock/lock0lock.c b/storage/xtradb/lock/lock0lock.c index 59394f13766..7ec4a53e0ea 100644 --- a/storage/xtradb/lock/lock0lock.c +++ b/storage/xtradb/lock/lock0lock.c @@ -1,6 +1,6 @@ /***************************************************************************** -Copyright (c) 1996, 2009, Innobase Oy. All Rights Reserved. +Copyright (c) 1996, 2010, Innobase Oy. All Rights Reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -376,6 +376,7 @@ UNIV_INTERN FILE* lock_latest_err_file; /* Flags for recursive deadlock search */ #define LOCK_VICTIM_IS_START 1 #define LOCK_VICTIM_IS_OTHER 2 +#define LOCK_EXCEED_MAX_DEPTH 3 /********************************************************************//** Checks if a lock request results in a deadlock. @@ -394,24 +395,25 @@ Looks recursively for a deadlock. deadlock and we chose 'start' as the victim, LOCK_VICTIM_IS_OTHER if a deadlock was found and we chose some other trx as a victim: we must do the search again in this last case because there may be another -deadlock! */ +deadlock! +LOCK_EXCEED_MAX_DEPTH if the lock search exceeds max steps or max depth. */ static ulint lock_deadlock_recursive( /*====================*/ trx_t* start, /*!< in: recursion starting point */ trx_t* trx, /*!< in: a transaction waiting for a lock */ - lock_t* wait_lock, /*!< in: the lock trx is waiting to be granted */ + lock_t* wait_lock, /*!< in: lock that is waiting to be granted */ ulint* cost, /*!< in/out: number of calculation steps thus far: if this exceeds LOCK_MAX_N_STEPS_... - we return LOCK_VICTIM_IS_START */ + we return LOCK_EXCEED_MAX_DEPTH */ ulint depth); /*!< in: recursion depth: if this exceeds LOCK_MAX_DEPTH_IN_DEADLOCK_CHECK, we - return LOCK_VICTIM_IS_START */ + return LOCK_EXCEED_MAX_DEPTH */ /*********************************************************************//** Gets the nth bit of a record lock. -@return TRUE if bit set */ +@return TRUE if bit set also if i == ULINT_UNDEFINED return FALSE*/ UNIV_INLINE ibool lock_rec_get_nth_bit( @@ -1222,7 +1224,7 @@ lock_rec_get_first_on_page( /*********************************************************************//** Gets the next explicit lock request on a record. -@return next lock, NULL if none exists */ +@return next lock, NULL if none exists or if heap_no == ULINT_UNDEFINED */ UNIV_INLINE lock_t* lock_rec_get_next( @@ -1731,11 +1733,11 @@ lock_rec_create( Enqueues a waiting request for a lock which cannot be granted immediately. Checks for deadlocks. @return DB_LOCK_WAIT, DB_DEADLOCK, or DB_QUE_THR_SUSPENDED, or -DB_SUCCESS; DB_SUCCESS means that there was a deadlock, but another -transaction was chosen as a victim, and we got the lock immediately: -no need to wait then */ +DB_SUCCESS_LOCKED_REC; DB_SUCCESS_LOCKED_REC means that +there was a deadlock, but another transaction was chosen as a victim, +and we got the lock immediately: no need to wait then */ static -ulint +enum db_err lock_rec_enqueue_waiting( /*=====================*/ ulint type_mode,/*!< in: lock mode this @@ -1809,7 +1811,7 @@ lock_rec_enqueue_waiting( if (trx->wait_lock == NULL) { - return(DB_SUCCESS); + return(DB_SUCCESS_LOCKED_REC); } trx->que_state = TRX_QUE_LOCK_WAIT; @@ -1929,6 +1931,16 @@ somebody_waits: return(lock_rec_create(type_mode, block, heap_no, index, trx)); } +/** Record locking request status */ +enum lock_rec_req_status { + /** Failed to acquire a lock */ + LOCK_REC_FAIL, + /** Succeeded in acquiring a lock (implicit or already acquired) */ + LOCK_REC_SUCCESS, + /** Explicitly created a new lock */ + LOCK_REC_SUCCESS_CREATED +}; + /*********************************************************************//** This is a fast routine for locking a record in the most common cases: there are no explicit locks on the page, or there is just one lock, owned @@ -1936,9 +1948,9 @@ by this transaction, and of the right type_mode. This is a low-level function which does NOT look at implicit locks! Checks lock compatibility within explicit locks. This function sets a normal next-key lock, or in the case of a page supremum record, a gap type lock. -@return TRUE if locking succeeded */ +@return whether the locking succeeded */ UNIV_INLINE -ibool +enum lock_rec_req_status lock_rec_lock_fast( /*===============*/ ibool impl, /*!< in: if TRUE, no lock is set @@ -1977,19 +1989,19 @@ lock_rec_lock_fast( lock_rec_create(mode, block, heap_no, index, trx); } - return(TRUE); + return(LOCK_REC_SUCCESS_CREATED); } if (lock_rec_get_next_on_page(lock)) { - return(FALSE); + return(LOCK_REC_FAIL); } if (lock->trx != trx || lock->type_mode != (mode | LOCK_REC) || lock_rec_get_n_bits(lock) <= heap_no) { - return(FALSE); + return(LOCK_REC_FAIL); } if (!impl) { @@ -1998,10 +2010,11 @@ lock_rec_lock_fast( if (!lock_rec_get_nth_bit(lock, heap_no)) { lock_rec_set_nth_bit(lock, heap_no); + return(LOCK_REC_SUCCESS_CREATED); } } - return(TRUE); + return(LOCK_REC_SUCCESS); } /*********************************************************************//** @@ -2009,9 +2022,10 @@ This is the general, and slower, routine for locking a record. This is a low-level function which does NOT look at implicit locks! Checks lock compatibility within explicit locks. This function sets a normal next-key lock, or in the case of a page supremum record, a gap type lock. -@return DB_SUCCESS, DB_LOCK_WAIT, or error code */ +@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, DB_DEADLOCK, +or DB_QUE_THR_SUSPENDED */ static -ulint +enum db_err lock_rec_lock_slow( /*===============*/ ibool impl, /*!< in: if TRUE, no lock is set @@ -2028,7 +2042,6 @@ lock_rec_lock_slow( que_thr_t* thr) /*!< in: query thread */ { trx_t* trx; - ulint err; ut_ad(mutex_own(&kernel_mutex)); ut_ad((LOCK_MODE_MASK & mode) != LOCK_S @@ -2047,27 +2060,23 @@ lock_rec_lock_slow( /* The trx already has a strong enough lock on rec: do nothing */ - err = DB_SUCCESS; } else if (lock_rec_other_has_conflicting(mode, block, heap_no, trx)) { /* If another transaction has a non-gap conflicting request in the queue, as this transaction does not have a lock strong enough already granted on the record, we have to wait. */ - err = lock_rec_enqueue_waiting(mode, block, heap_no, - index, thr); - } else { - if (!impl) { - /* Set the requested lock on the record */ - - lock_rec_add_to_queue(LOCK_REC | mode, block, - heap_no, index, trx); - } + return(lock_rec_enqueue_waiting(mode, block, heap_no, + index, thr)); + } else if (!impl) { + /* Set the requested lock on the record */ - err = DB_SUCCESS; + lock_rec_add_to_queue(LOCK_REC | mode, block, + heap_no, index, trx); + return(DB_SUCCESS_LOCKED_REC); } - return(err); + return(DB_SUCCESS); } /*********************************************************************//** @@ -2076,9 +2085,10 @@ possible, enqueues a waiting lock request. This is a low-level function which does NOT look at implicit locks! Checks lock compatibility within explicit locks. This function sets a normal next-key lock, or in the case of a page supremum record, a gap type lock. -@return DB_SUCCESS, DB_LOCK_WAIT, or error code */ +@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, DB_DEADLOCK, +or DB_QUE_THR_SUSPENDED */ static -ulint +enum db_err lock_rec_lock( /*==========*/ ibool impl, /*!< in: if TRUE, no lock is set @@ -2094,8 +2104,6 @@ lock_rec_lock( dict_index_t* index, /*!< in: index of record */ que_thr_t* thr) /*!< in: query thread */ { - ulint err; - ut_ad(mutex_own(&kernel_mutex)); ut_ad((LOCK_MODE_MASK & mode) != LOCK_S || lock_table_has(thr_get_trx(thr), index->table, LOCK_IS)); @@ -2107,18 +2115,20 @@ lock_rec_lock( || mode - (LOCK_MODE_MASK & mode) == LOCK_REC_NOT_GAP || mode - (LOCK_MODE_MASK & mode) == 0); - if (lock_rec_lock_fast(impl, mode, block, heap_no, index, thr)) { - - /* We try a simplified and faster subroutine for the most - common cases */ - - err = DB_SUCCESS; - } else { - err = lock_rec_lock_slow(impl, mode, block, - heap_no, index, thr); + /* We try a simplified and faster subroutine for the most + common cases */ + switch (lock_rec_lock_fast(impl, mode, block, heap_no, index, thr)) { + case LOCK_REC_SUCCESS: + return(DB_SUCCESS); + case LOCK_REC_SUCCESS_CREATED: + return(DB_SUCCESS_LOCKED_REC); + case LOCK_REC_FAIL: + return(lock_rec_lock_slow(impl, mode, block, + heap_no, index, thr)); } - return(err); + ut_error; + return(DB_ERROR); } /*********************************************************************//** @@ -2404,7 +2414,7 @@ lock_rec_inherit_to_gap( if (!lock_rec_get_insert_intention(lock) && !((srv_locks_unsafe_for_binlog || lock->trx->isolation_level - == TRX_ISO_READ_COMMITTED) + <= TRX_ISO_READ_COMMITTED) && lock_get_mode(lock) == LOCK_X)) { lock_rec_add_to_queue(LOCK_REC | LOCK_GAP @@ -3267,8 +3277,6 @@ lock_deadlock_occurs( lock_t* lock, /*!< in: lock the transaction is requesting */ trx_t* trx) /*!< in: transaction */ { - dict_table_t* table; - dict_index_t* index; trx_t* mark_trx; ulint ret; ulint cost = 0; @@ -3290,31 +3298,51 @@ retry: ret = lock_deadlock_recursive(trx, trx, lock, &cost, 0); - if (ret == LOCK_VICTIM_IS_OTHER) { + switch (ret) { + case LOCK_VICTIM_IS_OTHER: /* We chose some other trx as a victim: retry if there still is a deadlock */ - goto retry; - } - if (UNIV_UNLIKELY(ret == LOCK_VICTIM_IS_START)) { - if (lock_get_type_low(lock) & LOCK_TABLE) { - table = lock->un_member.tab_lock.table; - index = NULL; + case LOCK_EXCEED_MAX_DEPTH: + /* If the lock search exceeds the max step + or the max depth, the current trx will be + the victim. Print its information. */ + rewind(lock_latest_err_file); + ut_print_timestamp(lock_latest_err_file); + + fputs("TOO DEEP OR LONG SEARCH IN THE LOCK TABLE" + " WAITS-FOR GRAPH, WE WILL ROLL BACK" + " FOLLOWING TRANSACTION \n", + lock_latest_err_file); + + fputs("\n*** TRANSACTION:\n", lock_latest_err_file); + trx_print(lock_latest_err_file, trx, 3000); + + fputs("*** WAITING FOR THIS LOCK TO BE GRANTED:\n", + lock_latest_err_file); + + if (lock_get_type(lock) == LOCK_REC) { + lock_rec_print(lock_latest_err_file, lock); } else { - index = lock->index; - table = index->table; + lock_table_print(lock_latest_err_file, lock); } + break; - lock_deadlock_found = TRUE; - + case LOCK_VICTIM_IS_START: + srv_n_lock_deadlock_count++; fputs("*** WE ROLL BACK TRANSACTION (2)\n", lock_latest_err_file); + break; - return(TRUE); + default: + /* No deadlock detected*/ + return(FALSE); } - return(FALSE); + lock_deadlock_found = TRUE; + + return(TRUE); } /********************************************************************//** @@ -3323,25 +3351,26 @@ Looks recursively for a deadlock. deadlock and we chose 'start' as the victim, LOCK_VICTIM_IS_OTHER if a deadlock was found and we chose some other trx as a victim: we must do the search again in this last case because there may be another -deadlock! */ +deadlock! +LOCK_EXCEED_MAX_DEPTH if the lock search exceeds max steps or max depth. */ static ulint lock_deadlock_recursive( /*====================*/ trx_t* start, /*!< in: recursion starting point */ trx_t* trx, /*!< in: a transaction waiting for a lock */ - lock_t* wait_lock, /*!< in: the lock trx is waiting to be granted */ + lock_t* wait_lock, /*!< in: lock that is waiting to be granted */ ulint* cost, /*!< in/out: number of calculation steps thus far: if this exceeds LOCK_MAX_N_STEPS_... - we return LOCK_VICTIM_IS_START */ + we return LOCK_EXCEED_MAX_DEPTH */ ulint depth) /*!< in: recursion depth: if this exceeds LOCK_MAX_DEPTH_IN_DEADLOCK_CHECK, we - return LOCK_VICTIM_IS_START */ + return LOCK_EXCEED_MAX_DEPTH */ { + ulint ret; lock_t* lock; - ulint bit_no = ULINT_UNDEFINED; trx_t* lock_trx; - ulint ret; + ulint heap_no = ULINT_UNDEFINED; ut_a(trx); ut_a(start); @@ -3357,27 +3386,44 @@ lock_deadlock_recursive( *cost = *cost + 1; - lock = wait_lock; - if (lock_get_type_low(wait_lock) == LOCK_REC) { + ulint space; + ulint page_no; - bit_no = lock_rec_find_set_bit(wait_lock); + heap_no = lock_rec_find_set_bit(wait_lock); + ut_a(heap_no != ULINT_UNDEFINED); + + space = wait_lock->un_member.rec_lock.space; + page_no = wait_lock->un_member.rec_lock.page_no; + + lock = lock_rec_get_first_on_page_addr(space, page_no); + + /* Position the iterator on the first matching record lock. */ + while (lock != NULL + && lock != wait_lock + && !lock_rec_get_nth_bit(lock, heap_no)) { + + lock = lock_rec_get_next_on_page(lock); + } - ut_a(bit_no != ULINT_UNDEFINED); + if (lock == wait_lock) { + lock = NULL; + } + + ut_ad(lock == NULL || lock_rec_get_nth_bit(lock, heap_no)); + + } else { + lock = wait_lock; } /* Look at the locks ahead of wait_lock in the lock queue */ for (;;) { - if (lock_get_type_low(lock) & LOCK_TABLE) { - - lock = UT_LIST_GET_PREV(un_member.tab_lock.locks, - lock); - } else { - ut_ad(lock_get_type_low(lock) == LOCK_REC); - ut_a(bit_no != ULINT_UNDEFINED); + /* Get previous table lock. */ + if (heap_no == ULINT_UNDEFINED) { - lock = (lock_t*) lock_rec_get_prev(lock, bit_no); + lock = UT_LIST_GET_PREV( + un_member.tab_lock.locks, lock); } if (lock == NULL) { @@ -3395,7 +3441,7 @@ lock_deadlock_recursive( lock_trx = lock->trx; - if (lock_trx == start || too_far) { + if (lock_trx == start) { /* We came back to the recursion starting point: a deadlock detected; or we have @@ -3442,19 +3488,10 @@ lock_deadlock_recursive( } #ifdef UNIV_DEBUG if (lock_print_waits) { - fputs("Deadlock detected" - " or too long search\n", + fputs("Deadlock detected\n", stderr); } #endif /* UNIV_DEBUG */ - if (too_far) { - - fputs("TOO DEEP OR LONG SEARCH" - " IN THE LOCK TABLE" - " WAITS-FOR GRAPH\n", ef); - - return(LOCK_VICTIM_IS_START); - } if (trx_weight_cmp(wait_lock->trx, start) >= 0) { @@ -3490,6 +3527,21 @@ lock_deadlock_recursive( return(LOCK_VICTIM_IS_OTHER); } + if (too_far) { + +#ifdef UNIV_DEBUG + if (lock_print_waits) { + fputs("Deadlock search exceeds" + " max steps or depth.\n", + stderr); + } +#endif /* UNIV_DEBUG */ + /* The information about transaction/lock + to be rolled back is available in the top + level. Do not print anything here. */ + return(LOCK_EXCEED_MAX_DEPTH); + } + if (lock_trx->que_state == TRX_QUE_LOCK_WAIT) { /* Another trx ahead has requested lock in an @@ -3499,12 +3551,28 @@ lock_deadlock_recursive( ret = lock_deadlock_recursive( start, lock_trx, lock_trx->wait_lock, cost, depth + 1); + if (ret != 0) { return(ret); } } } + /* Get the next record lock to check. */ + if (heap_no != ULINT_UNDEFINED) { + + ut_a(lock != NULL); + + do { + lock = lock_rec_get_next_on_page(lock); + } while (lock != NULL + && lock != wait_lock + && !lock_rec_get_nth_bit(lock, heap_no)); + + if (lock == wait_lock) { + lock = NULL; + } + } }/* end of the 'for (;;)'-loop */ } @@ -3706,9 +3774,10 @@ lock_table_enqueue_waiting( /*********************************************************************//** Checks if other transactions have an incompatible mode lock request in -the lock queue. */ +the lock queue. +@return lock or NULL */ UNIV_INLINE -ibool +lock_t* lock_table_other_has_incompatible( /*==============================*/ trx_t* trx, /*!< in: transaction, or NULL if all @@ -3730,13 +3799,13 @@ lock_table_other_has_incompatible( && (!lock_mode_compatible(lock_get_mode(lock), mode)) && (wait || !(lock_get_wait(lock)))) { - return(TRUE); + return(lock); } lock = UT_LIST_GET_PREV(un_member.tab_lock.locks, lock); } - return(FALSE); + return(NULL); } /*********************************************************************//** @@ -3887,8 +3956,8 @@ lock_rec_unlock( const rec_t* rec, /*!< in: record */ enum lock_mode lock_mode)/*!< in: LOCK_S or LOCK_X */ { + lock_t* first_lock; lock_t* lock; - lock_t* release_lock = NULL; ulint heap_no; ut_ad(trx && rec); @@ -3898,48 +3967,40 @@ lock_rec_unlock( mutex_enter(&kernel_mutex); - lock = lock_rec_get_first(block, heap_no); + first_lock = lock_rec_get_first(block, heap_no); /* Find the last lock with the same lock_mode and transaction from the record. */ - while (lock != NULL) { + for (lock = first_lock; lock != NULL; + lock = lock_rec_get_next(heap_no, lock)) { if (lock->trx == trx && lock_get_mode(lock) == lock_mode) { - release_lock = lock; ut_a(!lock_get_wait(lock)); + lock_rec_reset_nth_bit(lock, heap_no); + goto released; } - - lock = lock_rec_get_next(heap_no, lock); } - /* If a record lock is found, release the record lock */ - - if (UNIV_LIKELY(release_lock != NULL)) { - lock_rec_reset_nth_bit(release_lock, heap_no); - } else { - mutex_exit(&kernel_mutex); - ut_print_timestamp(stderr); - fprintf(stderr, - " InnoDB: Error: unlock row could not" - " find a %lu mode lock on the record\n", - (ulong) lock_mode); + mutex_exit(&kernel_mutex); + ut_print_timestamp(stderr); + fprintf(stderr, + " InnoDB: Error: unlock row could not" + " find a %lu mode lock on the record\n", + (ulong) lock_mode); - return; - } + return; +released: /* Check if we can now grant waiting lock requests */ - lock = lock_rec_get_first(block, heap_no); - - while (lock != NULL) { + for (lock = first_lock; lock != NULL; + lock = lock_rec_get_next(heap_no, lock)) { if (lock_get_wait(lock) && !lock_rec_has_to_wait_in_queue(lock)) { /* Grant the lock */ lock_grant(lock); } - - lock = lock_rec_get_next(heap_no, lock); } mutex_exit(&kernel_mutex); @@ -4260,31 +4321,34 @@ lock_rec_print( putc('\n', file); if ( srv_show_verbose_locks ) { - block = buf_page_try_get(space, page_no, &mtr); + block = buf_page_try_get(space, page_no, &mtr); + + for (i = 0; i < lock_rec_get_n_bits(lock); ++i) { + + if (!lock_rec_get_nth_bit(lock, i)) { + continue; + } + + fprintf(file, "Record lock, heap no %lu", (ulong) i); + if (block) { - for (i = 0; i < lock_rec_get_n_bits(lock); i++) { + const rec_t* rec; - if (lock_rec_get_nth_bit(lock, i)) { + rec = page_find_rec_with_heap_no( + buf_block_get_frame(block), i); - const rec_t* rec - = page_find_rec_with_heap_no( - buf_block_get_frame(block), i); - offsets = rec_get_offsets( - rec, lock->index, offsets, - ULINT_UNDEFINED, &heap); + offsets = rec_get_offsets( + rec, lock->index, offsets, + ULINT_UNDEFINED, &heap); - fprintf(file, "Record lock, heap no %lu ", - (ulong) i); - rec_print_new(file, rec, offsets); - putc('\n', file); - } - } - } else { - for (i = 0; i < lock_rec_get_n_bits(lock); i++) { - fprintf(file, "Record lock, heap no %lu\n", (ulong) i); - } + putc(' ', file); + rec_print_new(file, rec, offsets); } + + putc('\n', file); } + } + mtr_commit(&mtr); if (UNIV_LIKELY_NULL(heap)) { mem_heap_free(heap); @@ -4329,14 +4393,26 @@ lock_get_n_rec_locks(void) #endif /* PRINT_NUM_OF_LOCK_STRUCTS */ /*********************************************************************//** -Prints info of locks for all transactions. */ +Prints info of locks for all transactions. +@return FALSE if not able to obtain kernel mutex +and exits without printing info */ UNIV_INTERN -void +ibool lock_print_info_summary( /*====================*/ - FILE* file) /*!< in: file where to print */ + FILE* file, /*!< in: file where to print */ + ibool nowait) /*!< in: whether to wait for the kernel mutex */ { - lock_mutex_enter_kernel(); + /* if nowait is FALSE, wait on the kernel mutex, + otherwise return immediately if fail to obtain the + mutex. */ + if (!nowait) { + lock_mutex_enter_kernel(); + } else if (mutex_enter_nowait(&kernel_mutex)) { + fputs("FAIL TO OBTAIN KERNEL MUTEX, " + "SKIP LOCK INFO PRINTING\n", file); + return(FALSE); + } if (lock_deadlock_found) { fputs("------------------------\n" @@ -4368,6 +4444,7 @@ lock_print_info_summary( "Total number of lock structs in row lock hash table %lu\n", (ulong) lock_get_n_rec_locks()); #endif /* PRINT_NUM_OF_LOCK_STRUCTS */ + return(TRUE); } /*********************************************************************//** @@ -4648,6 +4725,7 @@ lock_rec_queue_validate( ut_a(lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP, block, heap_no, impl_trx)); } +#if 0 } else { /* The kernel mutex may get released temporarily in the @@ -4658,6 +4736,27 @@ lock_rec_queue_validate( (fil_space_t::latch), the following check WILL break latching order and may cause a deadlock of threads. */ + /* NOTE: This is a bogus check that would fail in the + following case: Our transaction is updating a + row. After it has updated the clustered index record, + it goes to a secondary index record and finds someone + else holding an explicit S- or X-lock on that + secondary index record, presumably from a locking + read. Our transaction cannot update the secondary + index immediately, but places a waiting X-lock request + on the secondary index record. There is nothing + illegal in this. The assertion is simply too strong. */ + + /* From the locking point of view, each secondary + index is a separate table. A lock that is held on + secondary index rec does not give any rights to modify + or read the clustered index rec. Therefore, we can + think of the sec index as a separate 'table' from the + clust index 'table'. Conversely, a transaction that + has acquired a lock on and modified a clustered index + record may need to wait for a lock on the + corresponding record in a secondary index. */ + impl_trx = lock_sec_rec_some_has_impl_off_kernel( rec, index, offsets); @@ -4668,6 +4767,7 @@ lock_rec_queue_validate( ut_a(lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP, block, heap_no, impl_trx)); } +#endif } lock = lock_rec_get_first(block, heap_no); @@ -4765,6 +4865,13 @@ loop: || lock->trx->conc_state == TRX_PREPARED || lock->trx->conc_state == TRX_COMMITTED_IN_MEMORY); +# ifdef UNIV_SYNC_DEBUG + /* Only validate the record queues when this thread is not + holding a space->latch. Deadlocks are possible due to + latching order violation when UNIV_DEBUG is defined while + UNIV_SYNC_DEBUG is not. */ + if (!sync_thread_levels_contains(SYNC_FSP)) +# endif /* UNIV_SYNC_DEBUG */ for (i = nth_bit; i < lock_rec_get_n_bits(lock); i++) { if (i == 1 || lock_rec_get_nth_bit(lock, i)) { @@ -4930,7 +5037,7 @@ lock_rec_insert_check_and_lock( } trx = thr_get_trx(thr); - next_rec = page_rec_get_next((rec_t*) rec); + next_rec = page_rec_get_next_const(rec); next_rec_heap_no = page_rec_get_heap_no(next_rec); lock_mutex_enter_kernel(); @@ -4988,7 +5095,14 @@ lock_rec_insert_check_and_lock( lock_mutex_exit_kernel(); - if ((err == DB_SUCCESS) && !dict_index_is_clust(index)) { + switch (err) { + case DB_SUCCESS_LOCKED_REC: + err = DB_SUCCESS; + /* fall through */ + case DB_SUCCESS: + if (dict_index_is_clust(index)) { + break; + } /* Update the page max trx id field */ page_update_max_trx_id(block, buf_block_get_page_zip(block), @@ -5111,6 +5225,10 @@ lock_clust_rec_modify_check_and_lock( ut_ad(lock_rec_queue_validate(block, rec, index, offsets)); + if (UNIV_UNLIKELY(err == DB_SUCCESS_LOCKED_REC)) { + err = DB_SUCCESS; + } + return(err); } @@ -5177,22 +5295,27 @@ lock_sec_rec_modify_check_and_lock( } #endif /* UNIV_DEBUG */ - if (err == DB_SUCCESS) { + if (err == DB_SUCCESS || err == DB_SUCCESS_LOCKED_REC) { /* Update the page max trx id field */ + /* It might not be necessary to do this if + err == DB_SUCCESS (no new lock created), + but it should not cost too much performance. */ page_update_max_trx_id(block, buf_block_get_page_zip(block), thr_get_trx(thr)->id, mtr); + err = DB_SUCCESS; } return(err); } /*********************************************************************//** -Like the counterpart for a clustered index below, but now we read a +Like lock_clust_rec_read_check_and_lock(), but reads a secondary index record. -@return DB_SUCCESS, DB_LOCK_WAIT, DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */ +@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, DB_DEADLOCK, +or DB_QUE_THR_SUSPENDED */ UNIV_INTERN -ulint +enum db_err lock_sec_rec_read_check_and_lock( /*=============================*/ ulint flags, /*!< in: if BTR_NO_LOCKING_FLAG @@ -5213,8 +5336,8 @@ lock_sec_rec_read_check_and_lock( LOCK_REC_NOT_GAP */ que_thr_t* thr) /*!< in: query thread */ { - ulint err; - ulint heap_no; + enum db_err err; + ulint heap_no; ut_ad(!dict_index_is_clust(index)); ut_ad(block->frame == page_align(rec)); @@ -5265,9 +5388,10 @@ if the query thread should anyway be suspended for some reason; if not, then puts the transaction and the query thread to the lock wait state and inserts a waiting request for a record lock to the lock queue. Sets the requested mode lock on the record. -@return DB_SUCCESS, DB_LOCK_WAIT, DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */ +@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, DB_DEADLOCK, +or DB_QUE_THR_SUSPENDED */ UNIV_INTERN -ulint +enum db_err lock_clust_rec_read_check_and_lock( /*===============================*/ ulint flags, /*!< in: if BTR_NO_LOCKING_FLAG @@ -5288,8 +5412,8 @@ lock_clust_rec_read_check_and_lock( LOCK_REC_NOT_GAP */ que_thr_t* thr) /*!< in: query thread */ { - ulint err; - ulint heap_no; + enum db_err err; + ulint heap_no; ut_ad(dict_index_is_clust(index)); ut_ad(block->frame == page_align(rec)); @@ -5360,17 +5484,22 @@ lock_clust_rec_read_check_and_lock_alt( mem_heap_t* tmp_heap = NULL; ulint offsets_[REC_OFFS_NORMAL_SIZE]; ulint* offsets = offsets_; - ulint ret; + ulint err; rec_offs_init(offsets_); offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &tmp_heap); - ret = lock_clust_rec_read_check_and_lock(flags, block, rec, index, + err = lock_clust_rec_read_check_and_lock(flags, block, rec, index, offsets, mode, gap_mode, thr); if (tmp_heap) { mem_heap_free(tmp_heap); } - return(ret); + + if (UNIV_UNLIKELY(err == DB_SUCCESS_LOCKED_REC)) { + err = DB_SUCCESS; + } + + return(err); } /*******************************************************************//** |