Diffstat (limited to 'storage/xtradb/lock/lock0lock.c')
-rw-r--r--	storage/xtradb/lock/lock0lock.c | 267 ++++++++++++++++++++-------
1 file changed, 187 insertions(+), 80 deletions(-)
diff --git a/storage/xtradb/lock/lock0lock.c b/storage/xtradb/lock/lock0lock.c
index 59394f13766..b103ee79578 100644
--- a/storage/xtradb/lock/lock0lock.c
+++ b/storage/xtradb/lock/lock0lock.c
@@ -1,6 +1,6 @@
 /*****************************************************************************
 
-Copyright (c) 1996, 2009, Innobase Oy. All Rights Reserved.
+Copyright (c) 1996, 2010, Innobase Oy. All Rights Reserved.
 
 This program is free software; you can redistribute it and/or modify it under
 the terms of the GNU General Public License as published by the Free Software
@@ -376,6 +376,7 @@ UNIV_INTERN FILE*	lock_latest_err_file;
 /* Flags for recursive deadlock search */
 #define LOCK_VICTIM_IS_START	1
 #define LOCK_VICTIM_IS_OTHER	2
+#define LOCK_EXCEED_MAX_DEPTH	3
 
 /********************************************************************//**
 Checks if a lock request results in a deadlock.
@@ -394,24 +395,25 @@ Looks recursively for a deadlock.
 deadlock and we chose 'start' as the victim, LOCK_VICTIM_IS_OTHER if a
 deadlock was found and we chose some other trx as a victim: we must do
 the search again in this last case because there may be another
-deadlock! */
+deadlock!
+LOCK_EXCEED_MAX_DEPTH if the lock search exceeds max steps or max depth. */
 static
 ulint
 lock_deadlock_recursive(
 /*====================*/
 	trx_t*	start,		/*!< in: recursion starting point */
 	trx_t*	trx,		/*!< in: a transaction waiting for a lock */
-	lock_t*	wait_lock,	/*!< in: the lock trx is waiting to be granted */
+	lock_t*	wait_lock,	/*!< in: lock that is waiting to be granted */
 	ulint*	cost,		/*!< in/out: number of calculation steps thus
 				far: if this exceeds LOCK_MAX_N_STEPS_...
-				we return LOCK_VICTIM_IS_START */
+				we return LOCK_EXCEED_MAX_DEPTH */
 	ulint	depth);		/*!< in: recursion depth: if this exceeds
 				LOCK_MAX_DEPTH_IN_DEADLOCK_CHECK, we
-				return LOCK_VICTIM_IS_START */
+				return LOCK_EXCEED_MAX_DEPTH */
 
 /*********************************************************************//**
 Gets the nth bit of a record lock.
-@return TRUE if bit set */
+@return TRUE if bit set; also, if i == ULINT_UNDEFINED, return FALSE */
 UNIV_INLINE
 ibool
 lock_rec_get_nth_bit(
@@ -1222,7 +1224,7 @@ lock_rec_get_first_on_page(
 
 /*********************************************************************//**
 Gets the next explicit lock request on a record.
-@return next lock, NULL if none exists */
+@return next lock, NULL if none exists or if heap_no == ULINT_UNDEFINED */
 UNIV_INLINE
 lock_t*
 lock_rec_get_next(
@@ -2404,7 +2406,7 @@ lock_rec_inherit_to_gap(
 	if (!lock_rec_get_insert_intention(lock)
 	    && !((srv_locks_unsafe_for_binlog
 		  || lock->trx->isolation_level
-		  == TRX_ISO_READ_COMMITTED)
+		  <= TRX_ISO_READ_COMMITTED)
 		 && lock_get_mode(lock) == LOCK_X)) {
 
 		lock_rec_add_to_queue(LOCK_REC | LOCK_GAP
@@ -3267,8 +3269,6 @@ lock_deadlock_occurs(
 	lock_t*	lock,	/*!< in: lock the transaction is requesting */
 	trx_t*	trx)	/*!< in: transaction */
 {
-	dict_table_t*	table;
-	dict_index_t*	index;
 	trx_t*		mark_trx;
 	ulint		ret;
 	ulint		cost	= 0;
@@ -3290,31 +3290,51 @@ retry:
 
 	ret = lock_deadlock_recursive(trx, trx, lock, &cost, 0);
 
-	if (ret == LOCK_VICTIM_IS_OTHER) {
+	switch (ret) {
+	case LOCK_VICTIM_IS_OTHER:
 		/* We chose some other trx as a victim: retry if there still
 		is a deadlock */
-
 		goto retry;
-	}
 
-	if (UNIV_UNLIKELY(ret == LOCK_VICTIM_IS_START)) {
-		if (lock_get_type_low(lock) & LOCK_TABLE) {
-			table = lock->un_member.tab_lock.table;
-			index = NULL;
+	case LOCK_EXCEED_MAX_DEPTH:
+		/* If the lock search exceeds the max step
+		or the max depth, the current trx will be
+		the victim. Print its information. */
+		rewind(lock_latest_err_file);
+		ut_print_timestamp(lock_latest_err_file);
+
+		fputs("TOO DEEP OR LONG SEARCH IN THE LOCK TABLE"
+		      " WAITS-FOR GRAPH, WE WILL ROLL BACK"
+		      " FOLLOWING TRANSACTION \n",
+		      lock_latest_err_file);
+
+		fputs("\n*** TRANSACTION:\n", lock_latest_err_file);
+		trx_print(lock_latest_err_file, trx, 3000);
+
+		fputs("*** WAITING FOR THIS LOCK TO BE GRANTED:\n",
+		      lock_latest_err_file);
+
+		if (lock_get_type(lock) == LOCK_REC) {
+			lock_rec_print(lock_latest_err_file, lock);
 		} else {
-			index = lock->index;
-			table = index->table;
+			lock_table_print(lock_latest_err_file, lock);
 		}
+		break;
 
-		lock_deadlock_found = TRUE;
-
+	case LOCK_VICTIM_IS_START:
+		srv_n_lock_deadlock_count++;
 		fputs("*** WE ROLL BACK TRANSACTION (2)\n",
 		      lock_latest_err_file);
+		break;
 
-		return(TRUE);
+	default:
+		/* No deadlock detected */
+		return(FALSE);
 	}
 
-	return(FALSE);
+	lock_deadlock_found = TRUE;
+
+	return(TRUE);
 }
 
 /********************************************************************//**
@@ -3323,25 +3343,26 @@ Looks recursively for a deadlock.
 deadlock and we chose 'start' as the victim, LOCK_VICTIM_IS_OTHER if a
 deadlock was found and we chose some other trx as a victim: we must do
 the search again in this last case because there may be another
-deadlock! */
+deadlock!
+LOCK_EXCEED_MAX_DEPTH if the lock search exceeds max steps or max depth. */
 static
 ulint
 lock_deadlock_recursive(
 /*====================*/
 	trx_t*	start,		/*!< in: recursion starting point */
 	trx_t*	trx,		/*!< in: a transaction waiting for a lock */
-	lock_t*	wait_lock,	/*!< in: the lock trx is waiting to be granted */
+	lock_t*	wait_lock,	/*!< in: lock that is waiting to be granted */
 	ulint*	cost,		/*!< in/out: number of calculation steps thus
 				far: if this exceeds LOCK_MAX_N_STEPS_...
-				we return LOCK_VICTIM_IS_START */
+				we return LOCK_EXCEED_MAX_DEPTH */
 	ulint	depth)		/*!< in: recursion depth: if this exceeds
 				LOCK_MAX_DEPTH_IN_DEADLOCK_CHECK, we
-				return LOCK_VICTIM_IS_START */
+				return LOCK_EXCEED_MAX_DEPTH */
 {
+	ulint	ret;
 	lock_t*	lock;
-	ulint	bit_no		= ULINT_UNDEFINED;
 	trx_t*	lock_trx;
-	ulint	ret;
+	ulint	heap_no		= ULINT_UNDEFINED;
 
 	ut_a(trx);
 	ut_a(start);
@@ -3357,27 +3378,44 @@ lock_deadlock_recursive(
 
 	*cost = *cost + 1;
 
-	lock = wait_lock;
-
 	if (lock_get_type_low(wait_lock) == LOCK_REC) {
+		ulint	space;
+		ulint	page_no;
+
+		heap_no = lock_rec_find_set_bit(wait_lock);
+		ut_a(heap_no != ULINT_UNDEFINED);
+
+		space = wait_lock->un_member.rec_lock.space;
+		page_no = wait_lock->un_member.rec_lock.page_no;
+
+		lock = lock_rec_get_first_on_page_addr(space, page_no);
+
+		/* Position the iterator on the first matching record lock. */
+		while (lock != NULL
+		       && lock != wait_lock
+		       && !lock_rec_get_nth_bit(lock, heap_no)) {
+
+			lock = lock_rec_get_next_on_page(lock);
+		}
+
+		if (lock == wait_lock) {
+			lock = NULL;
+		}
 
-		bit_no = lock_rec_find_set_bit(wait_lock);
+		ut_ad(lock == NULL || lock_rec_get_nth_bit(lock, heap_no));
 
-		ut_a(bit_no != ULINT_UNDEFINED);
+	} else {
+		lock = wait_lock;
 	}
 
 	/* Look at the locks ahead of wait_lock in the lock queue */
 
 	for (;;) {
-		if (lock_get_type_low(lock) & LOCK_TABLE) {
-
-			lock = UT_LIST_GET_PREV(un_member.tab_lock.locks,
-						lock);
-		} else {
-			ut_ad(lock_get_type_low(lock) == LOCK_REC);
-			ut_a(bit_no != ULINT_UNDEFINED);
+		/* Get previous table lock. */
+		if (heap_no == ULINT_UNDEFINED) {
 
-			lock = (lock_t*) lock_rec_get_prev(lock, bit_no);
+			lock = UT_LIST_GET_PREV(
+				un_member.tab_lock.locks, lock);
 		}
 
 		if (lock == NULL) {
@@ -3395,7 +3433,7 @@ lock_deadlock_recursive(
 
 		lock_trx = lock->trx;
 
-		if (lock_trx == start || too_far) {
+		if (lock_trx == start) {
 
 			/* We came back to the recursion starting
 			point: a deadlock detected; or we have
@@ -3442,19 +3480,10 @@ lock_deadlock_recursive(
 			}
 #ifdef UNIV_DEBUG
 			if (lock_print_waits) {
-				fputs("Deadlock detected"
-				      " or too long search\n",
+				fputs("Deadlock detected\n",
 				      stderr);
 			}
 #endif /* UNIV_DEBUG */
-			if (too_far) {
-
-				fputs("TOO DEEP OR LONG SEARCH"
-				      " IN THE LOCK TABLE"
-				      " WAITS-FOR GRAPH\n", ef);
-
-				return(LOCK_VICTIM_IS_START);
-			}
 
 			if (trx_weight_cmp(wait_lock->trx,
 					   start) >= 0) {
@@ -3490,6 +3519,21 @@ lock_deadlock_recursive(
 			return(LOCK_VICTIM_IS_OTHER);
 		}
 
+		if (too_far) {
+
+#ifdef UNIV_DEBUG
+			if (lock_print_waits) {
+				fputs("Deadlock search exceeds"
+				      " max steps or depth.\n",
+				      stderr);
+			}
+#endif /* UNIV_DEBUG */
+			/* The information about transaction/lock
+			to be rolled back is available in the top
+			level. Do not print anything here. */
+			return(LOCK_EXCEED_MAX_DEPTH);
+		}
+
 		if (lock_trx->que_state == TRX_QUE_LOCK_WAIT) {
 
 			/* Another trx ahead has requested lock in an
@@ -3499,12 +3543,28 @@ lock_deadlock_recursive(
 			ret = lock_deadlock_recursive(
 				start, lock_trx,
 				lock_trx->wait_lock, cost, depth + 1);
+
 			if (ret != 0) {
 
 				return(ret);
 			}
 		}
+
+		/* Get the next record lock to check. */
+		if (heap_no != ULINT_UNDEFINED) {
+
+			ut_a(lock != NULL);
+
+			do {
+				lock = lock_rec_get_next_on_page(lock);
+			} while (lock != NULL
+				 && lock != wait_lock
+				 && !lock_rec_get_nth_bit(lock, heap_no));
+
+			if (lock == wait_lock) {
+				lock = NULL;
+			}
+		}
 	}/* end of the 'for (;;)'-loop */
 }
 
@@ -3706,9 +3766,10 @@ lock_table_enqueue_waiting(
 
 /*********************************************************************//**
 Checks if other transactions have an incompatible mode lock request in
-the lock queue. */
+the lock queue.
+@return	lock or NULL */
 UNIV_INLINE
-ibool
+lock_t*
 lock_table_other_has_incompatible(
 /*==============================*/
 	trx_t*	trx,	/*!< in: transaction, or NULL if all
@@ -3730,13 +3791,13 @@ lock_table_other_has_incompatible(
 		    && (!lock_mode_compatible(lock_get_mode(lock), mode))
 		    && (wait || !(lock_get_wait(lock)))) {
 
-			return(TRUE);
+			return(lock);
 		}
 
 		lock = UT_LIST_GET_PREV(un_member.tab_lock.locks, lock);
 	}
 
-	return(FALSE);
+	return(NULL);
 }
 
 /*********************************************************************//**
@@ -4260,31 +4321,34 @@ lock_rec_print(
 	putc('\n', file);
 
 	if ( srv_show_verbose_locks ) {
-	block = buf_page_try_get(space, page_no, &mtr);
+		block = buf_page_try_get(space, page_no, &mtr);
+
+		for (i = 0; i < lock_rec_get_n_bits(lock); ++i) {
+
+			if (!lock_rec_get_nth_bit(lock, i)) {
+				continue;
+			}
+
+			fprintf(file, "Record lock, heap no %lu", (ulong) i);
 
-	if (block) {
-		for (i = 0; i < lock_rec_get_n_bits(lock); i++) {
+			if (block) {
+				const rec_t*	rec;
 
-			if (lock_rec_get_nth_bit(lock, i)) {
+				rec = page_find_rec_with_heap_no(
+					buf_block_get_frame(block), i);
 
-				const rec_t*	rec
-					= page_find_rec_with_heap_no(
-						buf_block_get_frame(block), i);
-				offsets = rec_get_offsets(
-					rec, lock->index, offsets,
-					ULINT_UNDEFINED, &heap);
+				offsets = rec_get_offsets(
+					rec, lock->index, offsets,
+					ULINT_UNDEFINED, &heap);
 
-				fprintf(file, "Record lock, heap no %lu ",
-					(ulong) i);
-				rec_print_new(file, rec, offsets);
-				putc('\n', file);
-			}
-		}
-	} else {
-		for (i = 0; i < lock_rec_get_n_bits(lock); i++) {
-			fprintf(file, "Record lock, heap no %lu\n", (ulong) i);
-		}
+				putc(' ', file);
+				rec_print_new(file, rec, offsets);
 			}
+
+			putc('\n', file);
+		}
 	}
+
 	mtr_commit(&mtr);
 
 	if (UNIV_LIKELY_NULL(heap)) {
 		mem_heap_free(heap);
@@ -4329,14 +4393,26 @@ lock_get_n_rec_locks(void)
 #endif /* PRINT_NUM_OF_LOCK_STRUCTS */
 
 /*********************************************************************//**
-Prints info of locks for all transactions. */
+Prints info of locks for all transactions.
+@return FALSE if not able to obtain the kernel mutex; in that case it
+exits without printing the lock info */
 UNIV_INTERN
-void
+ibool
 lock_print_info_summary(
 /*====================*/
-	FILE*	file)	/*!< in: file where to print */
+	FILE*	file,	/*!< in: file where to print */
+	ibool	nowait)	/*!< in: whether to wait for the kernel mutex */
 {
-	lock_mutex_enter_kernel();
+	/* If nowait is FALSE, wait on the kernel mutex;
+	otherwise return immediately if we fail to obtain
+	the mutex. */
+	if (!nowait) {
+		lock_mutex_enter_kernel();
+	} else if (mutex_enter_nowait(&kernel_mutex)) {
+		fputs("FAIL TO OBTAIN KERNEL MUTEX, "
+		      "SKIP LOCK INFO PRINTING\n", file);
+		return(FALSE);
+	}
 
 	if (lock_deadlock_found) {
 		fputs("------------------------\n"
@@ -4368,6 +4444,7 @@ lock_print_info_summary(
 	"Total number of lock structs in row lock hash table %lu\n",
 	(ulong) lock_get_n_rec_locks());
 #endif /* PRINT_NUM_OF_LOCK_STRUCTS */
+	return(TRUE);
 }
 
 /*********************************************************************//**
@@ -4648,6 +4725,7 @@ lock_rec_queue_validate(
 			ut_a(lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP,
 					       block, heap_no, impl_trx));
 		}
+#if 0
 	} else {
 
 		/* The kernel mutex may get released temporarily in the
@@ -4658,6 +4736,27 @@ lock_rec_queue_validate(
 		(fil_space_t::latch), the following check WILL break
 		latching order and may cause a deadlock of threads. */
 
+		/* NOTE: This is a bogus check that would fail in the
+		following case: Our transaction is updating a
+		row. After it has updated the clustered index record,
+		it goes to a secondary index record and finds someone
+		else holding an explicit S- or X-lock on that
+		secondary index record, presumably from a locking
+		read. Our transaction cannot update the secondary
+		index immediately, but places a waiting X-lock request
+		on the secondary index record. There is nothing
+		illegal in this. The assertion is simply too strong. */
+
+		/* From the locking point of view, each secondary
+		index is a separate table. A lock that is held on
+		secondary index rec does not give any rights to modify
+		or read the clustered index rec. Therefore, we can
+		think of the sec index as a separate 'table' from the
+		clust index 'table'. Conversely, a transaction that
+		has acquired a lock on and modified a clustered index
+		record may need to wait for a lock on the
+		corresponding record in a secondary index. */
+
 		impl_trx = lock_sec_rec_some_has_impl_off_kernel(
 			rec, index, offsets);
@@ -4668,6 +4767,7 @@ lock_rec_queue_validate(
 			ut_a(lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP,
 					       block, heap_no, impl_trx));
 		}
+#endif
 	}
 
 	lock = lock_rec_get_first(block, heap_no);
@@ -4765,6 +4865,13 @@ loop:
 	      || lock->trx->conc_state == TRX_PREPARED
 	      || lock->trx->conc_state == TRX_COMMITTED_IN_MEMORY);
 
+# ifdef UNIV_SYNC_DEBUG
+	/* Only validate the record queues when this thread is not
+	holding a space->latch. Deadlocks are possible due to
+	latching order violation when UNIV_DEBUG is defined while
+	UNIV_SYNC_DEBUG is not. */
+	if (!sync_thread_levels_contains(SYNC_FSP))
+# endif /* UNIV_SYNC_DEBUG */
 	for (i = nth_bit; i < lock_rec_get_n_bits(lock); i++) {
 
 		if (i == 1 || lock_rec_get_nth_bit(lock, i)) {
@@ -4930,7 +5037,7 @@ lock_rec_insert_check_and_lock(
 	}
 
 	trx = thr_get_trx(thr);
-	next_rec = page_rec_get_next((rec_t*) rec);
+	next_rec = page_rec_get_next_const(rec);
 	next_rec_heap_no = page_rec_get_heap_no(next_rec);
 
 	lock_mutex_enter_kernel();
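The heart of this patch is the third search outcome: lock_deadlock_recursive() now reports LOCK_EXCEED_MAX_DEPTH up the call chain instead of printing and picking a victim deep inside the recursion, and lock_deadlock_occurs() prints the victim transaction exactly once at the top level. Below is a minimal, self-contained C sketch of that control-flow pattern; the search() helper, the next_trx[] waits-for table and MAX_DEPTH are all invented for illustration and are not InnoDB code.

    #include <stdio.h>

    /* Stand-ins for the patch's return codes (values as in the diff). */
    #define LOCK_VICTIM_IS_START  1
    #define LOCK_VICTIM_IS_OTHER  2
    #define LOCK_EXCEED_MAX_DEPTH 3

    #define MAX_DEPTH 4  /* hypothetical small limit for the demo */

    /* Toy waits-for graph: trx i waits for trx next_trx[i]; -1 = not waiting. */
    static int next_trx[] = {1, 2, 3, 4, 5, -1};

    /* Recursive search mirroring the patch's shape: report depth overflow
       upward instead of printing at every recursion level. */
    static int search(int start, int trx, int depth)
    {
        if (depth > MAX_DEPTH) {
            return LOCK_EXCEED_MAX_DEPTH;   /* the caller prints, not us */
        }
        int waits_for = next_trx[trx];
        if (waits_for < 0) {
            return 0;                       /* no cycle found */
        }
        if (waits_for == start) {
            return LOCK_VICTIM_IS_START;    /* cycle back to the start trx */
        }
        return search(start, waits_for, depth + 1);
    }

    int main(void)
    {
        switch (search(0, 0, 0)) {
        case LOCK_VICTIM_IS_START:
            puts("deadlock: roll back starting transaction");
            break;
        case LOCK_EXCEED_MAX_DEPTH:
            /* printed once, at the top level, as in the patch */
            puts("search too deep/long: roll back current transaction");
            break;
        default:
            puts("no deadlock");
        }
        return 0;
    }

Returning the code upward means only the caller that owns the full context does the reporting, so the "TOO DEEP OR LONG SEARCH" message is emitted once per deadlock check rather than once per recursion level.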
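The record-lock branch no longer steps backwards with lock_rec_get_prev(); it walks forward from the first lock on the page, skipping locks whose bitmap does not cover the waiting heap number and treating wait_lock itself as the end of the queue. A self-contained sketch of that filtered iteration over a singly linked queue follows; the struct and helper names are invented stand-ins for lock_rec_get_first_on_page_addr()/lock_rec_get_next_on_page() and the per-lock bitmap.

    #include <stdio.h>

    /* Minimal model of a record-lock queue: each lock covers one page and
       marks the heap numbers it locks in a bitmap (an unsigned int here). */
    struct lock {
        const char*  trx_name;
        unsigned     heap_bits;  /* bit n set => heap_no n is locked */
        struct lock* next;       /* next lock on the same page */
    };

    /* First lock on the page that sets heap_no, scanning only locks that
       are ahead of wait_lock in the queue. */
    static struct lock* first_matching(struct lock* head, struct lock* wait_lock,
                                       unsigned heap_no)
    {
        struct lock* lock = head;
        while (lock != NULL && lock != wait_lock
               && !(lock->heap_bits & (1u << heap_no))) {
            lock = lock->next;
        }
        return lock == wait_lock ? NULL : lock;
    }

    /* Next matching lock after 'lock', again stopping at wait_lock. */
    static struct lock* next_matching(struct lock* lock, struct lock* wait_lock,
                                      unsigned heap_no)
    {
        do {
            lock = lock->next;
        } while (lock != NULL && lock != wait_lock
                 && !(lock->heap_bits & (1u << heap_no)));
        return lock == wait_lock ? NULL : lock;
    }

    int main(void)
    {
        /* Queue: A locks heap 2, B locks heap 3, C locks heap 2,
           W waits on heap 2. */
        struct lock w = {"W", 1u << 2, NULL};
        struct lock c = {"C", 1u << 2, &w};
        struct lock b = {"B", 1u << 3, &c};
        struct lock a = {"A", 1u << 2, &b};

        for (struct lock* l = first_matching(&a, &w, 2); l != NULL;
             l = next_matching(l, &w, 2)) {
            printf("lock ahead of W on heap 2: %s\n", l->trx_name);
        }
        return 0;
    }

Compiled with any C99 compiler, this prints locks A and C: the two locks ahead of W that cover heap number 2, which is exactly the set the patched loop in lock_deadlock_recursive() visits.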
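lock_table_other_has_incompatible() now returns the conflicting lock (or NULL) rather than TRUE/FALSE. Because NULL tests false in C, boolean-style call sites keep working unchanged, while callers that want diagnostics can inspect the returned lock. A toy sketch of the idiom, with invented types and a deliberately simplistic compatibility rule:

    #include <stddef.h>
    #include <stdio.h>

    /* Invented toy type: a queue of table locks, each with an owner and mode. */
    struct lock {
        const char* owner;
        int         mode;
    };

    /* Returns the first conflicting lock, or NULL if none: strictly more
       informative than TRUE/FALSE, yet still usable in a plain if(). */
    static struct lock* other_has_incompatible(struct lock* queue, size_t n,
                                               int mode)
    {
        for (size_t i = 0; i < n; i++) {
            if (queue[i].mode != mode) {   /* deliberately simplistic rule */
                return &queue[i];
            }
        }
        return NULL;
    }

    int main(void)
    {
        struct lock q[] = {{"trx1", 0}, {"trx2", 1}};
        struct lock* conflict = other_has_incompatible(q, 2, 0);

        if (conflict) {  /* old boolean-style call sites still read naturally */
            printf("conflict with lock held by %s\n", conflict->owner);
        }
        return 0;
    }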
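lock_print_info_summary() gains a nowait argument so a monitor thread can skip the lock report instead of stalling on a contended kernel mutex. The sketch below shows the same pattern with POSIX threads, where pthread_mutex_trylock() (returning 0 on success, like a successful mutex_enter_nowait()) plays the nowait role; note that unlike the patched function, which leaves the mutex held for the caller to release later, this toy releases it before returning.

    #include <pthread.h>
    #include <stdio.h>

    static pthread_mutex_t kernel_mutex = PTHREAD_MUTEX_INITIALIZER;

    /* Same contract as the patched function: returns 0 (false) if the
       mutex could not be taken and nothing was printed. */
    static int print_lock_summary(FILE* file, int nowait)
    {
        if (!nowait) {
            pthread_mutex_lock(&kernel_mutex);          /* block until held */
        } else if (pthread_mutex_trylock(&kernel_mutex) != 0) {
            fputs("FAIL TO OBTAIN MUTEX, SKIP LOCK INFO PRINTING\n", file);
            return 0;
        }

        fputs("... lock summary would be printed here ...\n", file);

        pthread_mutex_unlock(&kernel_mutex);
        return 1;
    }

    int main(void)
    {
        print_lock_summary(stdout, 1);  /* nowait: skip instead of stalling */
        return 0;
    }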
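One construct in the validation hunk is easy to misread: under UNIV_SYNC_DEBUG, a brace-less if (!sync_thread_levels_contains(SYNC_FSP)) directly precedes the for loop, so the loop is guarded only when that macro is defined; otherwise the preprocessor removes the if and the loop always runs. A tiny demonstration of the trick, with hypothetical macro and predicate names:

    #include <stdio.h>

    #define SYNC_DEBUG 1  /* hypothetical stand-in for UNIV_SYNC_DEBUG */

    static int holding_fsp_latch(void) { return 1; }  /* pretend we hold it */

    int main(void)
    {
        /* With SYNC_DEBUG defined, the brace-less if below guards the whole
           for-loop; with it undefined, the preprocessor deletes the if and
           the loop runs unconditionally -- the patch's exact construct. */
    #if SYNC_DEBUG
        if (!holding_fsp_latch())
    #endif
        for (int i = 0; i < 3; i++) {
            printf("validating bit %d\n", i);
        }

        puts("done");
        return 0;
    }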