summaryrefslogtreecommitdiff
path: root/innobase/row/row0ins.c
diff options
context:
space:
mode:
Diffstat (limited to 'innobase/row/row0ins.c')
-rw-r--r--innobase/row/row0ins.c234
1 files changed, 131 insertions, 103 deletions
diff --git a/innobase/row/row0ins.c b/innobase/row/row0ins.c
index 4c5a46536cb..e57622fd1c5 100644
--- a/innobase/row/row0ins.c
+++ b/innobase/row/row0ins.c
@@ -284,15 +284,15 @@ ibool
row_ins_dupl_error_with_rec(
/*========================*/
/* out: TRUE if error */
- rec_t* rec, /* in: user record */
+ rec_t* rec, /* in: user record; NOTE that we assume
+ that the caller already has a record lock on
+ the record! */
dtuple_t* entry, /* in: entry to insert */
- dict_index_t* index, /* in: index */
- trx_t* trx) /* in: inserting transaction */
+ dict_index_t* index) /* in: index */
{
ulint matched_fields;
ulint matched_bytes;
ulint n_unique;
- trx_t* impl_trx;
n_unique = dict_index_get_n_unique(index);
@@ -311,46 +311,55 @@ row_ins_dupl_error_with_rec(
return(TRUE);
}
- /* If we get here, the record has its delete mark set. It is still
- a unique key violation if the transaction which set the delete mark
- is currently active and is not trx itself. We check if some
- transaction has an implicit x-lock on the record. */
+ return(FALSE);
+}
- mutex_enter(&kernel_mutex);
+/*************************************************************************
+Sets a shared lock on a record. Used in locking possible duplicate key
+records. */
+static
+ulint
+row_ins_set_shared_rec_lock(
+/*========================*/
+ /* out: DB_SUCCESS or error code */
+ rec_t* rec, /* in: record */
+ dict_index_t* index, /* in: index */
+ que_thr_t* thr) /* in: query thread */
+{
+ ulint err;
if (index->type & DICT_CLUSTERED) {
- impl_trx = lock_clust_rec_some_has_impl(rec, index);
+ err = lock_clust_rec_read_check_and_lock(0, rec, index, LOCK_S,
+ thr);
} else {
- impl_trx = lock_sec_rec_some_has_impl_off_kernel(rec, index);
+ err = lock_sec_rec_read_check_and_lock(0, rec, index, LOCK_S,
+ thr);
}
- mutex_exit(&kernel_mutex);
-
- if (impl_trx && impl_trx != trx) {
-
- return(TRUE);
- }
-
- return(FALSE);
-}
+ return(err);
+}
/*******************************************************************
Scans a unique non-clustered index at a given index entry to determine
-whether a uniqueness violation has occurred for the key value of the entry. */
+whether a uniqueness violation has occurred for the key value of the entry.
+Set shared locks on possible duplicate records. */
static
ulint
row_ins_scan_sec_index_for_duplicate(
/*=================================*/
- /* out: DB_SUCCESS or DB_DUPLICATE_KEY */
+ /* out: DB_SUCCESS, DB_DUPLICATE_KEY, or
+ DB_LOCK_WAIT */
dict_index_t* index, /* in: non-clustered unique index */
dtuple_t* entry, /* in: index entry */
- trx_t* trx) /* in: inserting transaction */
+ que_thr_t* thr) /* in: query thread */
{
- ulint dupl_count = 0;
int cmp;
ulint n_fields_cmp;
rec_t* rec;
btr_pcur_t pcur;
+ trx_t* trx = thr_get_trx(thr);
+ ulint err = DB_SUCCESS;
+ ibool moved;
mtr_t mtr;
mtr_start(&mtr);
@@ -361,32 +370,45 @@ row_ins_scan_sec_index_for_duplicate(
dtuple_set_n_fields_cmp(entry, dict_index_get_n_unique(index));
- btr_pcur_open_on_user_rec(index, entry, PAGE_CUR_GE,
- BTR_SEARCH_LEAF, &pcur, &mtr);
+ btr_pcur_open(index, entry, PAGE_CUR_GE, BTR_SEARCH_LEAF, &pcur, &mtr);
- /* Scan index records and check that there are no duplicates */
+ /* Scan index records and check if there is a duplicate */
for (;;) {
- if (btr_pcur_is_after_last_in_tree(&pcur, &mtr)) {
+ rec = btr_pcur_get_rec(&pcur);
+
+ if (rec == page_get_infimum_rec(buf_frame_align(rec))) {
+
+ goto next_rec;
+ }
+
+ /* Try to place a lock on the index record */
+
+ err = row_ins_set_shared_rec_lock(rec, index, thr);
+
+ if (err != DB_SUCCESS) {
break;
}
- rec = btr_pcur_get_rec(&pcur);
+ if (rec == page_get_supremum_rec(buf_frame_align(rec))) {
+
+ goto next_rec;
+ }
cmp = cmp_dtuple_rec(entry, rec);
if (cmp == 0) {
- if (row_ins_dupl_error_with_rec(rec, entry, index,
- trx)) {
- dupl_count++;
-
- if (dupl_count > 1) {
- /* printf(
- "Duplicate key in index %s\n",
+ if (row_ins_dupl_error_with_rec(rec, entry, index)) {
+ /* printf("Duplicate key in index %s\n",
index->name);
- dtuple_print(entry); */
- }
+ dtuple_print(entry); */
+
+ err = DB_DUPLICATE_KEY;
+
+ trx->error_info = index;
+
+ break;
}
}
@@ -395,8 +417,12 @@ row_ins_scan_sec_index_for_duplicate(
}
ut_a(cmp == 0);
+next_rec:
+ moved = btr_pcur_move_to_next(&pcur, &mtr);
- btr_pcur_move_to_next_user_rec(&pcur, &mtr);
+ if (!moved) {
+ break;
+ }
}
mtr_commit(&mtr);
@@ -404,44 +430,35 @@ row_ins_scan_sec_index_for_duplicate(
/* Restore old value */
dtuple_set_n_fields_cmp(entry, n_fields_cmp);
- ut_a(dupl_count >= 1);
-
- if (dupl_count > 1) {
- trx->error_info = index;
-
- return(DB_DUPLICATE_KEY);
- }
-
- return(DB_SUCCESS);
+ return(err);
}
/*******************************************************************
-Tries to check if a unique key violation error would occur at an index entry
-insert. */
+Checks if a unique key violation error would occur at an index entry
+insert. Sets shared locks on possible duplicate records. Works only
+for a clustered index! */
static
ulint
-row_ins_duplicate_error(
-/*====================*/
- /* out: DB_SUCCESS if no error
- DB_DUPLICATE_KEY if error,
- DB_STRONG_FAIL if this is a non-clustered
- index record and we cannot determine yet
- if there will be an error: in this last
- case we must call
- row_ins_scan_sec_index_for_duplicate
- AFTER the insertion of the record! */
+row_ins_duplicate_error_in_clust(
+/*=============================*/
+ /* out: DB_SUCCESS if no error,
+ DB_DUPLICATE_KEY if error, DB_LOCK_WAIT if we
+ have to wait for a lock on a possible
+ duplicate record */
btr_cur_t* cursor, /* in: B-tree cursor */
dtuple_t* entry, /* in: entry to insert */
- trx_t* trx, /* in: inserting transaction */
- mtr_t* mtr, /* in: mtr */
- rec_t** dupl_rec)/* out: record with which duplicate error */
+ que_thr_t* thr, /* in: query thread */
+ mtr_t* mtr) /* in: mtr */
{
+ ulint err;
rec_t* rec;
page_t* page;
ulint n_unique;
+ trx_t* trx = thr_get_trx(thr);
UT_NOT_USED(mtr);
+ ut_a(cursor->index->type & DICT_CLUSTERED);
ut_ad(cursor->index->type & DICT_UNIQUE);
/* NOTE: For unique non-clustered indexes there may be any number
@@ -466,9 +483,20 @@ row_ins_duplicate_error(
if (rec != page_get_infimum_rec(page)) {
+ /* We set a lock on the possible duplicate: this
+ is needed in logical logging of MySQL to make
+ sure that in roll-forward we get the same duplicate
+ errors as in original execution */
+
+ err = row_ins_set_shared_rec_lock(rec, cursor->index,
+ thr);
+ if (err != DB_SUCCESS) {
+
+ return(err);
+ }
+
if (row_ins_dupl_error_with_rec(rec, entry,
- cursor->index, trx)) {
- *dupl_rec = rec;
+ cursor->index)) {
trx->error_info = cursor->index;
return(DB_DUPLICATE_KEY);
@@ -483,9 +511,15 @@ row_ins_duplicate_error(
if (rec != page_get_supremum_rec(page)) {
+ err = row_ins_set_shared_rec_lock(rec, cursor->index,
+ thr);
+ if (err != DB_SUCCESS) {
+
+ return(err);
+ }
+
if (row_ins_dupl_error_with_rec(rec, entry,
- cursor->index, trx)) {
- *dupl_rec = rec;
+ cursor->index)) {
trx->error_info = cursor->index;
return(DB_DUPLICATE_KEY);
@@ -496,15 +530,7 @@ row_ins_duplicate_error(
/* This should never happen */
}
- if (cursor->index->type & DICT_CLUSTERED) {
-
- return(DB_SUCCESS);
- }
-
- /* It was a non-clustered index: we must scan the index after the
- insertion to be sure if there will be duplicate key error */
-
- return(DB_STRONG_FAIL);
+ return(DB_SUCCESS);
}
/*******************************************************************
@@ -574,18 +600,15 @@ row_ins_index_entry_low(
que_thr_t* thr) /* in: query thread */
{
btr_cur_t cursor;
- ulint dupl = DB_SUCCESS;
ulint modify;
rec_t* dummy_rec;
rec_t* rec;
- rec_t* dupl_rec; /* Note that this may be undefined
- for a non-clustered index even if
- there is a duplicate key */
ulint err;
ulint n_unique;
mtr_t mtr;
log_free_check();
+
mtr_start(&mtr);
cursor.thr = thr;
@@ -611,19 +634,37 @@ row_ins_index_entry_low(
if (index->type & DICT_UNIQUE && (cursor.up_match >= n_unique
|| cursor.low_match >= n_unique)) {
- dupl = row_ins_duplicate_error(&cursor, entry,
- thr_get_trx(thr), &mtr, &dupl_rec);
- if (dupl == DB_DUPLICATE_KEY) {
+ if (index->type & DICT_CLUSTERED) {
+ /* Note that the following may return also
+ DB_LOCK_WAIT */
- /* printf("Duplicate key in index %s lm %lu\n",
- cursor->index->name, cursor->low_match);
- rec_print(rec);
- dtuple_print(entry); */
+ err = row_ins_duplicate_error_in_clust(&cursor,
+ entry, thr, &mtr);
+ if (err != DB_SUCCESS) {
- err = dupl;
+ goto function_exit;
+ }
+ } else {
+ mtr_commit(&mtr);
+ err = row_ins_scan_sec_index_for_duplicate(index,
+ entry, thr);
+ mtr_start(&mtr);
- goto function_exit;
- }
+ if (err != DB_SUCCESS) {
+
+ goto function_exit;
+ }
+
+ /* We did not find a duplicate and we have now
+ locked with s-locks the necessary records to
+ prevent any insertion of a duplicate by another
+ transaction. Let us now reposition the cursor and
+ continue the insertion. */
+
+ btr_cur_search_to_nth_level(index, 0, entry,
+ PAGE_CUR_LE, mode | BTR_INSERT,
+ &cursor, 0, &mtr);
+ }
}
modify = row_ins_must_modify(&cursor);
@@ -659,19 +700,6 @@ row_ins_index_entry_low(
function_exit:
mtr_commit(&mtr);
- if (err == DB_SUCCESS && dupl == DB_STRONG_FAIL) {
- /* We were not able to determine before the insertion
- whether there will be a duplicate key error: do the check
- now */
-
- err = row_ins_scan_sec_index_for_duplicate(index, entry,
- thr_get_trx(thr));
- }
-
- ut_ad(err != DB_DUPLICATE_KEY || index->type & DICT_CLUSTERED
- || DB_DUPLICATE_KEY ==
- row_ins_scan_sec_index_for_duplicate(index, entry,
- thr_get_trx(thr)));
return(err);
}