diff options
author | Marko Mäkelä <marko.makela@mariadb.com> | 2023-01-03 16:10:02 +0200 |
---|---|---|
committer | Marko Mäkelä <marko.makela@mariadb.com> | 2023-01-03 16:10:02 +0200 |
commit | fb0808c450849e00993fa38839f33969a9daf7e8 (patch) | |
tree | 55aca4c3c7dff7e09a13efa6dcfb58d5e91def35 /storage/innobase | |
parent | fb41117c907a99d051ac09c229762978373d7eb0 (diff) | |
parent | 8760f6907c51e0e20242a53188be5b62029d6f1a (diff) | |
download | mariadb-git-fb0808c450849e00993fa38839f33969a9daf7e8.tar.gz |
Merge 10.3 into 10.4
Diffstat (limited to 'storage/innobase')
-rw-r--r-- | storage/innobase/dict/dict0mem.cc | 14 | ||||
-rw-r--r-- | storage/innobase/fts/fts0fts.cc | 134 | ||||
-rw-r--r-- | storage/innobase/handler/ha_innodb.cc | 48 | ||||
-rw-r--r-- | storage/innobase/handler/handler0alter.cc | 35 | ||||
-rw-r--r-- | storage/innobase/include/dict0mem.h | 3 | ||||
-rw-r--r-- | storage/innobase/include/fts0fts.h | 17 | ||||
-rw-r--r-- | storage/innobase/include/row0ins.h | 1 | ||||
-rw-r--r-- | storage/innobase/include/row0upd.h | 16 | ||||
-rw-r--r-- | storage/innobase/row/row0ins.cc | 102 | ||||
-rw-r--r-- | storage/innobase/row/row0merge.cc | 33 | ||||
-rw-r--r-- | storage/innobase/row/row0mysql.cc | 11 | ||||
-rw-r--r-- | storage/innobase/row/row0row.cc | 4 | ||||
-rw-r--r-- | storage/innobase/row/row0sel.cc | 4 | ||||
-rw-r--r-- | storage/innobase/row/row0upd.cc | 13 |
14 files changed, 325 insertions, 110 deletions
diff --git a/storage/innobase/dict/dict0mem.cc b/storage/innobase/dict/dict0mem.cc index 0bdb086a5b6..d6b9e296c5c 100644 --- a/storage/innobase/dict/dict0mem.cc +++ b/storage/innobase/dict/dict0mem.cc @@ -1370,6 +1370,20 @@ dict_index_t::vers_history_row( { ut_ad(!is_primary()); + /* + Get row_end from clustered index + + TODO (optimization): row_end can be taken from unique secondary index + as well. For that dict_index_t::vers_end member should be added and + updated at index init (dict_index_build_internal_non_clust()). + + Test case: + + create or replace table t1 (x int unique, y int unique, + foreign key r (y) references t1 (x)) + with system versioning engine innodb; + insert into t1 values (1, 1); + */ bool error = false; mem_heap_t* heap = NULL; dict_index_t* clust_index = NULL; diff --git a/storage/innobase/fts/fts0fts.cc b/storage/innobase/fts/fts0fts.cc index 64d662ca9b6..224fc9593b7 100644 --- a/storage/innobase/fts/fts0fts.cc +++ b/storage/innobase/fts/fts0fts.cc @@ -1,7 +1,7 @@ /***************************************************************************** Copyright (c) 2011, 2021, Oracle and/or its affiliates. -Copyright (c) 2016, 2022, MariaDB Corporation. +Copyright (c) 2016, 2023, MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -386,8 +386,10 @@ fts_read_stopword( fts_string_t str; mem_heap_t* heap; ib_rbt_bound_t parent; + dict_table_t* table; sel_node = static_cast<sel_node_t*>(row); + table = sel_node->table_list->table; stopword_info = static_cast<fts_stopword_t*>(user_arg); stop_words = stopword_info->cached_stopword; @@ -402,6 +404,27 @@ fts_read_stopword( str.f_n_char = 0; str.f_str = static_cast<byte*>(dfield_get_data(dfield)); str.f_len = dfield_get_len(dfield); + exp = que_node_get_next(exp); + ut_ad(exp); + + if (table->versioned()) { + dfield = que_node_get_val(exp); + ut_ad(dfield_get_type(dfield)->vers_sys_end()); + void* data = dfield_get_data(dfield); + ulint len = dfield_get_len(dfield); + if (table->versioned_by_id()) { + ut_ad(len == sizeof trx_id_max_bytes); + if (0 != memcmp(data, trx_id_max_bytes, len)) { + return true; + } + } else { + ut_ad(len == sizeof timestamp_max_bytes); + if (0 != memcmp(data, timestamp_max_bytes, len)) { + return true; + } + } + } + ut_ad(!que_node_get_next(exp)); /* Only create new node if it is a value not already existed */ if (str.f_len != UNIV_SQL_NULL @@ -445,7 +468,9 @@ fts_load_user_stopword( /* Validate the user table existence in the right format */ bool ret= false; - stopword_info->charset = fts_valid_stopword_table(stopword_table_name); + const char* row_end; + stopword_info->charset = fts_valid_stopword_table(stopword_table_name, + &row_end); if (!stopword_info->charset) { cleanup: if (!fts->dict_locked) { @@ -470,6 +495,7 @@ cleanup: pars_info_t* info = pars_info_create(); pars_info_bind_id(info, "table_stopword", stopword_table_name); + pars_info_bind_id(info, "row_end", row_end); pars_info_bind_function(info, "my_func", fts_read_stopword, stopword_info); @@ -478,7 +504,7 @@ cleanup: info, "DECLARE FUNCTION my_func;\n" "DECLARE CURSOR c IS" - " SELECT value" + " SELECT value, $row_end" " FROM $table_stopword;\n" "BEGIN\n" "\n" @@ -1913,9 +1939,16 @@ fts_create_common_tables( goto func_exit; } - index = dict_mem_index_create(table, FTS_DOC_ID_INDEX_NAME, - DICT_UNIQUE, 1); - dict_mem_index_add_field(index, FTS_DOC_ID_COL_NAME, 0); + if (table->versioned()) { + index = dict_mem_index_create(table, FTS_DOC_ID_INDEX_NAME, + DICT_UNIQUE, 2); + dict_mem_index_add_field(index, FTS_DOC_ID_COL_NAME, 0); + dict_mem_index_add_field(index, table->cols[table->vers_end].name(*table), 0); + } else { + index = dict_mem_index_create(table, FTS_DOC_ID_INDEX_NAME, + DICT_UNIQUE, 1); + dict_mem_index_add_field(index, FTS_DOC_ID_COL_NAME, 0); + } op = trx_get_dict_operation(trx); @@ -3405,7 +3438,8 @@ fts_add_doc_by_id( /* Search based on Doc ID. Here, we'll need to consider the case when there is no primary index on Doc ID */ - tuple = dtuple_create(heap, 1); + const ulint n_uniq = table->fts_n_uniq(); + tuple = dtuple_create(heap, n_uniq); dfield = dtuple_get_nth_field(tuple, 0); dfield->type.mtype = DATA_INT; dfield->type.prtype = DATA_NOT_NULL | DATA_UNSIGNED | DATA_BINARY_TYPE; @@ -3413,12 +3447,27 @@ fts_add_doc_by_id( mach_write_to_8((byte*) &temp_doc_id, doc_id); dfield_set_data(dfield, &temp_doc_id, sizeof(temp_doc_id)); + if (n_uniq == 2) { + ut_ad(table->versioned()); + ut_ad(fts_id_index->fields[1].col->vers_sys_end()); + dfield = dtuple_get_nth_field(tuple, 1); + dfield->type.mtype = fts_id_index->fields[1].col->mtype; + dfield->type.prtype = fts_id_index->fields[1].col->prtype; + if (table->versioned_by_id()) { + dfield_set_data(dfield, trx_id_max_bytes, + sizeof(trx_id_max_bytes)); + } else { + dfield_set_data(dfield, timestamp_max_bytes, + sizeof(timestamp_max_bytes)); + } + } + btr_pcur_open_with_no_init( fts_id_index, tuple, PAGE_CUR_LE, BTR_SEARCH_LEAF, &pcur, &mtr); /* If we have a match, add the data to doc structure */ - if (btr_pcur_get_low_match(&pcur) == 1) { + if (btr_pcur_get_low_match(&pcur) == n_uniq) { const rec_t* rec; btr_pcur_t* doc_pcur; const rec_t* clust_rec; @@ -3614,13 +3663,34 @@ fts_get_max_doc_id( if (!page_is_empty(btr_pcur_get_page(&pcur))) { const rec_t* rec = NULL; + constexpr ulint doc_id_len= 8; do { rec = btr_pcur_get_rec(&pcur); - if (page_rec_is_user_rec(rec)) { + if (!page_rec_is_user_rec(rec)) { + continue; + } + + if (index->n_uniq == 1) { break; } + + ut_ad(table->versioned()); + ut_ad(index->n_uniq == 2); + + const byte *data = rec + doc_id_len; + if (table->versioned_by_id()) { + if (0 == memcmp(data, trx_id_max_bytes, + sizeof trx_id_max_bytes)) { + break; + } + } else { + if (0 == memcmp(data, timestamp_max_bytes, + sizeof timestamp_max_bytes)) { + break; + } + } } while (btr_pcur_move_to_prev(&pcur, &mtr)); if (!rec || rec_is_metadata(rec, *index)) { @@ -5902,12 +5972,16 @@ void fts_drop_orphaned_tables() /**********************************************************************//** Check whether user supplied stopword table is of the right format. Caller is responsible to hold dictionary locks. -@return the stopword column charset if qualifies */ +@param stopword_table_name table name +@param row_end name of the system-versioning end column, or "value" +@return the stopword column charset +@retval NULL if the table does not exist or qualify */ CHARSET_INFO* fts_valid_stopword_table( /*=====================*/ - const char* stopword_table_name) /*!< in: Stopword table + const char* stopword_table_name, /*!< in: Stopword table name */ + const char** row_end) /* row_end value of system-versioned table */ { dict_table_t* table; dict_col_t* col = NULL; @@ -5949,6 +6023,13 @@ fts_valid_stopword_table( } ut_ad(col); + ut_ad(!table->versioned() || col->ind != table->vers_end); + + if (row_end) { + *row_end = table->versioned() + ? dict_table_get_col_name(table, table->vers_end) + : "value"; /* for fts_load_user_stopword() */ + } return(fts_get_charset(col->prtype)); } @@ -6084,18 +6165,20 @@ cleanup: /**********************************************************************//** Callback function when we initialize the FTS at the start up time. It recovers the maximum Doc IDs presented in the current table. +Tested by innodb_fts.crash_recovery @return: always returns TRUE */ static ibool fts_init_get_doc_id( /*================*/ void* row, /*!< in: sel_node_t* */ - void* user_arg) /*!< in: fts cache */ + void* user_arg) /*!< in: table with fts */ { doc_id_t doc_id = FTS_NULL_DOC_ID; sel_node_t* node = static_cast<sel_node_t*>(row); que_node_t* exp = node->select_list; - fts_cache_t* cache = static_cast<fts_cache_t*>(user_arg); + dict_table_t* table = static_cast<dict_table_t *>(user_arg); + fts_cache_t* cache = table->fts->cache; ut_ad(ib_vector_is_empty(cache->get_docs)); @@ -6110,6 +6193,29 @@ fts_init_get_doc_id( doc_id = static_cast<doc_id_t>(mach_read_from_8( static_cast<const byte*>(data))); + exp = que_node_get_next(que_node_get_next(exp)); + if (exp) { + ut_ad(table->versioned()); + dfield = que_node_get_val(exp); + type = dfield_get_type(dfield); + ut_ad(type->vers_sys_end()); + data = dfield_get_data(dfield); + ulint len = dfield_get_len(dfield); + if (table->versioned_by_id()) { + ut_ad(len == sizeof trx_id_max_bytes); + if (0 != memcmp(data, trx_id_max_bytes, len)) { + return true; + } + } else { + ut_ad(len == sizeof timestamp_max_bytes); + if (0 != memcmp(data, timestamp_max_bytes, len)) { + return true; + } + } + ut_ad(!(exp = que_node_get_next(exp))); + } + ut_ad(!exp); + if (doc_id >= cache->next_doc_id) { cache->next_doc_id = doc_id + 1; } @@ -6275,7 +6381,7 @@ fts_init_index( fts_doc_fetch_by_doc_id(NULL, start_doc, index, FTS_FETCH_DOC_BY_ID_LARGE, - fts_init_get_doc_id, cache); + fts_init_get_doc_id, table); } else { if (table->fts->cache->stopword_info.status & STOPWORD_NOT_INIT) { diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index 2a0de690174..c83deb72f02 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -8946,6 +8946,9 @@ ha_innobase::update_row( innobase_srv_conc_enter_innodb(m_prebuilt); + if (m_prebuilt->upd_node->is_delete) { + trx->fts_next_doc_id = 0; + } error = row_update_for_mysql(m_prebuilt); if (error == DB_SUCCESS && vers_ins_row @@ -9066,6 +9069,7 @@ ha_innobase::delete_row( && trx->id != table->vers_start_id() ? VERSIONED_DELETE : PLAIN_DELETE; + trx->fts_next_doc_id = 0; innobase_srv_conc_enter_innodb(m_prebuilt); @@ -10062,9 +10066,12 @@ ha_innobase::ft_init_ext( /*****************************************************************//** Set up search tuple for a query through FTS_DOC_ID_INDEX on supplied Doc ID. This is used by MySQL to retrieve the documents -once the search result (Doc IDs) is available */ +once the search result (Doc IDs) is available + +@return DB_SUCCESS or DB_INDEX_CORRUPT +*/ static -void +dberr_t innobase_fts_create_doc_id_key( /*===========================*/ dtuple_t* tuple, /* in/out: m_prebuilt->search_tuple */ @@ -10076,8 +10083,10 @@ innobase_fts_create_doc_id_key( { doc_id_t temp_doc_id; dfield_t* dfield = dtuple_get_nth_field(tuple, 0); + const ulint n_uniq = index->table->fts_n_uniq(); - ut_a(dict_index_get_n_unique(index) == 1); + if (dict_index_get_n_unique(index) != n_uniq) + return DB_INDEX_CORRUPT; dtuple_set_n_fields(tuple, index->n_fields); dict_index_copy_types(tuple, index, index->n_fields); @@ -10095,12 +10104,25 @@ innobase_fts_create_doc_id_key( *doc_id = temp_doc_id; dfield_set_data(dfield, doc_id, sizeof(*doc_id)); - dtuple_set_n_fields_cmp(tuple, 1); + if (n_uniq == 2) { + ut_ad(index->table->versioned()); + dfield = dtuple_get_nth_field(tuple, 1); + if (index->table->versioned_by_id()) { + dfield_set_data(dfield, trx_id_max_bytes, + sizeof(trx_id_max_bytes)); + } else { + dfield_set_data(dfield, timestamp_max_bytes, + sizeof(timestamp_max_bytes)); + } + } + + dtuple_set_n_fields_cmp(tuple, n_uniq); - for (ulint i = 1; i < index->n_fields; i++) { + for (ulint i = n_uniq; i < index->n_fields; i++) { dfield = dtuple_get_nth_field(tuple, i); dfield_set_null(dfield); } + return DB_SUCCESS; } /**********************************************************************//** @@ -10182,14 +10204,18 @@ next_record: /* We pass a pointer of search_doc_id because it will be converted to storage byte order used in the search tuple. */ - innobase_fts_create_doc_id_key(tuple, index, &search_doc_id); + dberr_t ret = innobase_fts_create_doc_id_key( + tuple, index, &search_doc_id); - innobase_srv_conc_enter_innodb(m_prebuilt); + if (ret == DB_SUCCESS) { + innobase_srv_conc_enter_innodb(m_prebuilt); - dberr_t ret = row_search_for_mysql( - (byte*) buf, PAGE_CUR_GE, m_prebuilt, ROW_SEL_EXACT, 0); + ret = row_search_for_mysql( + (byte*) buf, PAGE_CUR_GE, m_prebuilt, + ROW_SEL_EXACT, 0); - innobase_srv_conc_exit_innodb(m_prebuilt); + innobase_srv_conc_exit_innodb(m_prebuilt); + } int error; @@ -17354,7 +17380,7 @@ innodb_stopword_table_validate( /* Validate the stopword table's (if supplied) existence and of the right format */ int ret = stopword_table_name && !fts_valid_stopword_table( - stopword_table_name); + stopword_table_name, NULL); row_mysql_unlock_data_dictionary(trx); diff --git a/storage/innobase/handler/handler0alter.cc b/storage/innobase/handler/handler0alter.cc index cbb0cc623fe..17fe6014fea 100644 --- a/storage/innobase/handler/handler0alter.cc +++ b/storage/innobase/handler/handler0alter.cc @@ -1,7 +1,7 @@ /***************************************************************************** Copyright (c) 2005, 2019, Oracle and/or its affiliates. All Rights Reserved. -Copyright (c) 2013, 2022, MariaDB Corporation. +Copyright (c) 2013, 2023, MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -2395,6 +2395,11 @@ innodb_instant_alter_column_allowed_reason: < dict_table_get_n_user_cols(m_prebuilt->table))); if (fulltext_indexes && m_prebuilt->table->fts) { + /* FTS index of versioned table has row_end, need rebuild */ + if (table->versioned() != altered_table->versioned()) { + need_rebuild= true; + } + /* FULLTEXT indexes are supposed to remain. */ /* Disallow DROP INDEX FTS_DOC_ID_INDEX */ @@ -3835,6 +3840,8 @@ innobase_fts_check_doc_id_index( /* Check if a unique index with the name of FTS_DOC_ID_INDEX_NAME is being created. */ + const ulint fts_n_uniq= altered_table->versioned() ? 2 : 1; + for (uint i = 0; i < altered_table->s->keys; i++) { const KEY& key = altered_table->key_info[i]; @@ -3844,7 +3851,7 @@ innobase_fts_check_doc_id_index( } if ((key.flags & HA_NOSAME) - && key.user_defined_key_parts == 1 + && key.user_defined_key_parts == fts_n_uniq && !strcmp(key.name.str, FTS_DOC_ID_INDEX_NAME) && !strcmp(key.key_part[0].field->field_name.str, FTS_DOC_ID_COL_NAME)) { @@ -3874,7 +3881,7 @@ innobase_fts_check_doc_id_index( } if (!dict_index_is_unique(index) - || dict_index_get_n_unique(index) > 1 + || dict_index_get_n_unique(index) != table->fts_n_uniq() || strcmp(index->name, FTS_DOC_ID_INDEX_NAME)) { return(FTS_INCORRECT_DOC_ID_INDEX); } @@ -3915,6 +3922,7 @@ innobase_fts_check_doc_id_index_in_def( { /* Check whether there is a "FTS_DOC_ID_INDEX" in the to be built index list */ + const uint fts_n_uniq= key_info->table->versioned() ? 2 : 1; for (ulint j = 0; j < n_key; j++) { const KEY* key = &key_info[j]; @@ -3925,7 +3933,7 @@ innobase_fts_check_doc_id_index_in_def( /* Do a check on FTS DOC ID_INDEX, it must be unique, named as "FTS_DOC_ID_INDEX" and on column "FTS_DOC_ID" */ if (!(key->flags & HA_NOSAME) - || key->user_defined_key_parts != 1 + || key->user_defined_key_parts != fts_n_uniq || strcmp(key->name.str, FTS_DOC_ID_INDEX_NAME) || strcmp(key->key_part[0].field->field_name.str, FTS_DOC_ID_COL_NAME)) { @@ -4117,13 +4125,22 @@ created_clustered: if (add_fts_doc_idx) { index_def_t* index = indexdef++; + uint nfields = 1; + if (altered_table->versioned()) + ++nfields; index->fields = static_cast<index_field_t*>( - mem_heap_alloc(heap, sizeof *index->fields)); - index->n_fields = 1; - index->fields->col_no = fts_doc_id_col; - index->fields->prefix_len = 0; - index->fields->is_v_col = false; + mem_heap_alloc(heap, sizeof(*index->fields) * nfields)); + index->n_fields = nfields; + index->fields[0].col_no = fts_doc_id_col; + index->fields[0].prefix_len = 0; + index->fields[0].is_v_col = false; + if (nfields == 2) { + index->fields[1].col_no + = altered_table->s->vers.end_fieldno; + index->fields[1].prefix_len = 0; + index->fields[1].is_v_col = false; + } index->ind_type = DICT_UNIQUE; ut_ad(!rebuild || !add_fts_doc_id diff --git a/storage/innobase/include/dict0mem.h b/storage/innobase/include/dict0mem.h index 596800ee8d2..f0d4eb7bebf 100644 --- a/storage/innobase/include/dict0mem.h +++ b/storage/innobase/include/dict0mem.h @@ -2350,6 +2350,9 @@ public: or mysql/innodb_index_stats. @return true if the table name is same as stats table */ bool is_stats_table() const; + + /** @return number of unique columns in FTS_DOC_ID index */ + unsigned fts_n_uniq() const { return versioned() ? 2 : 1; } }; inline void dict_index_t::set_modified(mtr_t& mtr) const diff --git a/storage/innobase/include/fts0fts.h b/storage/innobase/include/fts0fts.h index d3cfa5b23df..fcf5a5da9de 100644 --- a/storage/innobase/include/fts0fts.h +++ b/storage/innobase/include/fts0fts.h @@ -830,15 +830,14 @@ fts_get_max_doc_id( /*===============*/ dict_table_t* table); /*!< in: user table */ -/******************************************************************//** -Check whether user supplied stopword table exists and is of -the right format. -@return the stopword column charset if qualifies */ -CHARSET_INFO* -fts_valid_stopword_table( -/*=====================*/ - const char* stopword_table_name); /*!< in: Stopword table - name */ +/** Check whether a stopword table is in the right format. +@param stopword_table_name table name +@param row_end name of the system-versioning end column, or "value" +@return the stopword column charset +@retval NULL if the table does not exist or qualify */ +CHARSET_INFO *fts_valid_stopword_table(const char *stopword_table_name, + const char **row_end= NULL); + /****************************************************************//** This function loads specified stopword into FTS cache @return true if success */ diff --git a/storage/innobase/include/row0ins.h b/storage/innobase/include/row0ins.h index 9a16394a052..34427dc6dc7 100644 --- a/storage/innobase/include/row0ins.h +++ b/storage/innobase/include/row0ins.h @@ -206,7 +206,6 @@ struct ins_node_t if this is NULL, entry list should be created and buffers for sys fields in row allocated */ void vers_update_end(row_prebuilt_t *prebuilt, bool history_row); - bool vers_history_row() const; /* true if 'row' is historical */ }; /** Create an insert object. diff --git a/storage/innobase/include/row0upd.h b/storage/innobase/include/row0upd.h index b500b11e95c..0287040c418 100644 --- a/storage/innobase/include/row0upd.h +++ b/storage/innobase/include/row0upd.h @@ -599,17 +599,13 @@ public: void vers_make_update(const trx_t *trx) { vers_update_fields(trx, table->vers_start); - } + } - /** Only set row_end = CURRENT_TIMESTAMP/trx->id. - Do not touch other fields at all. - @param[in] trx transaction */ - void vers_make_delete(const trx_t *trx) - { - update->n_fields = 0; - is_delete = VERSIONED_DELETE; - vers_update_fields(trx, table->vers_end); - } + /** Prepare update vector for versioned delete. + Set row_end to CURRENT_TIMESTAMP or trx->id. + Initialize fts_next_doc_id for versioned delete. + @param[in] trx transaction */ + void vers_make_delete(trx_t *trx); }; #define UPD_NODE_MAGIC_N 1579975 diff --git a/storage/innobase/row/row0ins.cc b/storage/innobase/row/row0ins.cc index cf9468c4997..81f662e7a15 100644 --- a/storage/innobase/row/row0ins.cc +++ b/storage/innobase/row/row0ins.cc @@ -2051,6 +2051,61 @@ row_ins_dupl_error_with_rec( return(!rec_get_deleted_flag(rec, rec_offs_comp(offsets))); } +/** Determine whether a history row was inserted by this transaction +(row TRX_ID is the same as current TRX_ID). +@param index secondary index +@param rec secondary index record +@param trx transaction +@return error code +@retval DB_SUCCESS on success +@retval DB_FOREIGN_DUPLICATE_KEY if a history row was inserted by trx */ +static dberr_t vers_row_same_trx(dict_index_t* index, const rec_t* rec, + const trx_t& trx) +{ + mtr_t mtr; + dberr_t ret= DB_SUCCESS; + dict_index_t *clust_index= dict_table_get_first_index(index->table); + ut_ad(index != clust_index); + + mtr.start(); + + if (const rec_t *clust_rec= + row_get_clust_rec(BTR_SEARCH_LEAF, rec, index, &clust_index, &mtr)) + { + rec_offs offsets_[REC_OFFS_NORMAL_SIZE]; + rec_offs *clust_offs= offsets_; + rec_offs_init(offsets_); + mem_heap_t *heap= NULL; + + clust_offs= + rec_get_offsets(clust_rec, clust_index, clust_offs, + clust_index->n_core_fields, ULINT_UNDEFINED, &heap); + if (clust_index->vers_history_row(clust_rec, clust_offs)) + { + ulint trx_id_len; + const byte *trx_id= rec_get_nth_field(clust_rec, clust_offs, + clust_index->n_uniq, &trx_id_len); + ut_ad(trx_id_len == DATA_TRX_ID_LEN); + + if (trx.id == trx_read_trx_id(trx_id)) + ret= DB_FOREIGN_DUPLICATE_KEY; + } + + if (UNIV_LIKELY_NULL(heap)) + mem_heap_free(heap); + } + else + { + ib::error() << "foreign constraints: secondary index " << index->name << + " of table " << index->table->name << " is out of sync"; + ut_ad("secondary index is out of sync" == 0); + ret= DB_TABLE_CORRUPT; + } + + mtr.commit(); + return ret; +} + /***************************************************************//** Scans a unique non-clustered index at a given index entry to determine whether a uniqueness violation has occurred for the key value of the entry. @@ -2074,7 +2129,6 @@ row_ins_scan_sec_index_for_duplicate( ulint n_fields_cmp; btr_pcur_t pcur; dberr_t err = DB_SUCCESS; - ulint allow_duplicates; rec_offs offsets_[REC_OFFS_SEC_INDEX_SIZE]; rec_offs* offsets = offsets_; DBUG_ENTER("row_ins_scan_sec_index_for_duplicate"); @@ -2112,7 +2166,7 @@ row_ins_scan_sec_index_for_duplicate( : BTR_SEARCH_LEAF, &pcur, mtr); - allow_duplicates = thr_get_trx(thr)->duplicates; + trx_t* const trx = thr_get_trx(thr); /* Scan index records and check if there is a duplicate */ @@ -2133,7 +2187,7 @@ row_ins_scan_sec_index_for_duplicate( if (flags & BTR_NO_LOCKING_FLAG) { /* Set no locks when applying log in online table rebuild. */ - } else if (allow_duplicates) { + } else if (trx->duplicates) { /* If the SQL-query will update or replace duplicate key we will take X-lock for @@ -2167,9 +2221,18 @@ row_ins_scan_sec_index_for_duplicate( if (cmp == 0) { if (row_ins_dupl_error_with_rec(rec, entry, index, offsets)) { + err = DB_DUPLICATE_KEY; - thr_get_trx(thr)->error_info = index; + trx->error_info = index; + + if (!index->table->versioned()) { + } else if (dberr_t e = + vers_row_same_trx(index, rec, + *trx)) { + err = e; + goto end_scan; + } /* If the duplicate is on hidden FTS_DOC_ID, state so in the error log */ @@ -3605,16 +3668,6 @@ row_ins_get_row_from_select( } } -inline -bool ins_node_t::vers_history_row() const -{ - if (!table->versioned()) - return false; - dfield_t* row_end = dtuple_get_nth_field(row, table->vers_end); - return row_end->vers_history_row(); -} - - /***********************************************************//** Inserts a row to a table. @return DB_SUCCESS if operation successfully completed, else error @@ -3653,31 +3706,12 @@ row_ins( ut_ad(node->state == INS_NODE_INSERT_ENTRIES); while (node->index != NULL) { - dict_index_t *index = node->index; - /* - We do not insert history rows into FTS_DOC_ID_INDEX because - it is unique by FTS_DOC_ID only and we do not want to add - row_end to unique key. Fulltext field works the way new - FTS_DOC_ID is created on every fulltext UPDATE, so holding only - FTS_DOC_ID for history is enough. - */ - const unsigned type = index->type; - if (index->type & DICT_FTS) { - } else if (!(type & DICT_UNIQUE) || index->n_uniq > 1 - || !node->vers_history_row()) { - + if (!(node->index->type & DICT_FTS)) { dberr_t err = row_ins_index_entry_step(node, thr); if (err != DB_SUCCESS) { DBUG_RETURN(err); } - } else { - /* Unique indexes with system versioning must contain - the version end column. The only exception is a hidden - FTS_DOC_ID_INDEX that InnoDB may create on a hidden or - user-created FTS_DOC_ID column. */ - ut_ad(!strcmp(index->name, FTS_DOC_ID_INDEX_NAME)); - ut_ad(!strcmp(index->fields[0].name, FTS_DOC_ID_COL_NAME)); } node->index = dict_table_get_next_index(node->index); diff --git a/storage/innobase/row/row0merge.cc b/storage/innobase/row/row0merge.cc index 2ba6e4e1a52..9d755ce6f1e 100644 --- a/storage/innobase/row/row0merge.cc +++ b/storage/innobase/row/row0merge.cc @@ -457,6 +457,8 @@ row_merge_buf_redundant_convert( @param[in,out] row table row @param[in] ext cache of externally stored column prefixes, or NULL +@param[in] history_fts row is historical in a system-versioned table + on which a FTS_DOC_ID_INDEX(FTS_DOC_ID) exists @param[in,out] doc_id Doc ID if we are creating FTS index @param[in,out] conv_heap memory heap where to allocate data when @@ -478,6 +480,7 @@ row_merge_buf_add( fts_psort_t* psort_info, dtuple_t* row, const row_ext_t* ext, + const bool history_fts, doc_id_t* doc_id, mem_heap_t* conv_heap, dberr_t* err, @@ -540,7 +543,7 @@ error: : NULL; /* Process the Doc ID column */ - if (!v_col && *doc_id + if (!v_col && (history_fts || *doc_id) && col->ind == index->table->fts->doc_col) { fts_write_doc_id((byte*) &write_doc_id, *doc_id); @@ -592,7 +595,7 @@ error: /* Tokenize and process data for FTS */ - if (index->type & DICT_FTS) { + if (!history_fts && (index->type & DICT_FTS)) { fts_doc_item_t* doc_item; byte* value; void* ptr; @@ -1894,6 +1897,7 @@ row_merge_read_clustered_index( dtuple_t* row; row_ext_t* ext; page_cur_t* cur = btr_pcur_get_page_cur(&pcur); + bool history_row, history_fts = false; page_cur_move_to_next(cur); @@ -2131,6 +2135,11 @@ end_of_index: row_heap); ut_ad(row); + history_row = new_table->versioned() + && dtuple_get_nth_field(row, new_table->vers_end) + ->vers_history_row(); + history_fts = history_row && new_table->fts; + for (ulint i = 0; i < n_nonnull; i++) { dfield_t* field = &row->fields[nonnull[i]]; @@ -2160,7 +2169,7 @@ end_of_index: } /* Get the next Doc ID */ - if (add_doc_id) { + if (add_doc_id && !history_fts) { doc_id++; } else { doc_id = 0; @@ -2196,13 +2205,6 @@ end_of_index: ut_ad(add_autoinc < dict_table_get_n_user_cols(new_table)); - bool history_row = false; - if (new_table->versioned()) { - const dfield_t* dfield = dtuple_get_nth_field( - row, new_table->vers_end); - history_row = dfield->vers_history_row(); - } - dfield_t* dfield = dtuple_get_nth_field(row, add_autoinc); @@ -2326,8 +2328,8 @@ write_buffers: if (UNIV_LIKELY (row && (rows_added = row_merge_buf_add( buf, fts_index, old_table, new_table, - psort_info, row, ext, &doc_id, - conv_heap, &err, + psort_info, row, ext, history_fts, + &doc_id, conv_heap, &err, &v_heap, eval_table, trx)))) { /* Set the page flush observer for the @@ -2659,9 +2661,10 @@ write_buffers: if (UNIV_UNLIKELY (!(rows_added = row_merge_buf_add( buf, fts_index, old_table, - new_table, psort_info, row, ext, - &doc_id, conv_heap, - &err, &v_heap, eval_table, trx)))) { + new_table, psort_info, + row, ext, history_fts, &doc_id, + conv_heap, &err, &v_heap, + eval_table, trx)))) { /* An empty buffer should have enough room for at least one record. */ ut_ad(err == DB_COMPUTE_VALUE_FAILED diff --git a/storage/innobase/row/row0mysql.cc b/storage/innobase/row/row0mysql.cc index 84964970ad0..bb994a926eb 100644 --- a/storage/innobase/row/row0mysql.cc +++ b/storage/innobase/row/row0mysql.cc @@ -1425,7 +1425,10 @@ error_exit: return(err); } - if (dict_table_has_fts_index(table)) { + if (dict_table_has_fts_index(table) + && (!table->versioned() + || !node->row->fields[table->vers_end].vers_history_row())) { + doc_id_t doc_id; /* Extract the doc id from the hidden FTS column */ @@ -1639,7 +1642,7 @@ row_fts_update_or_delete( ut_a(dict_table_has_fts_index(prebuilt->table)); /* Deletes are simple; get them out of the way first. */ - if (node->is_delete == PLAIN_DELETE) { + if (node->is_delete) { /* A delete affects all FTS indexes, so we pass NULL */ fts_trx_add_op(trx, table, old_doc_id, FTS_DELETE, NULL); } else { @@ -1648,7 +1651,7 @@ row_fts_update_or_delete( if (new_doc_id == 0) { ib::error() << "InnoDB FTS: Doc ID cannot be 0"; - return(DB_FTS_INVALID_DOCID); + DBUG_RETURN(DB_FTS_INVALID_DOCID); } row_fts_do_update(trx, table, old_doc_id, new_doc_id); } @@ -2204,7 +2207,7 @@ row_update_cascade_for_mysql( return(DB_FOREIGN_EXCEED_MAX_CASCADE); } - const trx_t* trx = thr_get_trx(thr); + trx_t* trx = thr_get_trx(thr); if (table->versioned()) { if (node->is_delete == PLAIN_DELETE) { diff --git a/storage/innobase/row/row0row.cc b/storage/innobase/row/row0row.cc index f0e5385be85..acd687d2a9c 100644 --- a/storage/innobase/row/row0row.cc +++ b/storage/innobase/row/row0row.cc @@ -1360,7 +1360,7 @@ row_raw_format_int( ulint buf_size, /*!< in: output buffer size in bytes */ ibool* format_in_hex) /*!< out: should the data be - formated in hex */ + formatted in hex */ { ulint ret; @@ -1408,7 +1408,7 @@ row_raw_format_str( ulint buf_size, /*!< in: output buffer size in bytes */ ibool* format_in_hex) /*!< out: should the data be - formated in hex */ + formatted in hex */ { ulint charset_coll; diff --git a/storage/innobase/row/row0sel.cc b/storage/innobase/row/row0sel.cc index c817b700b65..4504fabd4e0 100644 --- a/storage/innobase/row/row0sel.cc +++ b/storage/innobase/row/row0sel.cc @@ -5113,8 +5113,10 @@ wrong_offs: goto no_gap_lock; } + /* Set next-key lock both for delete- and non-delete-marked + records for unique search, because non-delete-marked record can + be marked as deleted while transaction suspends. */ if (!set_also_gap_locks - || (unique_search && !rec_get_deleted_flag(rec, comp)) || dict_index_is_spatial(index)) { goto no_gap_lock; diff --git a/storage/innobase/row/row0upd.cc b/storage/innobase/row/row0upd.cc index a79ff94b61b..c7087559f47 100644 --- a/storage/innobase/row/row0upd.cc +++ b/storage/innobase/row/row0upd.cc @@ -3539,3 +3539,16 @@ skip_append: } } } + + +/** Prepare update vector for versioned delete. +Set row_end to CURRENT_TIMESTAMP or trx->id. +Initialize fts_next_doc_id for versioned delete. +@param[in] trx transaction */ +void upd_node_t::vers_make_delete(trx_t* trx) +{ + update->n_fields= 0; + is_delete= VERSIONED_DELETE; + vers_update_fields(trx, table->vers_end); + trx->fts_next_doc_id= table->fts ? UINT64_UNDEFINED : 0; +} |