diff options
Diffstat (limited to 'innobase/row/row0ins.c')
-rw-r--r-- | innobase/row/row0ins.c | 234 |
1 files changed, 131 insertions, 103 deletions
diff --git a/innobase/row/row0ins.c b/innobase/row/row0ins.c index 4c5a46536cb..e57622fd1c5 100644 --- a/innobase/row/row0ins.c +++ b/innobase/row/row0ins.c @@ -284,15 +284,15 @@ ibool row_ins_dupl_error_with_rec( /*========================*/ /* out: TRUE if error */ - rec_t* rec, /* in: user record */ + rec_t* rec, /* in: user record; NOTE that we assume + that the caller already has a record lock on + the record! */ dtuple_t* entry, /* in: entry to insert */ - dict_index_t* index, /* in: index */ - trx_t* trx) /* in: inserting transaction */ + dict_index_t* index) /* in: index */ { ulint matched_fields; ulint matched_bytes; ulint n_unique; - trx_t* impl_trx; n_unique = dict_index_get_n_unique(index); @@ -311,46 +311,55 @@ row_ins_dupl_error_with_rec( return(TRUE); } - /* If we get here, the record has its delete mark set. It is still - a unique key violation if the transaction which set the delete mark - is currently active and is not trx itself. We check if some - transaction has an implicit x-lock on the record. */ + return(FALSE); +} - mutex_enter(&kernel_mutex); +/************************************************************************* +Sets a shared lock on a record. Used in locking possible duplicate key +records. */ +static +ulint +row_ins_set_shared_rec_lock( +/*========================*/ + /* out: DB_SUCCESS or error code */ + rec_t* rec, /* in: record */ + dict_index_t* index, /* in: index */ + que_thr_t* thr) /* in: query thread */ +{ + ulint err; if (index->type & DICT_CLUSTERED) { - impl_trx = lock_clust_rec_some_has_impl(rec, index); + err = lock_clust_rec_read_check_and_lock(0, rec, index, LOCK_S, + thr); } else { - impl_trx = lock_sec_rec_some_has_impl_off_kernel(rec, index); + err = lock_sec_rec_read_check_and_lock(0, rec, index, LOCK_S, + thr); } - mutex_exit(&kernel_mutex); - - if (impl_trx && impl_trx != trx) { - - return(TRUE); - } - - return(FALSE); -} + return(err); +} /******************************************************************* Scans a unique non-clustered index at a given index entry to determine -whether a uniqueness violation has occurred for the key value of the entry. */ +whether a uniqueness violation has occurred for the key value of the entry. +Set shared locks on possible duplicate records. */ static ulint row_ins_scan_sec_index_for_duplicate( /*=================================*/ - /* out: DB_SUCCESS or DB_DUPLICATE_KEY */ + /* out: DB_SUCCESS, DB_DUPLICATE_KEY, or + DB_LOCK_WAIT */ dict_index_t* index, /* in: non-clustered unique index */ dtuple_t* entry, /* in: index entry */ - trx_t* trx) /* in: inserting transaction */ + que_thr_t* thr) /* in: query thread */ { - ulint dupl_count = 0; int cmp; ulint n_fields_cmp; rec_t* rec; btr_pcur_t pcur; + trx_t* trx = thr_get_trx(thr); + ulint err = DB_SUCCESS; + ibool moved; mtr_t mtr; mtr_start(&mtr); @@ -361,32 +370,45 @@ row_ins_scan_sec_index_for_duplicate( dtuple_set_n_fields_cmp(entry, dict_index_get_n_unique(index)); - btr_pcur_open_on_user_rec(index, entry, PAGE_CUR_GE, - BTR_SEARCH_LEAF, &pcur, &mtr); + btr_pcur_open(index, entry, PAGE_CUR_GE, BTR_SEARCH_LEAF, &pcur, &mtr); - /* Scan index records and check that there are no duplicates */ + /* Scan index records and check if there is a duplicate */ for (;;) { - if (btr_pcur_is_after_last_in_tree(&pcur, &mtr)) { + rec = btr_pcur_get_rec(&pcur); + + if (rec == page_get_infimum_rec(buf_frame_align(rec))) { + + goto next_rec; + } + + /* Try to place a lock on the index record */ + + err = row_ins_set_shared_rec_lock(rec, index, thr); + + if (err != DB_SUCCESS) { break; } - rec = btr_pcur_get_rec(&pcur); + if (rec == page_get_supremum_rec(buf_frame_align(rec))) { + + goto next_rec; + } cmp = cmp_dtuple_rec(entry, rec); if (cmp == 0) { - if (row_ins_dupl_error_with_rec(rec, entry, index, - trx)) { - dupl_count++; - - if (dupl_count > 1) { - /* printf( - "Duplicate key in index %s\n", + if (row_ins_dupl_error_with_rec(rec, entry, index)) { + /* printf("Duplicate key in index %s\n", index->name); - dtuple_print(entry); */ - } + dtuple_print(entry); */ + + err = DB_DUPLICATE_KEY; + + trx->error_info = index; + + break; } } @@ -395,8 +417,12 @@ row_ins_scan_sec_index_for_duplicate( } ut_a(cmp == 0); +next_rec: + moved = btr_pcur_move_to_next(&pcur, &mtr); - btr_pcur_move_to_next_user_rec(&pcur, &mtr); + if (!moved) { + break; + } } mtr_commit(&mtr); @@ -404,44 +430,35 @@ row_ins_scan_sec_index_for_duplicate( /* Restore old value */ dtuple_set_n_fields_cmp(entry, n_fields_cmp); - ut_a(dupl_count >= 1); - - if (dupl_count > 1) { - trx->error_info = index; - - return(DB_DUPLICATE_KEY); - } - - return(DB_SUCCESS); + return(err); } /******************************************************************* -Tries to check if a unique key violation error would occur at an index entry -insert. */ +Checks if a unique key violation error would occur at an index entry +insert. Sets shared locks on possible duplicate records. Works only +for a clustered index! */ static ulint -row_ins_duplicate_error( -/*====================*/ - /* out: DB_SUCCESS if no error - DB_DUPLICATE_KEY if error, - DB_STRONG_FAIL if this is a non-clustered - index record and we cannot determine yet - if there will be an error: in this last - case we must call - row_ins_scan_sec_index_for_duplicate - AFTER the insertion of the record! */ +row_ins_duplicate_error_in_clust( +/*=============================*/ + /* out: DB_SUCCESS if no error, + DB_DUPLICATE_KEY if error, DB_LOCK_WAIT if we + have to wait for a lock on a possible + duplicate record */ btr_cur_t* cursor, /* in: B-tree cursor */ dtuple_t* entry, /* in: entry to insert */ - trx_t* trx, /* in: inserting transaction */ - mtr_t* mtr, /* in: mtr */ - rec_t** dupl_rec)/* out: record with which duplicate error */ + que_thr_t* thr, /* in: query thread */ + mtr_t* mtr) /* in: mtr */ { + ulint err; rec_t* rec; page_t* page; ulint n_unique; + trx_t* trx = thr_get_trx(thr); UT_NOT_USED(mtr); + ut_a(cursor->index->type & DICT_CLUSTERED); ut_ad(cursor->index->type & DICT_UNIQUE); /* NOTE: For unique non-clustered indexes there may be any number @@ -466,9 +483,20 @@ row_ins_duplicate_error( if (rec != page_get_infimum_rec(page)) { + /* We set a lock on the possible duplicate: this + is needed in logical logging of MySQL to make + sure that in roll-forward we get the same duplicate + errors as in original execution */ + + err = row_ins_set_shared_rec_lock(rec, cursor->index, + thr); + if (err != DB_SUCCESS) { + + return(err); + } + if (row_ins_dupl_error_with_rec(rec, entry, - cursor->index, trx)) { - *dupl_rec = rec; + cursor->index)) { trx->error_info = cursor->index; return(DB_DUPLICATE_KEY); @@ -483,9 +511,15 @@ row_ins_duplicate_error( if (rec != page_get_supremum_rec(page)) { + err = row_ins_set_shared_rec_lock(rec, cursor->index, + thr); + if (err != DB_SUCCESS) { + + return(err); + } + if (row_ins_dupl_error_with_rec(rec, entry, - cursor->index, trx)) { - *dupl_rec = rec; + cursor->index)) { trx->error_info = cursor->index; return(DB_DUPLICATE_KEY); @@ -496,15 +530,7 @@ row_ins_duplicate_error( /* This should never happen */ } - if (cursor->index->type & DICT_CLUSTERED) { - - return(DB_SUCCESS); - } - - /* It was a non-clustered index: we must scan the index after the - insertion to be sure if there will be duplicate key error */ - - return(DB_STRONG_FAIL); + return(DB_SUCCESS); } /******************************************************************* @@ -574,18 +600,15 @@ row_ins_index_entry_low( que_thr_t* thr) /* in: query thread */ { btr_cur_t cursor; - ulint dupl = DB_SUCCESS; ulint modify; rec_t* dummy_rec; rec_t* rec; - rec_t* dupl_rec; /* Note that this may be undefined - for a non-clustered index even if - there is a duplicate key */ ulint err; ulint n_unique; mtr_t mtr; log_free_check(); + mtr_start(&mtr); cursor.thr = thr; @@ -611,19 +634,37 @@ row_ins_index_entry_low( if (index->type & DICT_UNIQUE && (cursor.up_match >= n_unique || cursor.low_match >= n_unique)) { - dupl = row_ins_duplicate_error(&cursor, entry, - thr_get_trx(thr), &mtr, &dupl_rec); - if (dupl == DB_DUPLICATE_KEY) { + if (index->type & DICT_CLUSTERED) { + /* Note that the following may return also + DB_LOCK_WAIT */ - /* printf("Duplicate key in index %s lm %lu\n", - cursor->index->name, cursor->low_match); - rec_print(rec); - dtuple_print(entry); */ + err = row_ins_duplicate_error_in_clust(&cursor, + entry, thr, &mtr); + if (err != DB_SUCCESS) { - err = dupl; + goto function_exit; + } + } else { + mtr_commit(&mtr); + err = row_ins_scan_sec_index_for_duplicate(index, + entry, thr); + mtr_start(&mtr); - goto function_exit; - } + if (err != DB_SUCCESS) { + + goto function_exit; + } + + /* We did not find a duplicate and we have now + locked with s-locks the necessary records to + prevent any insertion of a duplicate by another + transaction. Let us now reposition the cursor and + continue the insertion. */ + + btr_cur_search_to_nth_level(index, 0, entry, + PAGE_CUR_LE, mode | BTR_INSERT, + &cursor, 0, &mtr); + } } modify = row_ins_must_modify(&cursor); @@ -659,19 +700,6 @@ row_ins_index_entry_low( function_exit: mtr_commit(&mtr); - if (err == DB_SUCCESS && dupl == DB_STRONG_FAIL) { - /* We were not able to determine before the insertion - whether there will be a duplicate key error: do the check - now */ - - err = row_ins_scan_sec_index_for_duplicate(index, entry, - thr_get_trx(thr)); - } - - ut_ad(err != DB_DUPLICATE_KEY || index->type & DICT_CLUSTERED - || DB_DUPLICATE_KEY == - row_ins_scan_sec_index_for_duplicate(index, entry, - thr_get_trx(thr))); return(err); } |