summaryrefslogtreecommitdiff
path: root/storage/innobase/lock/lock0lock.c
diff options
context:
space:
mode:
authorMarko Mäkelä <marko.makela@oracle.com>2010-06-02 13:26:37 +0300
committerMarko Mäkelä <marko.makela@oracle.com>2010-06-02 13:26:37 +0300
commit306e1338a5c4480d057e7647943aa1eb150e4cef (patch)
tree4edfd800ae73f8f7afa2798c72833bf6d3281f24 /storage/innobase/lock/lock0lock.c
parentb55ec587016db5af04acd6800eb04c6a427527b6 (diff)
downloadmariadb-git-306e1338a5c4480d057e7647943aa1eb150e4cef.tar.gz
Bug#53674: InnoDB: Error: unlock row could not find a 4 mode lock on the record
In semi-consistent read, only unlock freshly locked non-matching records. Define DB_SUCCESS_LOCKED_REC for indicating a successful operation where a record lock was created. lock_rec_lock_fast(): Return LOCK_REC_SUCCESS, LOCK_REC_SUCCESS_CREATED, or LOCK_REC_FAIL instead of TRUE/FALSE. lock_sec_rec_read_check_and_lock(), lock_clust_rec_read_check_and_lock(), lock_rec_enqueue_waiting(), lock_rec_lock_slow(), lock_rec_lock(), row_ins_set_shared_rec_lock(), row_ins_set_exclusive_rec_lock(), sel_set_rec_lock(), row_sel_get_clust_rec_for_mysql(): Return DB_SUCCESS_LOCKED_REC if a new record lock was created. Adjust callers. row_unlock_for_mysql(): Correct the function documentation. row_prebuilt_t::new_rec_locks: Correct the documentation.
Diffstat (limited to 'storage/innobase/lock/lock0lock.c')
-rw-r--r--storage/innobase/lock/lock0lock.c138
1 files changed, 82 insertions, 56 deletions
diff --git a/storage/innobase/lock/lock0lock.c b/storage/innobase/lock/lock0lock.c
index 7df8ea50887..04240960b3a 100644
--- a/storage/innobase/lock/lock0lock.c
+++ b/storage/innobase/lock/lock0lock.c
@@ -1739,11 +1739,12 @@ ulint
lock_rec_enqueue_waiting(
/*=====================*/
/* out: DB_LOCK_WAIT, DB_DEADLOCK, or
- DB_QUE_THR_SUSPENDED, or DB_SUCCESS;
- DB_SUCCESS means that there was a deadlock,
- but another transaction was chosen as a
- victim, and we got the lock immediately:
- no need to wait then */
+ DB_QUE_THR_SUSPENDED, or DB_SUCCESS_LOCKED_REC;
+ DB_SUCCESS_LOCKED_REC means that there
+ was a deadlock, but another
+ transaction was chosen as a victim,
+ and we got the lock immediately: no
+ need to wait then */
ulint type_mode,/* in: lock mode this transaction is
requesting: LOCK_S or LOCK_X, possibly ORed
with LOCK_GAP or LOCK_REC_NOT_GAP, ORed
@@ -1804,7 +1805,7 @@ lock_rec_enqueue_waiting(
if (trx->wait_lock == NULL) {
- return(DB_SUCCESS);
+ return(DB_SUCCESS_LOCKED_REC);
}
trx->que_state = TRX_QUE_LOCK_WAIT;
@@ -1903,6 +1904,16 @@ lock_rec_add_to_queue(
return(lock_rec_create(type_mode, rec, index, trx));
}
+/** Record locking request status */
+enum lock_rec_req_status {
+ /** Failed to acquire a lock */
+ LOCK_REC_FAIL,
+ /** Succeeded in acquiring a lock (implicit or already acquired) */
+ LOCK_REC_SUCCESS,
+ /** Explicitly created a new lock */
+ LOCK_REC_SUCCESS_CREATED
+};
+
/*************************************************************************
This is a fast routine for locking a record in the most common cases:
there are no explicit locks on the page, or there is just one lock, owned
@@ -1911,10 +1922,10 @@ which does NOT look at implicit locks! Checks lock compatibility within
explicit locks. This function sets a normal next-key lock, or in the case of
a page supremum record, a gap type lock. */
UNIV_INLINE
-ibool
+enum lock_rec_req_status
lock_rec_lock_fast(
/*===============*/
- /* out: TRUE if locking succeeded */
+ /* out: whether the locking succeeded */
ibool impl, /* in: if TRUE, no lock is set if no wait
is necessary: we assume that the caller will
set an implicit lock */
@@ -1950,19 +1961,19 @@ lock_rec_lock_fast(
lock_rec_create(mode, rec, index, trx);
}
- return(TRUE);
+ return(LOCK_REC_SUCCESS_CREATED);
}
if (lock_rec_get_next_on_page(lock)) {
- return(FALSE);
+ return(LOCK_REC_FAIL);
}
if (lock->trx != trx
|| lock->type_mode != (mode | LOCK_REC)
|| lock_rec_get_n_bits(lock) <= heap_no) {
- return(FALSE);
+ return(LOCK_REC_FAIL);
}
if (!impl) {
@@ -1971,10 +1982,11 @@ lock_rec_lock_fast(
if (!lock_rec_get_nth_bit(lock, heap_no)) {
lock_rec_set_nth_bit(lock, heap_no);
+ return(LOCK_REC_SUCCESS_CREATED);
}
}
- return(TRUE);
+ return(LOCK_REC_SUCCESS);
}
/*************************************************************************
@@ -1986,8 +1998,9 @@ static
ulint
lock_rec_lock_slow(
/*===============*/
- /* out: DB_SUCCESS, DB_LOCK_WAIT, or error
- code */
+ /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
+ DB_LOCK_WAIT, DB_DEADLOCK,
+ or DB_QUE_THR_SUSPENDED */
ibool impl, /* in: if TRUE, no lock is set if no wait is
necessary: we assume that the caller will set
an implicit lock */
@@ -1998,7 +2011,6 @@ lock_rec_lock_slow(
que_thr_t* thr) /* in: query thread */
{
trx_t* trx;
- ulint err;
ut_ad(mutex_own(&kernel_mutex));
ut_ad((LOCK_MODE_MASK & mode) != LOCK_S
@@ -2017,26 +2029,21 @@ lock_rec_lock_slow(
/* The trx already has a strong enough lock on rec: do
nothing */
- err = DB_SUCCESS;
} else if (lock_rec_other_has_conflicting(mode, rec, trx)) {
/* If another transaction has a non-gap conflicting request in
the queue, as this transaction does not have a lock strong
enough already granted on the record, we have to wait. */
- err = lock_rec_enqueue_waiting(mode, rec, index, thr);
- } else {
- if (!impl) {
- /* Set the requested lock on the record */
+ return(lock_rec_enqueue_waiting(mode, rec, index, thr));
+ } else if (!impl) {
+ /* Set the requested lock on the record */
- lock_rec_add_to_queue(LOCK_REC | mode, rec, index,
- trx);
- }
-
- err = DB_SUCCESS;
+ lock_rec_add_to_queue(LOCK_REC | mode, rec, index, trx);
+ return(DB_SUCCESS_LOCKED_REC);
}
- return(err);
+ return(DB_SUCCESS);
}
/*************************************************************************
@@ -2049,8 +2056,9 @@ static
ulint
lock_rec_lock(
/*==========*/
- /* out: DB_SUCCESS, DB_LOCK_WAIT, or error
- code */
+ /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
+ DB_LOCK_WAIT, DB_DEADLOCK, or
+ DB_QUE_THR_SUSPENDED */
ibool impl, /* in: if TRUE, no lock is set if no wait is
necessary: we assume that the caller will set
an implicit lock */
@@ -2060,8 +2068,6 @@ lock_rec_lock(
dict_index_t* index, /* in: index of record */
que_thr_t* thr) /* in: query thread */
{
- ulint err;
-
ut_ad(mutex_own(&kernel_mutex));
ut_ad((LOCK_MODE_MASK & mode) != LOCK_S
|| lock_table_has(thr_get_trx(thr), index->table, LOCK_IS));
@@ -2073,17 +2079,19 @@ lock_rec_lock(
|| mode - (LOCK_MODE_MASK & mode) == LOCK_REC_NOT_GAP
|| mode - (LOCK_MODE_MASK & mode) == 0);
- if (lock_rec_lock_fast(impl, mode, rec, index, thr)) {
-
- /* We try a simplified and faster subroutine for the most
- common cases */
-
- err = DB_SUCCESS;
- } else {
- err = lock_rec_lock_slow(impl, mode, rec, index, thr);
+ /* We try a simplified and faster subroutine for the most
+ common cases */
+ switch (lock_rec_lock_fast(impl, mode, rec, index, thr)) {
+ case LOCK_REC_SUCCESS:
+ return(DB_SUCCESS);
+ case LOCK_REC_SUCCESS_CREATED:
+ return(DB_SUCCESS_LOCKED_REC);
+ case LOCK_REC_FAIL:
+ return(lock_rec_lock_slow(impl, mode, rec, index, thr));
}
- return(err);
+ ut_error;
+ return(DB_ERROR);
}
/*************************************************************************
@@ -4832,7 +4840,7 @@ lock_rec_insert_check_and_lock(
lock = lock_rec_get_first(next_rec);
- if (lock == NULL) {
+ if (UNIV_LIKELY(lock == NULL)) {
/* We optimize CPU time usage in the simplest case */
lock_mutex_exit_kernel();
@@ -4840,8 +4848,7 @@ lock_rec_insert_check_and_lock(
if (!(index->type & DICT_CLUSTERED)) {
/* Update the page max trx id field */
- page_update_max_trx_id(buf_frame_align(rec),
- thr_get_trx(thr)->id);
+ page_update_max_trx_id(buf_frame_align(rec), trx->id);
}
return(DB_SUCCESS);
@@ -4873,11 +4880,16 @@ lock_rec_insert_check_and_lock(
lock_mutex_exit_kernel();
- if (!(index->type & DICT_CLUSTERED) && (err == DB_SUCCESS)) {
-
+ switch (err) {
+ case DB_SUCCESS_LOCKED_REC:
+ err = DB_SUCCESS;
+ /* fall through */
+ case DB_SUCCESS:
+ if (index->type & DICT_CLUSTERED) {
+ break;
+ }
/* Update the page max trx id field */
- page_update_max_trx_id(buf_frame_align(rec),
- thr_get_trx(thr)->id);
+ page_update_max_trx_id(buf_frame_align(rec), trx->id);
}
#ifdef UNIV_DEBUG
@@ -4984,6 +4996,10 @@ lock_clust_rec_modify_check_and_lock(
ut_ad(lock_rec_queue_validate(rec, index, offsets));
+ if (UNIV_UNLIKELY(err == DB_SUCCESS_LOCKED_REC)) {
+ err = DB_SUCCESS;
+ }
+
return(err);
}
@@ -5043,25 +5059,29 @@ lock_sec_rec_modify_check_and_lock(
}
#endif /* UNIV_DEBUG */
- if (err == DB_SUCCESS) {
+ if (err == DB_SUCCESS || err == DB_SUCCESS_LOCKED_REC) {
/* Update the page max trx id field */
-
+ /* It might not be necessary to do this if
+ err == DB_SUCCESS (no new lock created),
+ but it should not cost too much performance. */
page_update_max_trx_id(buf_frame_align(rec),
thr_get_trx(thr)->id);
+ err = DB_SUCCESS;
}
return(err);
}
/*************************************************************************
-Like the counterpart for a clustered index below, but now we read a
+Like lock_clust_rec_read_check_and_lock(), but reads a
secondary index record. */
ulint
lock_sec_rec_read_check_and_lock(
/*=============================*/
- /* out: DB_SUCCESS, DB_LOCK_WAIT,
- DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */
+ /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
+ DB_LOCK_WAIT, DB_DEADLOCK,
+ or DB_QUE_THR_SUSPENDED */
ulint flags, /* in: if BTR_NO_LOCKING_FLAG bit is set,
does nothing */
rec_t* rec, /* in: user record or page supremum record
@@ -5126,8 +5146,9 @@ lock on the record. */
ulint
lock_clust_rec_read_check_and_lock(
/*===============================*/
- /* out: DB_SUCCESS, DB_LOCK_WAIT,
- DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */
+ /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
+ DB_LOCK_WAIT, DB_DEADLOCK,
+ or DB_QUE_THR_SUSPENDED */
ulint flags, /* in: if BTR_NO_LOCKING_FLAG bit is set,
does nothing */
rec_t* rec, /* in: user record or page supremum record
@@ -5206,16 +5227,21 @@ lock_clust_rec_read_check_and_lock_alt(
mem_heap_t* tmp_heap = NULL;
ulint offsets_[REC_OFFS_NORMAL_SIZE];
ulint* offsets = offsets_;
- ulint ret;
+ ulint err;
*offsets_ = (sizeof offsets_) / sizeof *offsets_;
offsets = rec_get_offsets(rec, index, offsets,
ULINT_UNDEFINED, &tmp_heap);
- ret = lock_clust_rec_read_check_and_lock(flags, rec, index,
+ err = lock_clust_rec_read_check_and_lock(flags, rec, index,
offsets, mode, gap_mode, thr);
if (tmp_heap) {
mem_heap_free(tmp_heap);
}
- return(ret);
+
+ if (UNIV_UNLIKELY(err == DB_SUCCESS_LOCKED_REC)) {
+ err = DB_SUCCESS;
+ }
+
+ return(err);
}