diff options
author | Marko Mäkelä <marko.makela@oracle.com> | 2010-06-02 13:26:37 +0300 |
---|---|---|
committer | Marko Mäkelä <marko.makela@oracle.com> | 2010-06-02 13:26:37 +0300 |
commit | 306e1338a5c4480d057e7647943aa1eb150e4cef (patch) | |
tree | 4edfd800ae73f8f7afa2798c72833bf6d3281f24 /storage/innobase/lock/lock0lock.c | |
parent | b55ec587016db5af04acd6800eb04c6a427527b6 (diff) | |
download | mariadb-git-306e1338a5c4480d057e7647943aa1eb150e4cef.tar.gz |
Bug#53674: InnoDB: Error: unlock row could not find a 4 mode lock on the record
In semi-consistent read, only unlock freshly locked non-matching records.
Define DB_SUCCESS_LOCKED_REC for indicating a successful operation
where a record lock was created.
lock_rec_lock_fast(): Return LOCK_REC_SUCCESS,
LOCK_REC_SUCCESS_CREATED, or LOCK_REC_FAIL instead of TRUE/FALSE.
lock_sec_rec_read_check_and_lock(),
lock_clust_rec_read_check_and_lock(), lock_rec_enqueue_waiting(),
lock_rec_lock_slow(), lock_rec_lock(), row_ins_set_shared_rec_lock(),
row_ins_set_exclusive_rec_lock(), sel_set_rec_lock(),
row_sel_get_clust_rec_for_mysql(): Return DB_SUCCESS_LOCKED_REC if a
new record lock was created. Adjust callers.
row_unlock_for_mysql(): Correct the function documentation.
row_prebuilt_t::new_rec_locks: Correct the documentation.
Diffstat (limited to 'storage/innobase/lock/lock0lock.c')
-rw-r--r-- | storage/innobase/lock/lock0lock.c | 138 |
1 files changed, 82 insertions, 56 deletions
diff --git a/storage/innobase/lock/lock0lock.c b/storage/innobase/lock/lock0lock.c index 7df8ea50887..04240960b3a 100644 --- a/storage/innobase/lock/lock0lock.c +++ b/storage/innobase/lock/lock0lock.c @@ -1739,11 +1739,12 @@ ulint lock_rec_enqueue_waiting( /*=====================*/ /* out: DB_LOCK_WAIT, DB_DEADLOCK, or - DB_QUE_THR_SUSPENDED, or DB_SUCCESS; - DB_SUCCESS means that there was a deadlock, - but another transaction was chosen as a - victim, and we got the lock immediately: - no need to wait then */ + DB_QUE_THR_SUSPENDED, or DB_SUCCESS_LOCKED_REC; + DB_SUCCESS_LOCKED_REC means that there + was a deadlock, but another + transaction was chosen as a victim, + and we got the lock immediately: no + need to wait then */ ulint type_mode,/* in: lock mode this transaction is requesting: LOCK_S or LOCK_X, possibly ORed with LOCK_GAP or LOCK_REC_NOT_GAP, ORed @@ -1804,7 +1805,7 @@ lock_rec_enqueue_waiting( if (trx->wait_lock == NULL) { - return(DB_SUCCESS); + return(DB_SUCCESS_LOCKED_REC); } trx->que_state = TRX_QUE_LOCK_WAIT; @@ -1903,6 +1904,16 @@ lock_rec_add_to_queue( return(lock_rec_create(type_mode, rec, index, trx)); } +/** Record locking request status */ +enum lock_rec_req_status { + /** Failed to acquire a lock */ + LOCK_REC_FAIL, + /** Succeeded in acquiring a lock (implicit or already acquired) */ + LOCK_REC_SUCCESS, + /** Explicitly created a new lock */ + LOCK_REC_SUCCESS_CREATED +}; + /************************************************************************* This is a fast routine for locking a record in the most common cases: there are no explicit locks on the page, or there is just one lock, owned @@ -1911,10 +1922,10 @@ which does NOT look at implicit locks! Checks lock compatibility within explicit locks. This function sets a normal next-key lock, or in the case of a page supremum record, a gap type lock. */ UNIV_INLINE -ibool +enum lock_rec_req_status lock_rec_lock_fast( /*===============*/ - /* out: TRUE if locking succeeded */ + /* out: whether the locking succeeded */ ibool impl, /* in: if TRUE, no lock is set if no wait is necessary: we assume that the caller will set an implicit lock */ @@ -1950,19 +1961,19 @@ lock_rec_lock_fast( lock_rec_create(mode, rec, index, trx); } - return(TRUE); + return(LOCK_REC_SUCCESS_CREATED); } if (lock_rec_get_next_on_page(lock)) { - return(FALSE); + return(LOCK_REC_FAIL); } if (lock->trx != trx || lock->type_mode != (mode | LOCK_REC) || lock_rec_get_n_bits(lock) <= heap_no) { - return(FALSE); + return(LOCK_REC_FAIL); } if (!impl) { @@ -1971,10 +1982,11 @@ lock_rec_lock_fast( if (!lock_rec_get_nth_bit(lock, heap_no)) { lock_rec_set_nth_bit(lock, heap_no); + return(LOCK_REC_SUCCESS_CREATED); } } - return(TRUE); + return(LOCK_REC_SUCCESS); } /************************************************************************* @@ -1986,8 +1998,9 @@ static ulint lock_rec_lock_slow( /*===============*/ - /* out: DB_SUCCESS, DB_LOCK_WAIT, or error - code */ + /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC, + DB_LOCK_WAIT, DB_DEADLOCK, + or DB_QUE_THR_SUSPENDED */ ibool impl, /* in: if TRUE, no lock is set if no wait is necessary: we assume that the caller will set an implicit lock */ @@ -1998,7 +2011,6 @@ lock_rec_lock_slow( que_thr_t* thr) /* in: query thread */ { trx_t* trx; - ulint err; ut_ad(mutex_own(&kernel_mutex)); ut_ad((LOCK_MODE_MASK & mode) != LOCK_S @@ -2017,26 +2029,21 @@ lock_rec_lock_slow( /* The trx already has a strong enough lock on rec: do nothing */ - err = DB_SUCCESS; } else if (lock_rec_other_has_conflicting(mode, rec, trx)) { /* If another transaction has a non-gap conflicting request in the queue, as this transaction does not have a lock strong enough already granted on the record, we have to wait. */ - err = lock_rec_enqueue_waiting(mode, rec, index, thr); - } else { - if (!impl) { - /* Set the requested lock on the record */ + return(lock_rec_enqueue_waiting(mode, rec, index, thr)); + } else if (!impl) { + /* Set the requested lock on the record */ - lock_rec_add_to_queue(LOCK_REC | mode, rec, index, - trx); - } - - err = DB_SUCCESS; + lock_rec_add_to_queue(LOCK_REC | mode, rec, index, trx); + return(DB_SUCCESS_LOCKED_REC); } - return(err); + return(DB_SUCCESS); } /************************************************************************* @@ -2049,8 +2056,9 @@ static ulint lock_rec_lock( /*==========*/ - /* out: DB_SUCCESS, DB_LOCK_WAIT, or error - code */ + /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC, + DB_LOCK_WAIT, DB_DEADLOCK, or + DB_QUE_THR_SUSPENDED */ ibool impl, /* in: if TRUE, no lock is set if no wait is necessary: we assume that the caller will set an implicit lock */ @@ -2060,8 +2068,6 @@ lock_rec_lock( dict_index_t* index, /* in: index of record */ que_thr_t* thr) /* in: query thread */ { - ulint err; - ut_ad(mutex_own(&kernel_mutex)); ut_ad((LOCK_MODE_MASK & mode) != LOCK_S || lock_table_has(thr_get_trx(thr), index->table, LOCK_IS)); @@ -2073,17 +2079,19 @@ lock_rec_lock( || mode - (LOCK_MODE_MASK & mode) == LOCK_REC_NOT_GAP || mode - (LOCK_MODE_MASK & mode) == 0); - if (lock_rec_lock_fast(impl, mode, rec, index, thr)) { - - /* We try a simplified and faster subroutine for the most - common cases */ - - err = DB_SUCCESS; - } else { - err = lock_rec_lock_slow(impl, mode, rec, index, thr); + /* We try a simplified and faster subroutine for the most + common cases */ + switch (lock_rec_lock_fast(impl, mode, rec, index, thr)) { + case LOCK_REC_SUCCESS: + return(DB_SUCCESS); + case LOCK_REC_SUCCESS_CREATED: + return(DB_SUCCESS_LOCKED_REC); + case LOCK_REC_FAIL: + return(lock_rec_lock_slow(impl, mode, rec, index, thr)); } - return(err); + ut_error; + return(DB_ERROR); } /************************************************************************* @@ -4832,7 +4840,7 @@ lock_rec_insert_check_and_lock( lock = lock_rec_get_first(next_rec); - if (lock == NULL) { + if (UNIV_LIKELY(lock == NULL)) { /* We optimize CPU time usage in the simplest case */ lock_mutex_exit_kernel(); @@ -4840,8 +4848,7 @@ lock_rec_insert_check_and_lock( if (!(index->type & DICT_CLUSTERED)) { /* Update the page max trx id field */ - page_update_max_trx_id(buf_frame_align(rec), - thr_get_trx(thr)->id); + page_update_max_trx_id(buf_frame_align(rec), trx->id); } return(DB_SUCCESS); @@ -4873,11 +4880,16 @@ lock_rec_insert_check_and_lock( lock_mutex_exit_kernel(); - if (!(index->type & DICT_CLUSTERED) && (err == DB_SUCCESS)) { - + switch (err) { + case DB_SUCCESS_LOCKED_REC: + err = DB_SUCCESS; + /* fall through */ + case DB_SUCCESS: + if (index->type & DICT_CLUSTERED) { + break; + } /* Update the page max trx id field */ - page_update_max_trx_id(buf_frame_align(rec), - thr_get_trx(thr)->id); + page_update_max_trx_id(buf_frame_align(rec), trx->id); } #ifdef UNIV_DEBUG @@ -4984,6 +4996,10 @@ lock_clust_rec_modify_check_and_lock( ut_ad(lock_rec_queue_validate(rec, index, offsets)); + if (UNIV_UNLIKELY(err == DB_SUCCESS_LOCKED_REC)) { + err = DB_SUCCESS; + } + return(err); } @@ -5043,25 +5059,29 @@ lock_sec_rec_modify_check_and_lock( } #endif /* UNIV_DEBUG */ - if (err == DB_SUCCESS) { + if (err == DB_SUCCESS || err == DB_SUCCESS_LOCKED_REC) { /* Update the page max trx id field */ - + /* It might not be necessary to do this if + err == DB_SUCCESS (no new lock created), + but it should not cost too much performance. */ page_update_max_trx_id(buf_frame_align(rec), thr_get_trx(thr)->id); + err = DB_SUCCESS; } return(err); } /************************************************************************* -Like the counterpart for a clustered index below, but now we read a +Like lock_clust_rec_read_check_and_lock(), but reads a secondary index record. */ ulint lock_sec_rec_read_check_and_lock( /*=============================*/ - /* out: DB_SUCCESS, DB_LOCK_WAIT, - DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */ + /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC, + DB_LOCK_WAIT, DB_DEADLOCK, + or DB_QUE_THR_SUSPENDED */ ulint flags, /* in: if BTR_NO_LOCKING_FLAG bit is set, does nothing */ rec_t* rec, /* in: user record or page supremum record @@ -5126,8 +5146,9 @@ lock on the record. */ ulint lock_clust_rec_read_check_and_lock( /*===============================*/ - /* out: DB_SUCCESS, DB_LOCK_WAIT, - DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */ + /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC, + DB_LOCK_WAIT, DB_DEADLOCK, + or DB_QUE_THR_SUSPENDED */ ulint flags, /* in: if BTR_NO_LOCKING_FLAG bit is set, does nothing */ rec_t* rec, /* in: user record or page supremum record @@ -5206,16 +5227,21 @@ lock_clust_rec_read_check_and_lock_alt( mem_heap_t* tmp_heap = NULL; ulint offsets_[REC_OFFS_NORMAL_SIZE]; ulint* offsets = offsets_; - ulint ret; + ulint err; *offsets_ = (sizeof offsets_) / sizeof *offsets_; offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &tmp_heap); - ret = lock_clust_rec_read_check_and_lock(flags, rec, index, + err = lock_clust_rec_read_check_and_lock(flags, rec, index, offsets, mode, gap_mode, thr); if (tmp_heap) { mem_heap_free(tmp_heap); } - return(ret); + + if (UNIV_UNLIKELY(err == DB_SUCCESS_LOCKED_REC)) { + err = DB_SUCCESS; + } + + return(err); } |