Diffstat (limited to 'storage/innobase/row/row0merge.cc')
 storage/innobase/row/row0merge.cc | 2321 +++++++++++++++++++++++++++---------
 1 file changed, 1561 insertions(+), 760 deletions(-)
diff --git a/storage/innobase/row/row0merge.cc b/storage/innobase/row/row0merge.cc
index dbca52f820c..da48be090ec 100644
--- a/storage/innobase/row/row0merge.cc
+++ b/storage/innobase/row/row0merge.cc
@@ -26,6 +26,10 @@ Completed by Sunny Bains and Marko Makela
#include <my_config.h>
#include <log.h>
+#include <math.h>
+
+#include "ha_prototypes.h"
+
#include "row0merge.h"
#include "row0ext.h"
#include "row0log.h"
@@ -39,7 +43,10 @@ Completed by Sunny Bains and Marko Makela
#include "row0ftsort.h"
#include "row0import.h"
#include "handler0alter.h"
-#include "ha_prototypes.h"
+#include "btr0bulk.h"
+#include "fsp0sysspace.h"
+#include "ut0new.h"
+#include "ut0stage.h"
#include "math.h" /* log() */
#include "fil0crypt.h"
@@ -50,34 +57,218 @@ float my_log2f(float n)
}
/* Ignore posix_fadvise() on those platforms where it does not exist */
-#if defined __WIN__
+#if defined _WIN32
# define posix_fadvise(fd, offset, len, advice) /* nothing */
-#endif /* __WIN__ */
-
-#ifdef UNIV_DEBUG
-/** Set these in order ot enable debug printout. */
-/* @{ */
-/** Log each record read from temporary file. */
-static ibool row_merge_print_read;
-/** Log each record write to temporary file. */
-static ibool row_merge_print_write;
-/** Log each row_merge_blocks() call, merging two blocks of records to
-a bigger one. */
-static ibool row_merge_print_block;
-/** Log each block read from temporary file. */
-static ibool row_merge_print_block_read;
-/** Log each block read from temporary file. */
-static ibool row_merge_print_block_write;
-/* @} */
-#endif /* UNIV_DEBUG */
+#endif /* _WIN32 */
/* Whether to disable file system cache */
-UNIV_INTERN char srv_disable_sort_file_cache;
+char srv_disable_sort_file_cache;
+
+/** Class that caches index row tuples made from a single clustered
+index page scan, and then inserts them into the corresponding index tree */
+class index_tuple_info_t {
+public:
+ /** constructor
+ @param[in] heap memory heap
+ @param[in] index index to be created */
+ index_tuple_info_t(
+ mem_heap_t* heap,
+ dict_index_t* index) UNIV_NOTHROW
+ {
+ m_heap = heap;
+ m_index = index;
+ m_dtuple_vec = UT_NEW_NOKEY(idx_tuple_vec());
+ }
+
+ /** destructor */
+ ~index_tuple_info_t()
+ {
+ UT_DELETE(m_dtuple_vec);
+ }
+
+ /** Get the index object
+ @return the index object */
+ dict_index_t* get_index() UNIV_NOTHROW
+ {
+ return(m_index);
+ }
+
+ /** Caches an index row into index tuple vector
+ @param[in] row table row
+ @param[in] ext externally stored column
+ prefixes, or NULL */
+ void add(
+ const dtuple_t* row,
+ const row_ext_t* ext) UNIV_NOTHROW
+ {
+ dtuple_t* dtuple;
+
+ dtuple = row_build_index_entry(row, ext, m_index, m_heap);
+
+ ut_ad(dtuple);
+
+ m_dtuple_vec->push_back(dtuple);
+ }
+
+ /** Insert spatial index rows cached in vector into spatial index
+ @param[in] trx_id transaction id
+ @param[in,out] row_heap memory heap
+ @param[in] pcur cluster index scanning cursor
+ @param[in,out] scan_mtr mini-transaction for pcur
+ @param[out] mtr_committed whether scan_mtr got committed
+ @return DB_SUCCESS if successful, else error number */
+ dberr_t insert(
+ trx_id_t trx_id,
+ mem_heap_t* row_heap,
+ btr_pcur_t* pcur,
+ mtr_t* scan_mtr,
+ bool* mtr_committed)
+ {
+ big_rec_t* big_rec;
+ rec_t* rec;
+ btr_cur_t ins_cur;
+ mtr_t mtr;
+ rtr_info_t rtr_info;
+ ulint* ins_offsets = NULL;
+ dberr_t error = DB_SUCCESS;
+ dtuple_t* dtuple;
+ ulint count = 0;
+ const ulint flag = BTR_NO_UNDO_LOG_FLAG
+ | BTR_NO_LOCKING_FLAG
+ | BTR_KEEP_SYS_FLAG | BTR_CREATE_FLAG;
+
+ ut_ad(dict_index_is_spatial(m_index));
+
+ DBUG_EXECUTE_IF("row_merge_instrument_log_check_flush",
+ log_sys->check_flush_or_checkpoint = true;
+ );
+
+ for (idx_tuple_vec::iterator it = m_dtuple_vec->begin();
+ it != m_dtuple_vec->end();
+ ++it) {
+ dtuple = *it;
+ ut_ad(dtuple);
+
+ if (log_sys->check_flush_or_checkpoint) {
+ if (!(*mtr_committed)) {
+ btr_pcur_move_to_prev_on_page(pcur);
+ btr_pcur_store_position(pcur, scan_mtr);
+ mtr_commit(scan_mtr);
+ *mtr_committed = true;
+ }
+
+ log_free_check();
+ }
+
+ mtr.start();
+ mtr.set_named_space(m_index->space);
+
+ ins_cur.index = m_index;
+ rtr_init_rtr_info(&rtr_info, false, &ins_cur, m_index,
+ false);
+ rtr_info_update_btr(&ins_cur, &rtr_info);
+
+ btr_cur_search_to_nth_level(m_index, 0, dtuple,
+ PAGE_CUR_RTREE_INSERT,
+ BTR_MODIFY_LEAF, &ins_cur,
+ 0, __FILE__, __LINE__,
+ &mtr);
+
+ /* We need to update the MBR in the parent entry,
+ so change the search mode to BTR_MODIFY_TREE */
+ if (rtr_info.mbr_adj) {
+ mtr_commit(&mtr);
+ rtr_clean_rtr_info(&rtr_info, true);
+ rtr_init_rtr_info(&rtr_info, false, &ins_cur,
+ m_index, false);
+ rtr_info_update_btr(&ins_cur, &rtr_info);
+ mtr_start(&mtr);
+ mtr.set_named_space(m_index->space);
+ btr_cur_search_to_nth_level(
+ m_index, 0, dtuple,
+ PAGE_CUR_RTREE_INSERT,
+ BTR_MODIFY_TREE, &ins_cur, 0,
+ __FILE__, __LINE__, &mtr);
+ }
+
+ error = btr_cur_optimistic_insert(
+ flag, &ins_cur, &ins_offsets, &row_heap,
+ dtuple, &rec, &big_rec, 0, NULL, &mtr);
+
+ if (error == DB_FAIL) {
+ ut_ad(!big_rec);
+ mtr.commit();
+ mtr.start();
+ mtr.set_named_space(m_index->space);
+
+ rtr_clean_rtr_info(&rtr_info, true);
+ rtr_init_rtr_info(&rtr_info, false,
+ &ins_cur, m_index, false);
+
+ rtr_info_update_btr(&ins_cur, &rtr_info);
+ btr_cur_search_to_nth_level(
+ m_index, 0, dtuple,
+ PAGE_CUR_RTREE_INSERT,
+ BTR_MODIFY_TREE,
+ &ins_cur, 0,
+ __FILE__, __LINE__, &mtr);
+
+
+ error = btr_cur_pessimistic_insert(
+ flag, &ins_cur, &ins_offsets,
+ &row_heap, dtuple, &rec,
+ &big_rec, 0, NULL, &mtr);
+ }
+
+ DBUG_EXECUTE_IF(
+ "row_merge_ins_spatial_fail",
+ error = DB_FAIL;
+ );
+
+ if (error == DB_SUCCESS) {
+ if (rtr_info.mbr_adj) {
+ error = rtr_ins_enlarge_mbr(
+ &ins_cur, NULL, &mtr);
+ }
+
+ if (error == DB_SUCCESS) {
+ page_update_max_trx_id(
+ btr_cur_get_block(&ins_cur),
+ btr_cur_get_page_zip(&ins_cur),
+ trx_id, &mtr);
+ }
+ }
+
+ mtr_commit(&mtr);
+
+ rtr_clean_rtr_info(&rtr_info, true);
+ count++;
+ }
+
+ m_dtuple_vec->clear();
+
+ return(error);
+ }
+
+private:
+ /** Cache index rows made from a clustered index scan. Usually
+ for rows on a single clustered index page */
+ typedef std::vector<dtuple_t*, ut_allocator<dtuple_t*> >
+ idx_tuple_vec;
+
+ /** vector used to cache index rows made from a clustered index scan */
+ idx_tuple_vec* m_dtuple_vec;
+
+ /** the index being built */
+ dict_index_t* m_index;
+
+ /** memory heap for creating index tuples */
+ mem_heap_t* m_heap;
+};
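
A minimal usage sketch of the class above (hypothetical driver code, not
part of the patch; sp_heap, spatial_index, row, ext, trx_id, pcur and
scan_mtr stand in for the locals of row_merge_read_clustered_index()):

	/* Cache the spatial index entries built from one clustered
	index page, then flush them in a single batch. */
	index_tuple_info_t*	sp_tuple = UT_NEW_NOKEY(
		index_tuple_info_t(sp_heap, spatial_index));

	/* For each row read from the current clustered index page: */
	sp_tuple->add(row, ext);

	/* At the page boundary, insert the cached tuples. insert() may
	commit scan_mtr so that it can call log_free_check(). */
	bool	mtr_committed = false;
	dberr_t	err = sp_tuple->insert(
		trx_id, row_heap, &pcur, &scan_mtr, &mtr_committed);

	UT_DELETE(sp_tuple);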
/* Maximum pending doc memory limit in bytes for a fts tokenization thread */
#define FTS_PENDING_DOC_MEMORY_LIMIT 1000000
-
/******************************************************//**
Encrypt a merge block. */
static
@@ -93,7 +284,7 @@ row_merge_encrypt_buf(
{
uint key_version;
uint dstlen=0;
- os_offset_t ofs = (os_offset_t)srv_sort_buf_size * (os_offset_t)offset;
+ uint ofs = (uint)(srv_sort_buf_size * offset);
key_version = encryption_key_get_latest_version(crypt_data->key_id);
@@ -107,12 +298,12 @@ row_merge_encrypt_buf(
space, ofs, 0);
if (! ((rc == MY_AES_OK) && ((ulint)dstlen == srv_sort_buf_size-ROW_MERGE_RESERVE_SIZE))) {
- ib_logf(IB_LOG_LEVEL_FATAL,
- "Unable to encrypt data-block "
- " src: %p srclen: %lu buf: %p buflen: %u."
- " return-code: %d. Can't continue!\n",
- input_buf, (ulong) srv_sort_buf_size,
- crypted_buf, dstlen, rc);
+ ib::error()
+ << "Unable to encrypt data-block "
+ << " src: " << input_buf << " srclen: "
+ << srv_sort_buf_size << " buf: " << crypted_buf
+ << " buflen: " << dstlen
+ << " return-code: " << rc << " Can't continue!";
ut_error;
}
}
@@ -132,7 +323,7 @@ row_merge_decrypt_buf(
{
uint key_version;
uint dstlen=0;
- os_offset_t ofs = (os_offset_t)srv_sort_buf_size * (os_offset_t)offset;
+ uint ofs = (uint)(srv_sort_buf_size * offset);
/* Read key_version from beginning of the buffer */
key_version = mach_read_from_4((byte *)input_buf);
@@ -149,53 +340,50 @@ row_merge_decrypt_buf(
space, ofs, 0);
if (! ((rc == MY_AES_OK) && ((ulint)dstlen == srv_sort_buf_size-ROW_MERGE_RESERVE_SIZE))) {
- ib_logf(IB_LOG_LEVEL_FATAL,
- "Unable to encrypt data-block "
- " src: %p srclen: %lu buf: %p buflen: %d."
- " return-code: %d. Can't continue!\n",
- input_buf, (ulong) srv_sort_buf_size,
- crypted_buf, dstlen, rc);
+ ib::error()
+ << "Unable to decrypt data-block "
+ << " src: " << input_buf << " srclen: "
+ << srv_sort_buf_size << " buf: " << crypted_buf
+ << " buflen: " << dstlen
+ << " return-code: " << rc << " Can't continue!";
ut_error;
}
return true;
}
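
The encrypt/decrypt pair above relies on a small convention inside each
sort block: the ROW_MERGE_RESERVE_SIZE header carries the encryption key
version in the first four bytes, stored big-endian like mach_write_to_4(),
with 0 marking an unencrypted block (see the mach_write_to_4() call in
row_merge_write() below). A self-contained sketch of that header handling
(hypothetical helper names, plain C++):

	#include <cstdint>

	typedef unsigned char byte;

	/* Store the 4-byte key_version big-endian, as mach_write_to_4()
	does; key_version == 0 means the block is not encrypted. */
	static void block_set_key_version(byte* block, uint32_t v)
	{
		block[0] = (byte) (v >> 24);
		block[1] = (byte) (v >> 16);
		block[2] = (byte) (v >> 8);
		block[3] = (byte) v;
	}

	static uint32_t block_get_key_version(const byte* block)
	{
		return (uint32_t(block[0]) << 24) | (uint32_t(block[1]) << 16)
			| (uint32_t(block[2]) << 8) | uint32_t(block[3]);
	}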
-#ifdef UNIV_DEBUG
-/******************************************************//**
-Display a merge tuple. */
-static MY_ATTRIBUTE((nonnull))
-void
-row_merge_tuple_print(
-/*==================*/
- FILE* f, /*!< in: output stream */
- const mtuple_t* entry, /*!< in: tuple to print */
- ulint n_fields)/*!< in: number of fields in the tuple */
-{
- ulint j;
-
- for (j = 0; j < n_fields; j++) {
- const dfield_t* field = &entry->fields[j];
-
- if (dfield_is_null(field)) {
- fputs("\n NULL;", f);
- } else {
- ulint field_len = dfield_get_len(field);
- ulint len = ut_min(field_len, 20);
- if (dfield_is_ext(field)) {
- fputs("\nE", f);
- } else {
- fputs("\n ", f);
- }
- ut_print_buf(f, dfield_get_data(field), len);
- if (len != field_len) {
- fprintf(f, " (total %lu bytes)", field_len);
- }
- }
- }
- putc('\n', f);
-}
-#endif /* UNIV_DEBUG */
+/** Insert sorted data tuples to the index.
+@param[in] trx_id transaction identifier
+@param[in] index index to be inserted
+@param[in] old_table old table
+@param[in] fd file descriptor
+@param[in,out] block file buffer
+@param[in] row_buf the sorted data tuples, or NULL if fd and
+block will be used instead
+@param[in,out] btr_bulk btr bulk instance
+@param[in,out] stage performance schema accounting object, used by
+ALTER TABLE. If not NULL stage->begin_phase_insert() will be called initially
+and then stage->inc() will be called for each record that is processed.
+@return DB_SUCCESS or error number */
+static __attribute__((warn_unused_result))
+dberr_t
+row_merge_insert_index_tuples(
+ trx_id_t trx_id,
+ dict_index_t* index,
+ const dict_table_t* old_table,
+ int fd,
+ row_merge_block_t* block,
+ const row_merge_buf_t* row_buf,
+ BtrBulk* btr_bulk,
+ const ib_uint64_t table_total_rows, /*!< in: total rows of old table */
+ const float pct_progress, /*!< in: total progress
+ percent until now */
+ const float pct_cost, /*!< in: current progress percent
+ */
+ fil_space_crypt_t* crypt_data,/*!< in: table crypt data */
+ row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */
+ ulint space, /*!< in: space id */
+ ut_stage_alter_t* stage = NULL);
/******************************************************//**
Encode an index record. */
@@ -215,7 +403,7 @@ row_merge_buf_encode(
ulint extra_size;
size = rec_get_converted_size_temp(
- index, entry->fields, n_fields, &extra_size);
+ index, entry->fields, n_fields, NULL, &extra_size);
ut_ad(size >= extra_size);
/* Encode extra_size + 1 */
@@ -228,14 +416,14 @@ row_merge_buf_encode(
}
rec_convert_dtuple_to_temp(*b + extra_size, index,
- entry->fields, n_fields);
+ entry->fields, n_fields, NULL);
*b += size;
}
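
The "Encode extra_size + 1" step above uses a compact variable-length
length prefix: values below 0x80 take one byte, larger values take two
bytes with the high bit set in the first, and the raw value 0 is reserved
for the end-of-chunk marker written by row_merge_write_eof(). A worked
standalone sketch of the scheme (assumes extra_size + 1 < 0x8000):

	#include <cassert>
	#include <cstddef>

	typedef unsigned char byte;

	/* Encode e = extra_size + 1; return the number of bytes used. */
	static size_t encode_extra(byte* b, size_t extra_size)
	{
		size_t	e = extra_size + 1;
		assert(e < 0x8000);

		if (e < 0x80) {
			b[0] = (byte) e;
			return(1);
		}

		b[0] = (byte) (0x80 | (e >> 8));
		b[1] = (byte) e;
		return(2);
	}

	/* Decode; a first byte of 0 would signal end-of-chunk instead. */
	static size_t decode_extra(const byte* b, size_t* extra_size)
	{
		if (b[0] < 0x80) {
			*extra_size = b[0] - 1;
			return(1);
		}

		*extra_size = (((size_t) (b[0] & 0x7f) << 8) | b[1]) - 1;
		return(2);
	}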
/******************************************************//**
Allocate a sort buffer.
-@return own: sort buffer */
+@return own: sort buffer */
static MY_ATTRIBUTE((malloc, nonnull))
row_merge_buf_t*
row_merge_buf_create_low(
@@ -258,7 +446,7 @@ row_merge_buf_create_low(
buf->index = index;
buf->max_tuples = max_tuples;
buf->tuples = static_cast<mtuple_t*>(
- ut_malloc(2 * max_tuples * sizeof *buf->tuples));
+ ut_malloc_nokey(2 * max_tuples * sizeof *buf->tuples));
buf->tmp_tuples = buf->tuples + max_tuples;
return(buf);
@@ -266,8 +454,7 @@ row_merge_buf_create_low(
/******************************************************//**
Allocate a sort buffer.
-@return own: sort buffer */
-UNIV_INTERN
+@return own: sort buffer */
row_merge_buf_t*
row_merge_buf_create(
/*=================*/
@@ -279,7 +466,8 @@ row_merge_buf_create(
mem_heap_t* heap;
max_tuples = (srv_sort_buf_size - ROW_MERGE_RESERVE_SIZE)
- / ut_max(1, dict_index_get_min_size(index));
+ / ut_max(static_cast<ulint>(1),
+ dict_index_get_min_size(index));
buf_size = (sizeof *buf);
@@ -292,8 +480,7 @@ row_merge_buf_create(
/******************************************************//**
Empty a sort buffer.
-@return sort buffer */
-UNIV_INTERN
+@return sort buffer */
row_merge_buf_t*
row_merge_buf_empty(
/*================*/
@@ -319,7 +506,6 @@ row_merge_buf_empty(
/******************************************************//**
Deallocate a sort buffer. */
-UNIV_INTERN
void
row_merge_buf_free(
/*===============*/
@@ -345,7 +531,7 @@ row_merge_buf_redundant_convert(
const dfield_t* row_field,
dfield_t* field,
ulint len,
- ulint zip_size,
+ const page_size_t& page_size,
mem_heap_t* heap)
{
ut_ad(DATA_MBMINLEN(field->type.mbminmaxlen) == 1);
@@ -365,7 +551,7 @@ row_merge_buf_redundant_convert(
field_ref_zero, BTR_EXTERN_FIELD_REF_SIZE));
byte* data = btr_copy_externally_stored_field(
- &ext_len, field_data, zip_size, field_len, heap, NULL);
+ &ext_len, field_data, page_size, field_len, heap);
ut_ad(ext_len < len);
@@ -384,6 +570,7 @@ row_merge_buf_redundant_convert(
@param[in,out] buf sort buffer
@param[in] fts_index fts index to be created
@param[in] old_table original table
+@param[in] new_table new table
@param[in,out] psort_info parallel sort info
@param[in] row table row
@param[in] ext cache of externally stored
@@ -394,8 +581,8 @@ row_merge_buf_redundant_convert(
converting to ROW_FORMAT=REDUNDANT, or NULL
when not to invoke
row_merge_buf_redundant_convert()
-@param[in,out] exceed_page set if the record size exceeds the page size
- when converting to ROW_FORMAT=REDUNDANT
+@param[in,out] err set if error occurs
+@param[in,out] v_heap heap memory to process data for virtual column
@return number of rows added, 0 if out of space */
static
ulint
@@ -403,12 +590,14 @@ row_merge_buf_add(
row_merge_buf_t* buf,
dict_index_t* fts_index,
const dict_table_t* old_table,
+ const dict_table_t* new_table,
fts_psort_t* psort_info,
const dtuple_t* row,
const row_ext_t* ext,
doc_id_t* doc_id,
mem_heap_t* conv_heap,
- bool* exceed_page)
+ dberr_t* err,
+ mem_heap_t** v_heap)
{
ulint i;
const dict_index_t* index;
@@ -438,6 +627,9 @@ row_merge_buf_add(
fts_index */
index = (buf->index->type & DICT_FTS) ? fts_index : buf->index;
+ /* Creation of a spatial index should not come here */
+ ut_ad(!dict_index_is_spatial(index));
+
n_fields = dict_index_get_n_fields(index);
entry = &buf->tuples[buf->n_tuples];
@@ -452,16 +644,22 @@ row_merge_buf_add(
for (i = 0; i < n_fields; i++, field++, ifield++) {
ulint len;
const dict_col_t* col;
+ const dict_v_col_t* v_col = NULL;
ulint col_no;
ulint fixed_len;
const dfield_t* row_field;
col = ifield->col;
+ if (dict_col_is_virtual(col)) {
+ v_col = reinterpret_cast<const dict_v_col_t*>(col);
+ }
+
col_no = dict_col_get_no(col);
/* Process the Doc ID column */
if (*doc_id > 0
- && col_no == index->table->fts->doc_col) {
+ && col_no == index->table->fts->doc_col
+ && !dict_col_is_virtual(col)) {
fts_write_doc_id((byte*) &write_doc_id, *doc_id);
/* Note: field->data now points to a value on the
@@ -478,9 +676,25 @@ row_merge_buf_add(
field->type.mbminmaxlen = DATA_MBMINMAXLEN(0, 0);
field->type.len = ifield->col->len;
} else {
- row_field = dtuple_get_nth_field(row, col_no);
+ /* Use callback to get the virtual column value */
+ if (dict_col_is_virtual(col)) {
+ dict_index_t* clust_index
+ = dict_table_get_first_index(new_table);
+
+ row_field = innobase_get_computed_value(
+ row, v_col, clust_index, NULL,
+ v_heap, NULL, ifield, false);
+
+ if (row_field == NULL) {
+ *err = DB_COMPUTE_VALUE_FAILED;
+ DBUG_RETURN(0);
+ }
+ dfield_copy(field, row_field);
+ } else {
+ row_field = dtuple_get_nth_field(row, col_no);
+ dfield_copy(field, row_field);
+ }
- dfield_copy(field, row_field);
/* Tokenize and process data for FTS */
if (index->type & DICT_FTS) {
@@ -506,9 +720,9 @@ row_merge_buf_add(
dfield_get_data(doc_field)));
if (*doc_id == 0) {
- ib_logf(IB_LOG_LEVEL_WARN,
- "FTS Doc ID is zero. "
- "Record Skipped");
+ ib::warn() << "FTS Doc ID is"
+ " zero. Record"
+ " skipped";
DBUG_RETURN(0);
}
}
@@ -518,8 +732,8 @@ row_merge_buf_add(
continue;
}
- ptr = ut_malloc(sizeof(*doc_item)
- + field->len);
+ ptr = ut_malloc_nokey(sizeof(*doc_item)
+ + field->len);
doc_item = static_cast<fts_doc_item_t*>(ptr);
value = static_cast<byte*>(ptr)
@@ -537,7 +751,6 @@ row_merge_buf_add(
if (psort_info[bucket].error == DB_SUCCESS) {
UT_LIST_ADD_LAST(
- doc_list,
psort_info[bucket].fts_doc_list,
doc_item);
psort_info[bucket].memory_used +=
@@ -562,11 +775,10 @@ row_merge_buf_add(
if (field->len != UNIV_SQL_NULL
&& col->mtype == DATA_MYSQL
&& col->len != field->len) {
-
if (conv_heap != NULL) {
row_merge_buf_redundant_convert(
row_field, field, col->len,
- dict_table_zip_size(old_table),
+ dict_table_page_size(old_table),
conv_heap);
} else {
/* Field length mismatch should not
@@ -617,7 +829,10 @@ row_merge_buf_add(
dfield_set_len(field, len);
}
- ut_ad(len <= col->len || col->mtype == DATA_BLOB);
+ ut_ad(len <= col->len
+ || DATA_LARGE_MTYPE(col->mtype)
+ || (col->mtype == DATA_POINT
+ && len == DATA_MBR_LEN));
fixed_len = ifield->fixed_len;
if (fixed_len && !dict_table_is_comp(index->table)
@@ -646,7 +861,7 @@ row_merge_buf_add(
} else if (dfield_is_ext(field)) {
extra_size += 2;
} else if (len < 128
- || (col->len < 256 && col->mtype != DATA_BLOB)) {
+ || (!DATA_BIG_COL(col))) {
extra_size++;
} else {
/* For variable-length columns, we look up the
@@ -670,7 +885,7 @@ row_merge_buf_add(
ulint extra;
size = rec_get_converted_size_temp(
- index, entry->fields, n_fields, &extra);
+ index, entry->fields, n_fields, NULL, &extra);
ut_ad(data_size + extra_size == size);
ut_ad(extra_size == extra);
@@ -688,7 +903,7 @@ row_merge_buf_add(
ut_ad(size < UNIV_PAGE_SIZE) in rec_offs_data_size().
It may hit the assert before attempting to insert the row. */
if (conv_heap != NULL && data_size > UNIV_PAGE_SIZE) {
- *exceed_page = true;
+ *err = DB_TOO_BIG_RECORD;
}
ut_ad(data_size < srv_sort_buf_size);
@@ -719,7 +934,6 @@ row_merge_buf_add(
/*************************************************************//**
Report a duplicate key. */
-UNIV_INTERN
void
row_merge_dup_report(
/*=================*/
@@ -735,7 +949,8 @@ row_merge_dup_report(
/*************************************************************//**
Compare two tuples.
-@return 1, 0, -1 if a is greater, equal, less, respectively, than b */
+@return positive, 0, negative if a is greater, equal, less, than b,
+respectively */
static MY_ATTRIBUTE((warn_unused_result))
int
row_merge_tuple_cmp(
@@ -799,17 +1014,18 @@ no_report:
/** Wrapper for row_merge_tuple_sort() to inject some more context to
UT_SORT_FUNCTION_BODY().
-@param tuples array of tuples that being sorted
-@param aux work area, same size as tuples[]
-@param low lower bound of the sorting area, inclusive
-@param high upper bound of the sorting area, inclusive */
+@param tuples array of tuples that being sorted
+@param aux work area, same size as tuples[]
+@param low lower bound of the sorting area, inclusive
+@param high upper bound of the sorting area, inclusive */
#define row_merge_tuple_sort_ctx(tuples, aux, low, high) \
row_merge_tuple_sort(n_uniq, n_field, dup, tuples, aux, low, high)
/** Wrapper for row_merge_tuple_cmp() to inject some more context to
UT_SORT_FUNCTION_BODY().
-@param a first tuple to be compared
-@param b second tuple to be compared
-@return 1, 0, -1 if a is greater, equal, less, respectively, than b */
+@param a first tuple to be compared
+@param b second tuple to be compared
+@return positive, 0, negative, if a is greater, equal, less, than b,
+respectively */
#define row_merge_tuple_cmp_ctx(a,b) \
row_merge_tuple_cmp(n_uniq, n_field, a, b, dup)
@@ -839,7 +1055,6 @@ row_merge_tuple_sort(
/******************************************************//**
Sort a buffer. */
-UNIV_INTERN
void
row_merge_buf_sort(
/*===============*/
@@ -847,6 +1062,8 @@ row_merge_buf_sort(
row_merge_dup_t* dup) /*!< in/out: reporter of duplicates
(NULL if non-unique index) */
{
+ ut_ad(!dict_index_is_spatial(buf->index));
+
row_merge_tuple_sort(dict_index_get_n_unique(buf->index),
dict_index_get_n_fields(buf->index),
dup,
@@ -855,7 +1072,6 @@ row_merge_buf_sort(
/******************************************************//**
Write a buffer to a block. */
-UNIV_INTERN
void
row_merge_buf_write(
/*================*/
@@ -868,19 +1084,24 @@ row_merge_buf_write(
ulint n_fields= dict_index_get_n_fields(index);
byte* b = &block[ROW_MERGE_RESERVE_SIZE];
+ DBUG_ENTER("row_merge_buf_write");
+
for (ulint i = 0; i < buf->n_tuples; i++) {
const mtuple_t* entry = &buf->tuples[i];
row_merge_buf_encode(&b, index, entry, n_fields);
ut_ad(b < &block[srv_sort_buf_size]);
+
#ifdef UNIV_DEBUG
- if (row_merge_print_write) {
- fprintf(stderr, "row_merge_buf_write %p,%d,%lu %lu",
- (void*) b, of->fd, (ulong) of->offset,
- (ulong) i);
- row_merge_tuple_print(stderr, entry, n_fields);
+ {
+ rec_printer p(entry->fields, n_fields);
+ DBUG_PRINT("ib_merge_sort",
+ ("%p,fd=%d,%lu %lu: %s",
+ reinterpret_cast<const void*>(b), of->fd,
+ ulong(of->offset), ulong(i),
+ p.str().c_str()));
}
-#endif /* UNIV_DEBUG */
+#endif
}
/* Write an "end-of-chunk" marker. */
@@ -892,18 +1113,17 @@ row_merge_buf_write(
to avoid bogus warnings. */
memset(b, 0xff, &block[srv_sort_buf_size] - b);
#endif /* UNIV_DEBUG_VALGRIND */
-#ifdef UNIV_DEBUG
- if (row_merge_print_write) {
- fprintf(stderr, "row_merge_buf_write %p,%d,%lu EOF\n",
- (void*) b, of->fd, (ulong) of->offset);
- }
-#endif /* UNIV_DEBUG */
+ DBUG_PRINT("ib_merge_sort",
+ ("write %p,%d,%lu EOF",
+ reinterpret_cast<const void*>(b), of->fd,
+ ulong(of->offset)));
+ DBUG_VOID_RETURN;
}
/******************************************************//**
Create a memory heap and allocate space for row_merge_rec_offsets()
and mrec_buf_t[3].
-@return memory heap */
+@return memory heap */
static
mem_heap_t*
row_merge_heap_create(
@@ -933,8 +1153,7 @@ row_merge_heap_create(
/********************************************************************//**
Read a merge block from the file system.
-@return TRUE if request was successful, FALSE if fail */
-UNIV_INTERN
+@return TRUE if request was successful, FALSE if fail */
ibool
row_merge_read(
/*===========*/
@@ -948,26 +1167,19 @@ row_merge_read(
ulint space) /*!< in: space id */
{
os_offset_t ofs = ((os_offset_t) offset) * srv_sort_buf_size;
- ibool success;
- DBUG_EXECUTE_IF("row_merge_read_failure", return(FALSE););
+ DBUG_ENTER("row_merge_read");
+ DBUG_PRINT("ib_merge_sort", ("fd=%d ofs=" UINT64PF, fd, ofs));
+ DBUG_EXECUTE_IF("row_merge_read_failure", DBUG_RETURN(FALSE););
-#ifdef UNIV_DEBUG
- if (row_merge_print_block_read) {
- fprintf(stderr, "row_merge_read fd=%d ofs=%lu\n",
- fd, (ulong) offset);
- }
-#endif /* UNIV_DEBUG */
+ IORequest request;
-#ifdef UNIV_DEBUG
- if (row_merge_print_block_read) {
- fprintf(stderr, "row_merge_read fd=%d ofs=%lu\n",
- fd, (ulong) offset);
- }
-#endif /* UNIV_DEBUG */
+ /* Merge sort pages are never compressed. */
+ request.disable_compression();
- success = os_file_read_no_error_handling(OS_FILE_FROM_FD(fd), buf,
- ofs, srv_sort_buf_size);
+ dberr_t err = os_file_read_no_error_handling(
+ request,
+ OS_FILE_FROM_FD(fd), buf, ofs, srv_sort_buf_size, NULL);
/* For encrypted tables, decrypt data after reading and copy data */
if (crypt_data && crypt_buf) {
@@ -981,20 +1193,16 @@ row_merge_read(
posix_fadvise(fd, ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED);
#endif /* POSIX_FADV_DONTNEED */
- if (UNIV_UNLIKELY(!success)) {
- ut_print_timestamp(stderr);
- fprintf(stderr,
- " InnoDB: failed to read merge block at " UINT64PF "\n",
- ofs);
+ if (err != DB_SUCCESS) {
+ ib::error() << "Failed to read merge block at " << ofs;
}
- return(UNIV_LIKELY(success));
+ DBUG_RETURN(err == DB_SUCCESS);
}
/********************************************************************//**
Write a merge block to the file system.
-@return TRUE if request was successful, FALSE if fail */
-UNIV_INTERN
+@return TRUE if request was successful, FALSE if fail */
ibool
row_merge_write(
/*============*/
@@ -1008,12 +1216,13 @@ row_merge_write(
{
size_t buf_len = srv_sort_buf_size;
os_offset_t ofs = buf_len * (os_offset_t) offset;
- ibool ret;
void* out_buf = (void *)buf;
- DBUG_EXECUTE_IF("row_merge_write_failure", return(FALSE););
+ DBUG_ENTER("row_merge_write");
+ DBUG_PRINT("ib_merge_sort", ("fd=%d ofs=" UINT64PF, fd, ofs));
+ DBUG_EXECUTE_IF("row_merge_write_failure", DBUG_RETURN(FALSE););
- /* For encrypted tables, encrypt data before writing */
+ IORequest request(IORequest::WRITE);
if (crypt_data && crypt_buf) {
row_merge_encrypt_buf(crypt_data, offset, space, (const byte *)buf, (byte *)crypt_buf);
out_buf = crypt_buf;
@@ -1022,14 +1231,11 @@ row_merge_write(
mach_write_to_4((byte *)out_buf, 0);
}
- ret = os_file_write("(merge)", OS_FILE_FROM_FD(fd), out_buf, ofs, buf_len);
+ request.disable_compression();
-#ifdef UNIV_DEBUG
- if (row_merge_print_block_write) {
- fprintf(stderr, "row_merge_write fd=%d ofs=%lu\n",
- fd, (ulong) offset);
- }
-#endif /* UNIV_DEBUG */
+ dberr_t err = os_file_write(
+ request,
+ "(merge)", OS_FILE_FROM_FD(fd), out_buf, ofs, buf_len);
#ifdef POSIX_FADV_DONTNEED
/* The block will be needed on the next merge pass,
@@ -1037,13 +1243,12 @@ row_merge_write(
posix_fadvise(fd, ofs, buf_len, POSIX_FADV_DONTNEED);
#endif /* POSIX_FADV_DONTNEED */
- return(UNIV_LIKELY(ret));
+ DBUG_RETURN(err == DB_SUCCESS);
}
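
The POSIX_FADV_DONTNEED calls in row_merge_read()/row_merge_write() drop
merge blocks from the OS page cache right after the I/O, since each block
is only touched again on the next merge pass. The advise-after-I/O pattern
in isolation (hypothetical helper, POSIX only, error handling reduced):

	#include <fcntl.h>
	#include <unistd.h>

	/* Write one sort block sequentially, then tell the kernel its
	pages will not be re-read soon, so they can be evicted early. */
	static bool write_and_drop(int fd, const void* buf, size_t len,
				   off_t ofs)
	{
		if (pwrite(fd, buf, len, ofs) != (ssize_t) len) {
			return(false);
		}
	#ifdef POSIX_FADV_DONTNEED
		posix_fadvise(fd, ofs, len, POSIX_FADV_DONTNEED);
	#endif
		return(true);
	}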
/********************************************************************//**
Read a merge record.
-@return pointer to next record, or NULL on I/O error or end of list */
-UNIV_INTERN
+@return pointer to next record, or NULL on I/O error or end of list */
const byte*
row_merge_read_rec(
/*===============*/
@@ -1077,6 +1282,8 @@ row_merge_read_rec(
ut_ad(*offsets == 1 + REC_OFFS_HEADER_SIZE
+ dict_index_get_n_fields(index));
+ DBUG_ENTER("row_merge_read_rec");
+
if (b == &block[0]) {
b+= ROW_MERGE_RESERVE_SIZE;
}
@@ -1086,14 +1293,12 @@ row_merge_read_rec(
if (UNIV_UNLIKELY(!extra_size)) {
/* End of list */
*mrec = NULL;
-#ifdef UNIV_DEBUG
- if (row_merge_print_read) {
- fprintf(stderr, "row_merge_read %p,%p,%d,%lu EOF\n",
- (const void*) b, (const void*) block,
- fd, (ulong) *foffs);
- }
-#endif /* UNIV_DEBUG */
- return(NULL);
+ DBUG_PRINT("ib_merge_sort",
+ ("read %p,%p,%d,%lu EOF\n",
+ reinterpret_cast<const void*>(b),
+ reinterpret_cast<const void*>(block),
+ fd, ulong(*foffs)));
+ DBUG_RETURN(NULL);
}
if (extra_size >= 0x80) {
@@ -1105,7 +1310,7 @@ row_merge_read_rec(
err_exit:
/* Signal I/O error. */
*mrec = b;
- return(NULL);
+ DBUG_RETURN(NULL);
}
/* Wrap around to the beginning of the buffer. */
@@ -1183,14 +1388,13 @@ err_exit:
avail_size = &block[srv_sort_buf_size] - b;
memcpy(*buf, b, avail_size);
*mrec = *buf + extra_size;
-#ifdef UNIV_DEBUG
+
/* We cannot invoke rec_offs_make_valid() here, because there
are no REC_N_NEW_EXTRA_BYTES between extra_size and data_size.
Similarly, rec_offs_validate() would fail, because it invokes
rec_get_status(). */
- offsets[2] = (ulint) *mrec;
- offsets[3] = (ulint) index;
-#endif /* UNIV_DEBUG */
+ ut_d(offsets[2] = (ulint) *mrec);
+ ut_d(offsets[3] = (ulint) index);
if (!row_merge_read(fd, ++(*foffs), block,
crypt_data, crypt_block, space)) {
@@ -1206,17 +1410,19 @@ err_exit:
b += extra_size + data_size - avail_size;
func_exit:
+
#ifdef UNIV_DEBUG
- if (row_merge_print_read) {
- fprintf(stderr, "row_merge_read %p,%p,%d,%lu ",
- (const void*) b, (const void*) block,
- fd, (ulong) *foffs);
- rec_print_comp(stderr, *mrec, offsets);
- putc('\n', stderr);
+ {
+ rec_printer p(*mrec, 0, offsets);
+ DBUG_PRINT("ib_merge_sort",
+ ("%p,%p,fd=%d,%lu: %s",
+ reinterpret_cast<const void*>(b),
+ reinterpret_cast<const void*>(block),
+ fd, ulong(*foffs),
+ p.str().c_str()));
}
-#endif /* UNIV_DEBUG */
-
- return(b);
+#endif
+ DBUG_RETURN(b);
}
/********************************************************************//**
@@ -1227,29 +1433,34 @@ row_merge_write_rec_low(
/*====================*/
byte* b, /*!< out: buffer */
ulint e, /*!< in: encoded extra_size */
-#ifdef UNIV_DEBUG
+#ifndef DBUG_OFF
ulint size, /*!< in: total size to write */
int fd, /*!< in: file descriptor */
ulint foffs, /*!< in: file offset */
-#endif /* UNIV_DEBUG */
+#endif /* !DBUG_OFF */
const mrec_t* mrec, /*!< in: record to write */
const ulint* offsets)/*!< in: offsets of mrec */
-#ifndef UNIV_DEBUG
+#ifdef DBUG_OFF
# define row_merge_write_rec_low(b, e, size, fd, foffs, mrec, offsets) \
row_merge_write_rec_low(b, e, mrec, offsets)
-#endif /* !UNIV_DEBUG */
+#endif /* DBUG_OFF */
{
-#ifdef UNIV_DEBUG
+ DBUG_ENTER("row_merge_write_rec_low");
+
+#ifndef DBUG_OFF
const byte* const end = b + size;
- ut_ad(e == rec_offs_extra_size(offsets) + 1);
+#endif /* DBUG_OFF */
+ DBUG_ASSERT(e == rec_offs_extra_size(offsets) + 1);
- if (row_merge_print_write) {
- fprintf(stderr, "row_merge_write %p,%d,%lu ",
- (void*) b, fd, (ulong) foffs);
- rec_print_comp(stderr, mrec, offsets);
- putc('\n', stderr);
+#ifdef UNIV_DEBUG
+ {
+ rec_printer p(mrec, 0, offsets);
+ DBUG_PRINT("ib_merge_sort",
+ ("%p,fd=%d,%lu: %s",
+ reinterpret_cast<const void*>(b), fd, ulong(foffs),
+ p.str().c_str()));
}
-#endif /* UNIV_DEBUG */
+#endif
if (e < 0x80) {
*b++ = (byte) e;
@@ -1259,12 +1470,13 @@ row_merge_write_rec_low(
}
memcpy(b, mrec - rec_offs_extra_size(offsets), rec_offs_size(offsets));
- ut_ad(b + rec_offs_size(offsets) == end);
+ DBUG_ASSERT(b + rec_offs_size(offsets) == end);
+ DBUG_VOID_RETURN;
}
/********************************************************************//**
Write a merge record.
-@return pointer to end of block, or NULL on error */
+@return pointer to end of block, or NULL on error */
static
byte*
row_merge_write_rec(
@@ -1339,7 +1551,7 @@ row_merge_write_rec(
/********************************************************************//**
Write an end-of-list marker.
-@return pointer to end of block, or NULL on error */
+@return pointer to end of block, or NULL on error */
static
byte*
row_merge_write_eof(
@@ -1356,12 +1568,13 @@ row_merge_write_eof(
ut_ad(b >= &block[0]);
ut_ad(b < &block[srv_sort_buf_size]);
ut_ad(foffs);
-#ifdef UNIV_DEBUG
- if (row_merge_print_write) {
- fprintf(stderr, "row_merge_write %p,%p,%d,%lu EOF\n",
- (void*) b, (void*) block, fd, (ulong) *foffs);
- }
-#endif /* UNIV_DEBUG */
+
+ DBUG_ENTER("row_merge_write_eof");
+ DBUG_PRINT("ib_merge_sort",
+ ("%p,%p,fd=%d,%lu",
+ reinterpret_cast<const void*>(b),
+ reinterpret_cast<const void*>(block),
+ fd, ulong(*foffs)));
if (b == &block[0]) {
b+= ROW_MERGE_RESERVE_SIZE;
@@ -1379,26 +1592,26 @@ row_merge_write_eof(
if (!row_merge_write(fd, (*foffs)++, block,
crypt_data, crypt_block, space)) {
- return(NULL);
+ DBUG_RETURN(NULL);
}
UNIV_MEM_INVALID(&block[0], srv_sort_buf_size);
-
- return(&block[0]);
+ DBUG_RETURN(&block[0]);
}
/** Create a temporary file if it has not been created already.
-@param[in,out] tmpfd temporary file handle
-@param[in] path path to create temporary file
+@param[in,out] tmpfd temporary file handle
@return file descriptor, or -1 on failure */
static MY_ATTRIBUTE((warn_unused_result))
int
row_merge_tmpfile_if_needed(
- int* tmpfd,
- const char* path)
+ int* tmpfd)
{
if (*tmpfd < 0) {
- *tmpfd = row_merge_file_create_low(path);
+ *tmpfd = row_merge_file_create_low();
+ if (*tmpfd >= 0) {
+ MONITOR_ATOMIC_INC(MONITOR_ALTER_TABLE_SORT_FILES);
+ }
}
return(*tmpfd);
@@ -1406,21 +1619,19 @@ row_merge_tmpfile_if_needed(
/** Create a temporary file for merge sort if it was not created already.
@param[in,out] file merge file structure
-@param[in,out] tmpfd temporary file structure
@param[in] nrec number of records in the file
-@param[in] path path to create temporary files
@return file descriptor, or -1 on failure */
static MY_ATTRIBUTE((warn_unused_result))
int
row_merge_file_create_if_needed(
merge_file_t* file,
int* tmpfd,
- ulint nrec,
- const char* path)
+ ulint nrec)
{
ut_ad(file->fd < 0 || *tmpfd >=0);
- if (file->fd < 0 && row_merge_file_create(file, path) >= 0) {
- if (row_merge_tmpfile_if_needed(tmpfd, path) < 0) {
+ if (file->fd < 0 && row_merge_file_create(file) >= 0) {
+ MONITOR_ATOMIC_INC(MONITOR_ALTER_TABLE_SORT_FILES);
+ if (row_merge_tmpfile_if_needed(tmpfd) < 0) {
return(-1);
}
@@ -1431,6 +1642,121 @@ row_merge_file_create_if_needed(
return(file->fd);
}
+/** Copy the merge data tuple from another merge data tuple.
+@param[in] mtuple source merge data tuple
+@param[in,out] prev_mtuple destination merge data tuple
+@param[in] n_unique number of unique fields exist in the mtuple
+@param[in,out] heap memory heap where prev_mtuple is allocated */
+static
+void
+row_mtuple_create(
+ const mtuple_t* mtuple,
+ mtuple_t* prev_mtuple,
+ ulint n_unique,
+ mem_heap_t* heap)
+{
+ memcpy(prev_mtuple->fields, mtuple->fields,
+ n_unique * sizeof *mtuple->fields);
+
+ dfield_t* field = prev_mtuple->fields;
+
+ for (ulint i = 0; i < n_unique; i++) {
+ dfield_dup(field++, heap);
+ }
+}
+
+/** Compare two merge data tuples.
+@param[in] prev_mtuple merge data tuple
+@param[in] current_mtuple merge data tuple
+@param[in,out] dup reporter of duplicates
+@return positive, 0, negative if current_mtuple is greater, equal, less than
+prev_mtuple, respectively */
+static
+int
+row_mtuple_cmp(
+ const mtuple_t* prev_mtuple,
+ const mtuple_t* current_mtuple,
+ row_merge_dup_t* dup)
+{
+ ut_ad(dict_index_is_clust(dup->index));
+ const ulint n_unique = dict_index_get_n_unique(dup->index);
+
+ return(row_merge_tuple_cmp(
+ n_unique, n_unique, *current_mtuple, *prev_mtuple, dup));
+}
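
row_mtuple_create()/row_mtuple_cmp() support the skip_pk_sort path below:
when rows already arrive in PRIMARY KEY order, duplicates are detected by
comparing each tuple only with its predecessor instead of sorting the
buffer. The idea in isolation (standalone C++ with a generic comparator
mirroring the sign convention of row_mtuple_cmp()):

	#include <cstddef>
	#include <vector>

	/* Scan an already-sorted stream, remembering only the previous
	element; cmp() returns <0, 0 or >0. A hit would map to
	DB_DUPLICATE_KEY in the code below. */
	template <typename T, typename Cmp>
	static bool sorted_stream_has_duplicate(
		const std::vector<T>&	rows,
		Cmp			cmp)
	{
		const T*	prev = NULL;

		for (size_t i = 0; i < rows.size(); i++) {
			if (prev != NULL && cmp(*prev, rows[i]) == 0) {
				return(true);
			}
			prev = &rows[i];
		}

		return(false);
	}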
+
+/** Insert cached spatial index rows.
+@param[in] trx_id transaction id
+@param[in] sp_tuples cached spatial rows
+@param[in] num_spatial number of spatial indexes
+@param[in,out] row_heap heap for insert
+@param[in,out] sp_heap heap for tuples
+@param[in,out] pcur cluster index cursor
+@param[in,out] mtr mini transaction
+@param[in,out] mtr_committed whether scan_mtr got committed
+@return DB_SUCCESS or error number */
+static
+dberr_t
+row_merge_spatial_rows(
+ trx_id_t trx_id,
+ index_tuple_info_t** sp_tuples,
+ ulint num_spatial,
+ mem_heap_t* row_heap,
+ mem_heap_t* sp_heap,
+ btr_pcur_t* pcur,
+ mtr_t* mtr,
+ bool* mtr_committed)
+{
+ dberr_t err = DB_SUCCESS;
+
+ if (sp_tuples == NULL) {
+ return(DB_SUCCESS);
+ }
+
+ ut_ad(sp_heap != NULL);
+
+ for (ulint j = 0; j < num_spatial; j++) {
+ err = sp_tuples[j]->insert(
+ trx_id, row_heap,
+ pcur, mtr, mtr_committed);
+
+ if (err != DB_SUCCESS) {
+ return(err);
+ }
+ }
+
+ mem_heap_empty(sp_heap);
+
+ return(err);
+}
+
+/** Check if the geometry field is valid.
+@param[in] row the row
+@param[in] index spatial index
+@return true if it's valid, false if it's invalid. */
+static
+bool
+row_geo_field_is_valid(
+ const dtuple_t* row,
+ dict_index_t* index)
+{
+ const dict_field_t* ind_field
+ = dict_index_get_nth_field(index, 0);
+ const dict_col_t* col
+ = ind_field->col;
+ ulint col_no
+ = dict_col_get_no(col);
+ const dfield_t* dfield
+ = dtuple_get_nth_field(row, col_no);
+
+ if (dfield_is_null(dfield)
+ || dfield_get_len(dfield) < GEO_DATA_HEADER_SIZE) {
+ return(false);
+ }
+
+ return(true);
+}
+
/** Reads clustered index of the table and create temporary files
containing the index entries for the indexes to be built.
@param[in] trx transaction
@@ -1448,18 +1774,21 @@ containing the index entries for the indexes to be built.
@param[in] key_numbers MySQL key numbers to create
@param[in] n_index number of indexes to create
@param[in] add_cols default values of added columns, or NULL
+@param[in] add_v newly added virtual columns along with indexes
@param[in] col_map mapping of old column numbers to new ones, or
- NULL if old_table == new_table
+NULL if old_table == new_table
@param[in] add_autoinc number of added AUTO_INCREMENT columns, or
- ULINT_UNDEFINED if none is added
-@param[in,out] sequence autoinc sequence
+ULINT_UNDEFINED if none is added
+@param[in,out] sequence autoinc sequence
@param[in,out] block file buffer
+@param[in] skip_pk_sort whether the new PRIMARY KEY will follow
+existing order
@param[in,out] tmpfd temporary file handle
-@param[in] pct_cost percent of task weight out of total alter job
-@param[in] crypt_data crypt data or NULL
-@param[in,out] crypt_block crypted file buffer
-return DB_SUCCESS or error */
-static MY_ATTRIBUTE((nonnull(1,2,3,4,6,9,10,16), warn_unused_result))
+@param[in,out] stage performance schema accounting object, used by
+ALTER TABLE. stage->n_pk_recs_inc() will be called for each record read and
+stage->inc() will be called for each page read.
+@return DB_SUCCESS or error */
+static MY_ATTRIBUTE((warn_unused_result))
dberr_t
row_merge_read_clustered_index(
trx_t* trx,
@@ -1474,11 +1803,14 @@ row_merge_read_clustered_index(
const ulint* key_numbers,
ulint n_index,
const dtuple_t* add_cols,
+ const dict_add_v_col_t* add_v,
const ulint* col_map,
ulint add_autoinc,
ib_sequence_t& sequence,
row_merge_block_t* block,
+ bool skip_pk_sort,
int* tmpfd,
+ ut_stage_alter_t* stage,
float pct_cost,
fil_space_crypt_t* crypt_data,
row_merge_block_t* crypt_block)
@@ -1487,6 +1819,8 @@ row_merge_read_clustered_index(
mem_heap_t* row_heap; /* Heap memory to create
clustered index tuples */
row_merge_buf_t** merge_buf; /* Temporary list for records*/
+ mem_heap_t* v_heap = NULL; /* Heap memory to process large
+ data for virtual column */
btr_pcur_t pcur; /* Cursor on the clustered
index */
mtr_t mtr; /* Mini transaction */
@@ -1500,12 +1834,19 @@ row_merge_read_clustered_index(
ibool add_doc_id = FALSE;
os_event_t fts_parallel_sort_event = NULL;
ibool fts_pll_sort = FALSE;
- ib_int64_t sig_count = 0;
+ int64_t sig_count = 0;
+ index_tuple_info_t** sp_tuples = NULL;
+ mem_heap_t* sp_heap = NULL;
+ ulint num_spatial = 0;
+ BtrBulk* clust_btr_bulk = NULL;
+ bool clust_temp_file = false;
+ mem_heap_t* mtuple_heap = NULL;
+ mtuple_t prev_mtuple;
mem_heap_t* conv_heap = NULL;
-
+ FlushObserver* observer = trx->flush_observer;
float curr_progress = 0.0;
- ib_int64_t read_rows = 0;
- ib_int64_t table_total_rows = 0;
+ ib_uint64_t read_rows = 0;
+ ib_uint64_t table_total_rows = 0;
DBUG_ENTER("row_merge_read_clustered_index");
@@ -1524,13 +1865,18 @@ row_merge_read_clustered_index(
DEBUG_FTS_SORT_PRINT("FTS_SORT: Start Create Index\n");
#endif
- ut_ad(trx->mysql_thd != NULL);
- const char* path = thd_innodb_tmpdir(trx->mysql_thd);
-
/* Create and initialize memory for record buffers */
merge_buf = static_cast<row_merge_buf_t**>(
- mem_alloc(n_index * sizeof *merge_buf));
+ ut_malloc_nokey(n_index * sizeof *merge_buf));
+
+ row_merge_dup_t clust_dup = {index[0], table, col_map, 0};
+ dfield_t* prev_fields;
+ const ulint n_uniq = dict_index_get_n_unique(index[0]);
+
+ ut_ad(!skip_pk_sort || dict_index_is_clust(index[0]));
+ /* There is no previous tuple yet. */
+ prev_mtuple.fields = NULL;
for (ulint i = 0; i < n_index; i++) {
if (index[i]->type & DICT_FTS) {
@@ -1560,10 +1906,37 @@ row_merge_read_clustered_index(
fts_parallel_sort_event =
psort_info[0].psort_common->sort_event;
} else {
+ if (dict_index_is_spatial(index[i])) {
+ num_spatial++;
+ }
+
merge_buf[i] = row_merge_buf_create(index[i]);
}
}
+ if (num_spatial > 0) {
+ ulint count = 0;
+
+ sp_heap = mem_heap_create(512);
+
+ sp_tuples = static_cast<index_tuple_info_t**>(
+ ut_malloc_nokey(num_spatial
+ * sizeof(*sp_tuples)));
+
+ for (ulint i = 0; i < n_index; i++) {
+ if (dict_index_is_spatial(index[i])) {
+ sp_tuples[count]
+ = UT_NEW_NOKEY(
+ index_tuple_info_t(
+ sp_heap,
+ index[i]));
+ count++;
+ }
+ }
+
+ ut_ad(count == num_spatial);
+ }
+
mtr_start(&mtr);
/* Find the clustered index and create a persistent cursor
@@ -1581,7 +1954,7 @@ row_merge_read_clustered_index(
do not violate the added NOT NULL constraints. */
nonnull = static_cast<ulint*>(
- mem_alloc(dict_table_get_n_cols(new_table)
+ ut_malloc_nokey(dict_table_get_n_cols(new_table)
* sizeof *nonnull));
for (ulint i = 0; i < dict_table_get_n_cols(old_table); i++) {
@@ -1604,7 +1977,7 @@ row_merge_read_clustered_index(
}
if (!n_nonnull) {
- mem_free(nonnull);
+ ut_free(nonnull);
nonnull = NULL;
}
}
@@ -1616,6 +1989,14 @@ row_merge_read_clustered_index(
conv_heap = mem_heap_create(sizeof(mrec_buf_t));
}
+ if (skip_pk_sort) {
+ prev_fields = static_cast<dfield_t*>(
+ ut_malloc_nokey(n_uniq * sizeof *prev_fields));
+ mtuple_heap = mem_heap_create(sizeof(mrec_buf_t));
+ } else {
+ prev_fields = NULL;
+ }
+
/* Scan the clustered index. */
for (;;) {
const rec_t* rec;
@@ -1633,7 +2014,12 @@ row_merge_read_clustered_index(
page_cur_move_to_next(cur);
+ stage->n_pk_recs_inc();
+
if (page_cur_is_after_last(cur)) {
+
+ stage->inc();
+
if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
err = DB_INTERRUPTED;
trx->error_key_num = 0;
@@ -1647,6 +2033,7 @@ row_merge_read_clustered_index(
goto func_exit;
}
}
+
#ifdef DBUG_OFF
# define dbug_run_purge false
#else /* DBUG_OFF */
@@ -1656,6 +2043,22 @@ row_merge_read_clustered_index(
"ib_purge_on_create_index_page_switch",
dbug_run_purge = true;);
+ /* Insert the cached spatial index rows. */
+ bool mtr_committed = false;
+
+ err = row_merge_spatial_rows(
+ trx->id, sp_tuples, num_spatial,
+ row_heap, sp_heap, &pcur,
+ &mtr, &mtr_committed);
+
+ if (err != DB_SUCCESS) {
+ goto func_exit;
+ }
+
+ if (mtr_committed) {
+ goto scan_next;
+ }
+
if (dbug_run_purge
|| rw_lock_get_waiters(
dict_index_get_lock(clust_index))) {
@@ -1672,8 +2075,8 @@ row_merge_read_clustered_index(
/* Leaf pages must never be empty, unless
this is the only page in the index tree. */
ut_ad(btr_pcur_is_on_user_rec(&pcur)
- || buf_block_get_page_no(
- btr_pcur_get_block(&pcur))
+ || btr_pcur_get_block(
+ &pcur)->page.id.page_no()
== clust_index->page);
btr_pcur_store_position(&pcur, &mtr);
@@ -1693,7 +2096,7 @@ row_merge_read_clustered_index(
/* Give the waiters a chance to proceed. */
os_thread_yield();
-
+scan_next:
mtr_start(&mtr);
/* Restore position on the record, or its
predecessor if the record was purged
@@ -1708,9 +2111,7 @@ end_of_index:
row = NULL;
mtr_commit(&mtr);
mem_heap_free(row_heap);
- if (nonnull) {
- mem_free(nonnull);
- }
+ ut_free(nonnull);
goto write_buffers;
}
} else {
@@ -1726,9 +2127,10 @@ end_of_index:
block = page_cur_get_block(cur);
block = btr_block_get(
- buf_block_get_space(block),
- buf_block_get_zip_size(block),
- next_page_no, BTR_SEARCH_LEAF,
+ page_id_t(block->page.id.space(),
+ next_page_no),
+ block->page.size,
+ BTR_SEARCH_LEAF,
clust_index, &mtr);
btr_leaf_page_release(page_cur_get_block(cur),
@@ -1765,18 +2167,18 @@ end_of_index:
ONLINE_INDEX_COMPLETE state between the time
the DML thread has updated the clustered index
but has not yet accessed secondary index. */
- ut_ad(trx->read_view);
+ ut_ad(MVCC::is_view_active(trx->read_view));
- if (!read_view_sees_trx_id(
- trx->read_view,
+ if (!trx->read_view->changes_visible(
row_get_rec_trx_id(
- rec, clust_index, offsets))) {
+ rec, clust_index, offsets),
+ old_table->name)) {
rec_t* old_vers;
row_vers_build_for_consistent_read(
rec, &mtr, clust_index, &offsets,
trx->read_view, &row_heap,
- row_heap, &old_vers);
+ row_heap, &old_vers, NULL);
rec = old_vers;
@@ -1816,9 +2218,10 @@ end_of_index:
/* Build a row based on the clustered index. */
- row = row_build(ROW_COPY_POINTERS, clust_index,
- rec, offsets, new_table,
- add_cols, col_map, &ext, row_heap);
+ row = row_build_w_add_vcol(ROW_COPY_POINTERS, clust_index,
+ rec, offsets, new_table,
+ add_cols, add_v, col_map, &ext,
+ row_heap);
ut_ad(row);
for (ulint i = 0; i < n_nonnull; i++) {
@@ -1897,25 +2300,50 @@ write_buffers:
/* Build all entries for all the indexes to be created
in a single scan of the clustered index. */
- for (ulint i = 0; i < n_index; i++) {
+ ulint s_idx_cnt = 0;
+ bool skip_sort = skip_pk_sort
+ && dict_index_is_clust(merge_buf[0]->index);
+
+ for (ulint i = 0; i < n_index; i++, skip_sort = false) {
row_merge_buf_t* buf = merge_buf[i];
merge_file_t* file = &files[i];
ulint rows_added = 0;
- bool exceed_page = false;
+
+ if (dict_index_is_spatial(buf->index)) {
+ if (!row) {
+ continue;
+ }
+
+ ut_ad(sp_tuples[s_idx_cnt]->get_index()
+ == buf->index);
+
+ /* If the geometry field is invalid, report
+ error. */
+ if (!row_geo_field_is_valid(row, buf->index)) {
+ err = DB_CANT_CREATE_GEOMETRY_OBJECT;
+ break;
+ }
+
+ sp_tuples[s_idx_cnt]->add(row, ext);
+ s_idx_cnt++;
+
+ continue;
+ }
if (UNIV_LIKELY
(row && (rows_added = row_merge_buf_add(
- buf, fts_index, old_table,
+ buf, fts_index, old_table, new_table,
psort_info, row, ext, &doc_id,
- conv_heap, &exceed_page)))) {
+ conv_heap, &err,
+ &v_heap)))) {
/* If we are creating FTS index,
a single row can generate more
records for tokenized word */
file->n_rec += rows_added;
- if (exceed_page) {
- err = DB_TOO_BIG_RECORD;
+ if (err != DB_SUCCESS) {
+ ut_ad(err == DB_TOO_BIG_RECORD);
break;
}
@@ -1925,8 +2353,10 @@ write_buffers:
if (buf->index->type & DICT_FTS) {
/* Check if error occurs in child thread */
- for (ulint j = 0; j < fts_sort_pll_degree; j++) {
- if (psort_info[j].error != DB_SUCCESS) {
+ for (ulint j = 0;
+ j < fts_sort_pll_degree; j++) {
+ if (psort_info[j].error
+ != DB_SUCCESS) {
err = psort_info[j].error;
trx->error_key_num = i;
break;
@@ -1938,9 +2368,39 @@ write_buffers:
}
}
+ if (skip_sort) {
+ ut_ad(buf->n_tuples > 0);
+ const mtuple_t* curr =
+ &buf->tuples[buf->n_tuples - 1];
+
+ ut_ad(i == 0);
+ ut_ad(dict_index_is_clust(merge_buf[0]->index));
+ /* Detect duplicates by comparing the
+ current record with previous record.
+ When temp file is not used, records
+ should be in sorted order. */
+ if (prev_mtuple.fields != NULL
+ && (row_mtuple_cmp(
+ &prev_mtuple, curr,
+ &clust_dup) == 0)) {
+
+ err = DB_DUPLICATE_KEY;
+ trx->error_key_num
+ = key_numbers[0];
+ goto func_exit;
+ }
+
+ prev_mtuple.fields = curr->fields;
+ }
+
continue;
}
+ if (err == DB_COMPUTE_VALUE_FAILED) {
+ trx->error_key_num = i;
+ goto func_exit;
+ }
+
if (buf->index->type & DICT_FTS) {
if (!row || !doc_id) {
continue;
@@ -1955,10 +2415,120 @@ write_buffers:
ut_ad(buf->n_tuples || row == NULL);
/* We have enough data tuples to form a block.
- Sort them and write to disk. */
+ Sort them and write to disk if temp file is used
+ or insert into index if temp file is not used. */
+ ut_ad(old_table == new_table
+ ? !dict_index_is_clust(buf->index)
+ : (i == 0) == dict_index_is_clust(buf->index));
+
+ /* We have enough data tuples to form a block.
+ Sort them (if !skip_sort) and write to disk. */
if (buf->n_tuples) {
- if (dict_index_is_unique(buf->index)) {
+ if (skip_sort) {
+ /* The temporary file is not used,
+ so insert the sorted block into the index */
+ if (row != NULL) {
+ bool mtr_committed = false;
+
+ /* We have to insert the
+ cached spatial index rows, since
+ after the mtr_commit the clustered
+ index page could be updated and
+ the data in the cached rows would
+ become invalid. */
+ err = row_merge_spatial_rows(
+ trx->id, sp_tuples,
+ num_spatial,
+ row_heap, sp_heap,
+ &pcur, &mtr,
+ &mtr_committed);
+
+ if (err != DB_SUCCESS) {
+ goto func_exit;
+ }
+
+ /* We are not at the end of
+ the scan yet. We must
+ mtr_commit() in order to be
+ able to call log_free_check()
+ in row_merge_insert_index_tuples().
+ Due to mtr_commit(), the
+ current row will be invalid, and
+ we must reread it on the next
+ loop iteration. */
+ if (!mtr_committed) {
+ btr_pcur_move_to_prev_on_page(
+ &pcur);
+ btr_pcur_store_position(
+ &pcur, &mtr);
+
+ mtr_commit(&mtr);
+ }
+ }
+
+ mem_heap_empty(mtuple_heap);
+ prev_mtuple.fields = prev_fields;
+
+ row_mtuple_create(
+ &buf->tuples[buf->n_tuples - 1],
+ &prev_mtuple, n_uniq,
+ mtuple_heap);
+
+ if (clust_btr_bulk == NULL) {
+ clust_btr_bulk = UT_NEW_NOKEY(
+ BtrBulk(index[i],
+ trx->id,
+ observer));
+
+ clust_btr_bulk->init();
+ } else {
+ clust_btr_bulk->latch();
+ }
+
+ err = row_merge_insert_index_tuples(
+ trx->id, index[i], old_table,
+ -1, NULL, buf, clust_btr_bulk,
+ table_total_rows,
+ curr_progress,
+ pct_cost,
+ NULL,
+ NULL,
+ new_table->space);
+
+ if (row == NULL) {
+ err = clust_btr_bulk->finish(
+ err);
+ UT_DELETE(clust_btr_bulk);
+ clust_btr_bulk = NULL;
+ } else {
+ /* Release latches for a possible
+ log_free_check() in the spatial
+ index build. */
+ clust_btr_bulk->release();
+ }
+
+ if (row != NULL) {
+ /* Restore the cursor on the
+ previous clustered index record,
+ and empty the buffer. The next
+ iteration of the outer loop will
+ advance the cursor and read the
+ next record (the one which we
+ had to ignore due to the buffer
+ overflow). */
+ mtr_start(&mtr);
+ btr_pcur_restore_position(
+ BTR_SEARCH_LEAF, &pcur,
+ &mtr);
+ buf = row_merge_buf_empty(buf);
+ /* Restart the outer loop on the
+ record. We did not insert it
+ into any index yet. */
+ ut_ad(i == 0);
+ break;
+ }
+ } else if (dict_index_is_unique(buf->index)) {
row_merge_dup_t dup = {
buf->index, table, col_map, 0};
@@ -1998,30 +2568,74 @@ write_buffers:
dict_index_get_lock(buf->index));
}
- if (buf->n_tuples > 0) {
+ /* A secondary index, or a clustered index that is
+ not in sorted order, can use the temporary file.
+ A fulltext index should not use the temporary file. */
+ if (!skip_sort && !(buf->index->type & DICT_FTS)) {
+ /* If all rows fit in the sort buffer,
+ we can insert directly into the index without
+ a temporary file, provided the clustered index
+ does not use a temporary file. */
+ if (row == NULL && file->fd == -1
+ && !clust_temp_file) {
+ DBUG_EXECUTE_IF(
+ "row_merge_write_failure",
+ err = DB_TEMP_FILE_WRITE_FAIL;
+ trx->error_key_num = i;
+ goto all_done;);
+
+ DBUG_EXECUTE_IF(
+ "row_merge_tmpfile_fail",
+ err = DB_OUT_OF_MEMORY;
+ trx->error_key_num = i;
+ goto all_done;);
+
+ BtrBulk btr_bulk(index[i], trx->id,
+ observer);
+ btr_bulk.init();
+
+ err = row_merge_insert_index_tuples(
+ trx->id, index[i], old_table,
+ -1, NULL, buf, &btr_bulk,
+ table_total_rows,
+ curr_progress,
+ pct_cost,
+ NULL,
+ NULL,
+ new_table->space);
+
+ err = btr_bulk.finish(err);
+ } else {
+ if (row_merge_file_create_if_needed(
+ file, tmpfd,
+ buf->n_tuples) < 0) {
+ err = DB_OUT_OF_MEMORY;
+ trx->error_key_num = i;
+ goto func_exit;
+ }
- if (row_merge_file_create_if_needed(
- file, tmpfd, buf->n_tuples, path) < 0) {
- err = DB_OUT_OF_MEMORY;
- trx->error_key_num = i;
- break;
- }
+ /* Ensure that duplicates in the
+ clustered index will be detected before
+ inserting secondary index records. */
+ if (dict_index_is_clust(buf->index)) {
+ clust_temp_file = true;
+ }
- ut_ad(file->n_rec > 0);
+ ut_ad(file->n_rec > 0);
- row_merge_buf_write(buf, file, block);
+ row_merge_buf_write(buf, file, block);
- if (!row_merge_write(file->fd, file->offset++,
- block, crypt_data, crypt_block,
- new_table->space)) {
- err = DB_TEMP_FILE_WRITE_FAILURE;
- trx->error_key_num = i;
- break;
+ if (!row_merge_write(file->fd, file->offset++, block,
+ crypt_data, crypt_block, new_table->space)) {
+ err = DB_TEMP_FILE_WRITE_FAIL;
+ trx->error_key_num = i;
+ break;
+ }
+
+ UNIV_MEM_INVALID(
+ &block[0], srv_sort_buf_size);
}
}
-
- UNIV_MEM_INVALID(&block[0], srv_sort_buf_size);
-
merge_buf[i] = row_merge_buf_empty(buf);
if (UNIV_LIKELY(row != NULL)) {
@@ -2032,16 +2646,15 @@ write_buffers:
if (UNIV_UNLIKELY
(!(rows_added = row_merge_buf_add(
buf, fts_index, old_table,
- psort_info, row, ext,
+ new_table, psort_info, row, ext,
&doc_id, conv_heap,
- &exceed_page)))) {
+ &err, &v_heap)))) {
/* An empty buffer should have enough
room for at least one record. */
ut_error;
}
- if (exceed_page) {
- err = DB_TOO_BIG_RECORD;
+ if (err != DB_SUCCESS) {
break;
}
@@ -2058,6 +2671,9 @@ write_buffers:
}
mem_heap_empty(row_heap);
+ if (v_heap) {
+ mem_heap_empty(v_heap);
+ }
/* Increment innodb_onlineddl_pct_progress status variable */
read_rows++;
@@ -2075,12 +2691,26 @@ func_exit:
mtr_commit(&mtr);
mem_heap_free(row_heap);
+ ut_free(nonnull);
- if (nonnull) {
- mem_free(nonnull);
+all_done:
+ if (clust_btr_bulk != NULL) {
+ ut_ad(err != DB_SUCCESS);
+ clust_btr_bulk->latch();
+ err = clust_btr_bulk->finish(
+ err);
+ UT_DELETE(clust_btr_bulk);
+ }
+
+ if (prev_fields != NULL) {
+ ut_free(prev_fields);
+ mem_heap_free(mtuple_heap);
+ }
+
+ if (v_heap) {
+ mem_heap_free(v_heap);
}
-all_done:
if (conv_heap != NULL) {
mem_heap_free(conv_heap);
}
@@ -2143,11 +2773,9 @@ wait_again:
} while (!all_exit && trial_count < max_trial_count);
if (!all_exit) {
- ut_ad(0);
- ib_logf(IB_LOG_LEVEL_FATAL,
- "Not all child sort threads exited"
- " when creating FTS index '%s'",
- fts_sort_idx->name);
+ ib::fatal() << "Not all child sort threads exited"
+ " when creating FTS index '"
+ << fts_sort_idx->name << "'";
}
}
@@ -2160,10 +2788,21 @@ wait_again:
row_fts_free_pll_merge_buf(psort_info);
- mem_free(merge_buf);
+ ut_free(merge_buf);
btr_pcur_close(&pcur);
+ if (sp_tuples != NULL) {
+ for (ulint i = 0; i < num_spatial; i++) {
+ UT_DELETE(sp_tuples[i]);
+ }
+ ut_free(sp_tuples);
+
+ if (sp_heap) {
+ mem_heap_free(sp_heap);
+ }
+ }
+
/* Update the next Doc ID we used. Table should be locked, so
no concurrent DML */
if (max_doc_id && err == DB_SUCCESS) {
@@ -2174,7 +2813,8 @@ wait_again:
if (err == DB_SUCCESS) {
fts_update_next_doc_id(
- 0, new_table, old_table->name, max_doc_id);
+ 0, new_table,
+ old_table->name.m_name, max_doc_id);
}
}
@@ -2184,10 +2824,10 @@ wait_again:
}
/** Write a record via buffer 2 and read the next record to buffer N.
-@param N number of the buffer (0 or 1)
-@param INDEX record descriptor
-@param AT_END statement to execute at end of input */
-#define ROW_MERGE_WRITE_GET_NEXT(N, INDEX, AT_END) \
+@param N number of the buffer (0 or 1)
+@param INDEX record descriptor
+@param AT_END statement to execute at end of input */
+#define ROW_MERGE_WRITE_GET_NEXT_LOW(N, INDEX, AT_END) \
do { \
b2 = row_merge_write_rec(&block[2 * srv_sort_buf_size], \
&buf[2], b2, \
@@ -2215,23 +2855,40 @@ wait_again:
} \
} while (0)
-/*************************************************************//**
-Merge two blocks of records on disk and write a bigger block.
-@return DB_SUCCESS or error code */
-static __attribute__((nonnull(1,2,3,4,5,6), warn_unused_result))
+#ifdef HAVE_PSI_STAGE_INTERFACE
+#define ROW_MERGE_WRITE_GET_NEXT(N, INDEX, AT_END) \
+ do { \
+ if (stage != NULL) { \
+ stage->inc(); \
+ } \
+ ROW_MERGE_WRITE_GET_NEXT_LOW(N, INDEX, AT_END); \
+ } while (0)
+#else /* HAVE_PSI_STAGE_INTERFACE */
+#define ROW_MERGE_WRITE_GET_NEXT(N, INDEX, AT_END) \
+ ROW_MERGE_WRITE_GET_NEXT_LOW(N, INDEX, AT_END)
+#endif /* HAVE_PSI_STAGE_INTERFACE */
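
The macro pair above keeps performance schema accounting out of the core
merge step: when HAVE_PSI_STAGE_INTERFACE is absent, the wrapper compiles
down to the bare operation. The same pattern reduced to its skeleton
(hypothetical names):

	/* Core operation. */
	#define DO_STEP_LOW(x)	process(x)

	#ifdef HAVE_PSI_STAGE_INTERFACE
	/* Count each step when progress accounting is compiled in. */
	# define DO_STEP(x)			\
		do {				\
			if (stage != NULL) {	\
				stage->inc();	\
			}			\
			DO_STEP_LOW(x);		\
		} while (0)
	#else /* HAVE_PSI_STAGE_INTERFACE */
	# define DO_STEP(x)	DO_STEP_LOW(x)
	#endif /* HAVE_PSI_STAGE_INTERFACE */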
+
+/** Merge two blocks of records on disk and write a bigger block.
+@param[in] dup descriptor of index being created
+@param[in] file file containing index entries
+@param[in,out] block 3 buffers
+@param[in,out] foffs0 offset of first source list in the file
+@param[in,out] foffs1 offset of second source list in the file
+@param[in,out] of output file
+@param[in,out] stage performance schema accounting object, used by
+ALTER TABLE. If not NULL stage->inc() will be called for each record
+processed.
+@return DB_SUCCESS or error code */
+static __attribute__((warn_unused_result))
dberr_t
row_merge_blocks(
-/*=============*/
- const row_merge_dup_t* dup, /*!< in: descriptor of
- index being created */
- const merge_file_t* file, /*!< in: file containing
- index entries */
- row_merge_block_t* block, /*!< in/out: 3 buffers */
- ulint* foffs0, /*!< in/out: offset of first
- source list in the file */
- ulint* foffs1, /*!< in/out: offset of second
- source list in the file */
- merge_file_t* of, /*!< in/out: output file */
+ const row_merge_dup_t* dup,
+ const merge_file_t* file,
+ row_merge_block_t* block,
+ ulint* foffs0,
+ ulint* foffs1,
+ merge_file_t* of,
+ ut_stage_alter_t* stage,
fil_space_crypt_t* crypt_data,/*!< in: crypt data or NULL */
row_merge_block_t* crypt_block,/*!< in: in/out: crypted file
buffer */
@@ -2250,16 +2907,11 @@ row_merge_blocks(
ulint* offsets0;/* offsets of mrec0 */
ulint* offsets1;/* offsets of mrec1 */
-#ifdef UNIV_DEBUG
- if (row_merge_print_block) {
- fprintf(stderr,
- "row_merge_blocks fd=%d ofs=%lu + fd=%d ofs=%lu"
- " = fd=%d ofs=%lu\n",
- file->fd, (ulong) *foffs0,
- file->fd, (ulong) *foffs1,
- of->fd, (ulong) of->offset);
- }
-#endif /* UNIV_DEBUG */
+ DBUG_ENTER("row_merge_blocks");
+ DBUG_PRINT("ib_merge_sort",
+ ("fd=%d,%lu+%lu to fd=%d,%lu",
+ file->fd, ulong(*foffs0), ulong(*foffs1),
+ of->fd, ulong(of->offset)));
heap = row_merge_heap_create(dup->index, &buf, &offsets0, &offsets1);
@@ -2272,7 +2924,7 @@ row_merge_blocks(
crypt_data, crypt_block ? &crypt_block[srv_sort_buf_size] : NULL, space)) {
corrupt:
mem_heap_free(heap);
- return(DB_CORRUPTION);
+ DBUG_RETURN(DB_CORRUPTION);
}
b0 = &block[0];
@@ -2297,20 +2949,16 @@ corrupt:
}
while (mrec0 && mrec1) {
- switch (cmp_rec_rec_simple(
- mrec0, mrec1, offsets0, offsets1,
- dup->index, dup->table)) {
- case 0:
- mem_heap_free(heap);
- return(DB_DUPLICATE_KEY);
- case -1:
+ int cmp = cmp_rec_rec_simple(
+ mrec0, mrec1, offsets0, offsets1,
+ dup->index, dup->table);
+ if (cmp < 0) {
ROW_MERGE_WRITE_GET_NEXT(0, dup->index, goto merged);
- break;
- case 1:
+ } else if (cmp) {
ROW_MERGE_WRITE_GET_NEXT(1, dup->index, goto merged);
- break;
- default:
- ut_error;
+ } else {
+ mem_heap_free(heap);
+ DBUG_RETURN(DB_DUPLICATE_KEY);
}
}
@@ -2335,22 +2983,29 @@ done1:
b2 = row_merge_write_eof(&block[2 * srv_sort_buf_size],
b2, of->fd, &of->offset,
crypt_data, crypt_block ? &crypt_block[2 * srv_sort_buf_size] : NULL, space);
+ DBUG_RETURN(b2 ? DB_SUCCESS : DB_CORRUPTION);
- return(b2 ? DB_SUCCESS : DB_CORRUPTION);
}
-/*************************************************************//**
-Copy a block of index entries.
-@return TRUE on success, FALSE on failure */
-static __attribute__((nonnull(1,2,3,4,5), warn_unused_result))
+/** Copy a block of index entries.
+@param[in] index index being created
+@param[in] file input file
+@param[in,out] block 3 buffers
+@param[in,out] foffs0 input file offset
+@param[in,out] of output file
+@param[in,out] stage performance schema accounting object, used by
+ALTER TABLE. If not NULL, stage->inc() will be called for each record
+processed.
+@return TRUE on success, FALSE on failure */
+static __attribute__((warn_unused_result))
ibool
row_merge_blocks_copy(
-/*==================*/
- const dict_index_t* index, /*!< in: index being created */
- const merge_file_t* file, /*!< in: input file */
- row_merge_block_t* block, /*!< in/out: 3 buffers */
- ulint* foffs0, /*!< in/out: input file offset */
- merge_file_t* of, /*!< in/out: output file */
+ const dict_index_t* index,
+ const merge_file_t* file,
+ row_merge_block_t* block,
+ ulint* foffs0,
+ merge_file_t* of,
+ ut_stage_alter_t* stage,
fil_space_crypt_t* crypt_data,/*!< in: table crypt data */
row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */
ulint space) /*!< in: space id */
@@ -2365,15 +3020,11 @@ row_merge_blocks_copy(
ulint* offsets0;/* offsets of mrec0 */
ulint* offsets1;/* dummy offsets */
-#ifdef UNIV_DEBUG
- if (row_merge_print_block) {
- fprintf(stderr,
- "row_merge_blocks_copy fd=%d ofs=%lu"
- " = fd=%d ofs=%lu\n",
- file->fd, (ulong) foffs0,
- of->fd, (ulong) of->offset);
- }
-#endif /* UNIV_DEBUG */
+ DBUG_ENTER("row_merge_blocks_copy");
+ DBUG_PRINT("ib_merge_sort",
+ ("fd=%d,%lu to fd=%d,%lu",
+ file->fd, ulong(foffs0),
+ of->fd, ulong(of->offset)));
heap = row_merge_heap_create(index, &buf, &offsets0, &offsets1);
@@ -2384,7 +3035,7 @@ row_merge_blocks_copy(
crypt_data, crypt_block ? &crypt_block[0] : NULL, space)) {
corrupt:
mem_heap_free(heap);
- return(FALSE);
+ DBUG_RETURN(FALSE);
}
b0 = &block[0];
@@ -2414,32 +3065,37 @@ done0:
mem_heap_free(heap);
- return(row_merge_write_eof(&block[2 * srv_sort_buf_size],
+ DBUG_RETURN(row_merge_write_eof(&block[2 * srv_sort_buf_size],
b2, of->fd, &of->offset,
crypt_data,
crypt_block ? &crypt_block[2 * srv_sort_buf_size] : NULL, space)
- != NULL);
+ != NULL);
}
-/*************************************************************//**
-Merge disk files.
-@return DB_SUCCESS or error code */
-static __attribute__((nonnull(1,2,3,4,5,6,7)))
+/** Merge disk files.
+@param[in] trx transaction
+@param[in] dup descriptor of index being created
+@param[in,out] file file containing index entries
+@param[in,out] block 3 buffers
+@param[in,out] tmpfd temporary file handle
+@param[in,out] num_run Number of runs that remain to be merged
+@param[in,out] run_offset Array that contains the first offset number
+for each merge run
+@param[in,out] stage performance schema accounting object, used by
+ALTER TABLE. If not NULL, stage->inc() will be called for each record
+processed.
+@return DB_SUCCESS or error code */
+static
dberr_t
row_merge(
-/*======*/
- trx_t* trx, /*!< in: transaction */
- const row_merge_dup_t* dup, /*!< in: descriptor of
- index being created */
- merge_file_t* file, /*!< in/out: file containing
- index entries */
- row_merge_block_t* block, /*!< in/out: 3 buffers */
- int* tmpfd, /*!< in/out: temporary file handle */
- ulint* num_run,/*!< in/out: Number of runs remain
- to be merged */
- ulint* run_offset, /*!< in/out: Array contains the
- first offset number for each merge
- run */
+ trx_t* trx,
+ const row_merge_dup_t* dup,
+ merge_file_t* file,
+ row_merge_block_t* block,
+ int* tmpfd,
+ ulint* num_run,
+ ulint* run_offset,
+ ut_stage_alter_t* stage,
fil_space_crypt_t* crypt_data,/*!< in: table crypt data */
row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */
ulint space) /*!< in: space id */
@@ -2489,7 +3145,7 @@ row_merge(
run_offset[n_run++] = of.offset;
error = row_merge_blocks(dup, file, block,
- &foffs0, &foffs1, &of,
+ &foffs0, &foffs1, &of, stage,
crypt_data, crypt_block, space);
if (error != DB_SUCCESS) {
@@ -2510,7 +3166,7 @@ row_merge(
run_offset[n_run++] = of.offset;
if (!row_merge_blocks_copy(dup->index, file, block,
- &foffs0, &of,
+ &foffs0, &of, stage,
crypt_data, crypt_block, space)) {
return(DB_CORRUPTION);
}
@@ -2528,7 +3184,7 @@ row_merge(
run_offset[n_run++] = of.offset;
if (!row_merge_blocks_copy(dup->index, file, block,
- &foffs1, &of,
+ &foffs1, &of, stage,
crypt_data, crypt_block, space)) {
return(DB_CORRUPTION);
}
@@ -2563,21 +3219,23 @@ row_merge(
return(DB_SUCCESS);
}
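Each call to row_merge() above is one full pass over the file: runs are merged pairwise, a leftover odd run is copied through unchanged, and run_offset records where each surviving run begins, so num_run roughly halves per pass. A compilable sketch of that outer shape, with merge_two()/copy_one() as hypothetical stand-ins for row_merge_blocks() and row_merge_blocks_copy():

#include <cstddef>
#include <vector>

static std::size_t out_pos = 0;	/* pretend output-file offset */
static std::size_t merge_two(std::size_t, std::size_t) { return out_pos++; }
static std::size_t copy_one(std::size_t) { return out_pos++; }

/* One pairwise merge pass over the offsets of the sorted runs. */
static void merge_pass(std::vector<std::size_t>& run_offset)
{
	std::vector<std::size_t> next;
	std::size_t i = 0;
	for (; i + 1 < run_offset.size(); i += 2) {
		next.push_back(merge_two(run_offset[i], run_offset[i + 1]));
	}
	if (i < run_offset.size()) {
		/* odd run out: copy it to the output unchanged */
		next.push_back(copy_one(run_offset[i]));
	}
	run_offset.swap(next);	/* about half as many runs remain */
}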
-/*************************************************************//**
-Merge disk files.
-@return DB_SUCCESS or error code */
-UNIV_INTERN
+/** Merge disk files.
+@param[in] trx transaction
+@param[in] dup descriptor of index being created
+@param[in,out] file file containing index entries
+@param[in,out] block 3 buffers
+@param[in,out] tmpfd temporary file handle
+@param[in,out] stage performance schema accounting object, used by
+ALTER TABLE. If not NULL, stage->begin_phase_sort() will be called initially
+and then stage->inc() will be called for each record processed.
+@return DB_SUCCESS or error code */
dberr_t
row_merge_sort(
-/*===========*/
- trx_t* trx, /*!< in: transaction */
- const row_merge_dup_t* dup, /*!< in: descriptor of
- index being created */
- merge_file_t* file, /*!< in/out: file containing
- index entries */
- row_merge_block_t* block, /*!< in/out: 3 buffers */
- int* tmpfd, /*!< in/out: temporary file handle
- */
+ trx_t* trx,
+ const row_merge_dup_t* dup,
+ merge_file_t* file,
+ row_merge_block_t* block,
+ int* tmpfd,
const bool update_progress,
/*!< in: update progress
status variable or not */
@@ -2587,7 +3245,8 @@ row_merge_sort(
const float pct_cost, /*!< in: current progress percent */
fil_space_crypt_t* crypt_data,/*!< in: table crypt data */
row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */
- ulint space) /*!< in: space id */
+ ulint space, /*!< in: space id */
+ ut_stage_alter_t* stage)
{
const ulint half = file->offset / 2;
ulint num_runs;
@@ -2602,6 +3261,10 @@ row_merge_sort(
/* Record the number of merge runs we need to perform */
num_runs = file->offset;
+ if (stage != NULL) {
+ stage->begin_phase_sort(log2(num_runs));
+ }
+
/* Find the smallest number N such that 2^N >= num_runs;
N is the number of merge sort passes we need to perform */
total_merge_sort_count = (ulint) ceil(my_log2f(num_runs));
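For instance, 13 initial runs shrink as 13 -> 7 -> 4 -> 2 -> 1, i.e. ceil(log2(13)) = 4 passes, which is exactly the count computed above. A throwaway check of that arithmetic, independent of InnoDB:

#include <cassert>
#include <cmath>

int main()
{
	unsigned runs = 13;
	unsigned passes = 0;
	while (runs > 1) {
		runs = (runs + 1) / 2;	/* pairwise merge; odd run copied */
		++passes;
	}
	assert(passes == (unsigned) std::ceil(std::log2(13.0)));	/* 4 */
	return 0;
}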
@@ -2615,7 +3278,7 @@ row_merge_sort(
}
/* "run_offset" records each run's first offset number */
- run_offset = (ulint*) mem_alloc(file->offset * sizeof(ulint));
+ run_offset = (ulint*) ut_malloc_nokey(file->offset * sizeof(ulint));
/* This tells row_merge() where to start for the first round
of merge. */
@@ -2650,7 +3313,7 @@ row_merge_sort(
#endif /* UNIV_SOLARIS */
error = row_merge(trx, dup, file, block, tmpfd,
- &num_runs, run_offset,
+ &num_runs, run_offset, stage,
crypt_data, crypt_block, space);
if(update_progress) {
@@ -2669,7 +3332,7 @@ row_merge_sort(
UNIV_MEM_ASSERT_RW(run_offset, num_runs * sizeof *run_offset);
} while (num_runs > 1);
- mem_free(run_offset);
+ ut_free(run_offset);
/* Progress report only for "normal" indexes. */
#ifndef UNIV_SOLARIS
@@ -2681,24 +3344,30 @@ row_merge_sort(
DBUG_RETURN(error);
}
-/*************************************************************//**
-Copy externally stored columns to the data tuple. */
-static MY_ATTRIBUTE((nonnull))
+/** Copy externally stored columns to the data tuple.
+@param[in] mrec record containing BLOB pointers,
+or NULL to use tuple instead
+@param[in] offsets offsets of mrec
+@param[in] page_size page size of the table containing the BLOBs
+@param[in,out] tuple data tuple
+@param[in,out] heap memory heap */
+static
void
row_merge_copy_blobs(
-/*=================*/
- const mrec_t* mrec, /*!< in: merge record */
- const ulint* offsets,/*!< in: offsets of mrec */
- ulint zip_size,/*!< in: compressed page size in bytes, or 0 */
- dtuple_t* tuple, /*!< in/out: data tuple */
- mem_heap_t* heap) /*!< in/out: memory heap */
+ const mrec_t* mrec,
+ const ulint* offsets,
+ const page_size_t& page_size,
+ dtuple_t* tuple,
+ mem_heap_t* heap)
{
- ut_ad(rec_offs_any_extern(offsets));
+ ut_ad(mrec == NULL || rec_offs_any_extern(offsets));
for (ulint i = 0; i < dtuple_get_n_fields(tuple); i++) {
ulint len;
const void* data;
dfield_t* field = dtuple_get_nth_field(tuple, i);
+ ulint field_len;
+ const byte* field_data;
if (!dfield_is_ext(field)) {
continue;
@@ -2712,8 +3381,25 @@ row_merge_copy_blobs(
columns cannot possibly be freed between the time the
BLOB pointers are read (row_merge_read_clustered_index())
and dereferenced (below). */
- data = btr_rec_copy_externally_stored_field(
- mrec, offsets, zip_size, i, &len, heap, NULL);
+ if (mrec == NULL) {
+ field_data
+ = static_cast<byte*>(dfield_get_data(field));
+ field_len = dfield_get_len(field);
+
+ ut_a(field_len >= BTR_EXTERN_FIELD_REF_SIZE);
+
+ ut_a(memcmp(field_data + field_len
+ - BTR_EXTERN_FIELD_REF_SIZE,
+ field_ref_zero,
+ BTR_EXTERN_FIELD_REF_SIZE));
+
+ data = btr_copy_externally_stored_field(
+ &len, field_data, page_size, field_len, heap);
+ } else {
+ data = btr_rec_copy_externally_stored_field(
+ mrec, offsets, page_size, i, &len, heap);
+ }
+
/* Because we have locked the table, any records
written by incomplete transactions must have been
rolled back already. There must not be any incomplete
@@ -2724,74 +3410,144 @@ row_merge_copy_blobs(
}
}
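When mrec is NULL above, the BLOB reference must still sit at the tail of the in-memory field, so before dereferencing it the code asserts that the field is at least BTR_EXTERN_FIELD_REF_SIZE bytes long and that the reference is not the all-zero placeholder. The same check restated as a stand-alone predicate (the 20-byte size mirrors BTR_EXTERN_FIELD_REF_SIZE; this is an illustration, not InnoDB's helper):

#include <cstddef>
#include <cstring>

enum { EXTERN_FIELD_REF_SIZE = 20 };	/* space id + page no + offset + length */
static const unsigned char field_ref_zero[EXTERN_FIELD_REF_SIZE] = {0};

/* True when the field tail carries a usable external (BLOB) pointer. */
static bool has_valid_extern_ref(const unsigned char* data, std::size_t len)
{
	return len >= EXTERN_FIELD_REF_SIZE
		&& std::memcmp(data + len - EXTERN_FIELD_REF_SIZE,
			       field_ref_zero, EXTERN_FIELD_REF_SIZE) != 0;
}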
-/********************************************************************//**
-Read sorted file containing index data tuples and insert these data
-tuples to the index
-@return DB_SUCCESS or error number */
-static __attribute__((nonnull(2,3,5), warn_unused_result))
+/** Convert a merge record to a typed data tuple. Note that externally
+stored fields are not copied to heap.
+@param[in] index index on the table
+@param[out] dtuple data tuple built from the merge record
+@param[in] mtuple merge record */
+static
+void
+row_merge_mtuple_to_dtuple(
+ dict_index_t* index,
+ dtuple_t* dtuple,
+ const mtuple_t* mtuple)
+{
+ ut_ad(!dict_index_is_ibuf(index));
+
+ memcpy(dtuple->fields, mtuple->fields,
+ dtuple->n_fields * sizeof *mtuple->fields);
+}
+
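Note that this is a shallow conversion: only the dfield_t descriptors are copied, so the resulting dtuple still points at column data owned by the sort buffer, which must therefore stay alive until the tuple has been inserted. A minimal illustration of that aliasing (stand-in types, not InnoDB's):

#include <cstring>

struct field_t { const void* data; unsigned len; };	/* like dfield_t */
struct tuple_t { field_t* fields; unsigned n_fields; };

/* Copies the descriptors, not the bytes they point to. */
static void shallow_copy(tuple_t* dst, const tuple_t* src)
{
	std::memcpy(dst->fields, src->fields,
		    dst->n_fields * sizeof *src->fields);
}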
+/** Insert sorted data tuples to the index.
+@param[in] trx_id transaction identifier
+@param[in] index index to be inserted
+@param[in] old_table old table
+@param[in] fd file descriptor
+@param[in,out] block file buffer
+@param[in] row_buf the sorted data tuples,
+or NULL if fd, block will be used instead
+@param[in,out] btr_bulk btr bulk instance
+@param[in,out] stage performance schema accounting object, used by
+ALTER TABLE. If not NULL, stage->begin_phase_insert() will be called initially
+and then stage->inc() will be called for each record that is processed.
+@return DB_SUCCESS or error number */
+static __attribute__((warn_unused_result))
dberr_t
row_merge_insert_index_tuples(
-/*==========================*/
- trx_id_t trx_id, /*!< in: transaction identifier */
- dict_index_t* index, /*!< in: index */
- const dict_table_t* old_table,/*!< in: old table */
- int fd, /*!< in: file descriptor */
- row_merge_block_t* block, /*!< in/out: file buffer */
- const ib_int64_t table_total_rows, /*!< in: total rows of old table */
- const float pct_progress, /*!< in: total progress percent until now */
+ trx_id_t trx_id,
+ dict_index_t* index,
+ const dict_table_t* old_table,
+ int fd,
+ row_merge_block_t* block,
+ const row_merge_buf_t* row_buf,
+ BtrBulk* btr_bulk,
+ const ib_uint64_t table_total_rows, /*!< in: total rows of old table */
+ const float pct_progress, /*!< in: total progress
+ percent until now */
const float pct_cost, /*!< in: current progress percent
*/
fil_space_crypt_t* crypt_data,/*!< in: table crypt data */
row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */
- ulint space) /*!< in: space id */
+ ulint space, /*!< in: space id */
+ ut_stage_alter_t* stage)
{
const byte* b;
mem_heap_t* heap;
mem_heap_t* tuple_heap;
- mem_heap_t* ins_heap;
dberr_t error = DB_SUCCESS;
ulint foffs = 0;
ulint* offsets;
mrec_buf_t* buf;
- ib_int64_t inserted_rows = 0;
- float curr_progress;
+ ulint n_rows = 0;
+ dtuple_t* dtuple;
+ ib_uint64_t inserted_rows = 0;
+ float curr_progress = 0;
+
DBUG_ENTER("row_merge_insert_index_tuples");
ut_ad(!srv_read_only_mode);
ut_ad(!(index->type & DICT_FTS));
+ ut_ad(!dict_index_is_spatial(index));
ut_ad(trx_id);
+ if (stage != NULL) {
+ stage->begin_phase_insert();
+ }
+
tuple_heap = mem_heap_create(1000);
{
ulint i = 1 + REC_OFFS_HEADER_SIZE
+ dict_index_get_n_fields(index);
heap = mem_heap_create(sizeof *buf + i * sizeof *offsets);
- ins_heap = mem_heap_create(sizeof *buf + i * sizeof *offsets);
offsets = static_cast<ulint*>(
mem_heap_alloc(heap, i * sizeof *offsets));
offsets[0] = i;
offsets[1] = dict_index_get_n_fields(index);
}
- b = &block[0];
+ if (row_buf != NULL) {
+ ut_ad(fd == -1);
+ ut_ad(block == NULL);
+ DBUG_EXECUTE_IF("row_merge_read_failure",
+ error = DB_CORRUPTION;
+ goto err_exit;);
+ buf = NULL;
+ b = NULL;
+ dtuple = dtuple_create(
+ heap, dict_index_get_n_fields(index));
+ dtuple_set_n_fields_cmp(
+ dtuple, dict_index_get_n_unique_in_tree(index));
+ } else {
+ b = block;
+ dtuple = NULL;
if (!row_merge_read(fd, foffs, block,
crypt_data, crypt_block, space)) {
- error = DB_CORRUPTION;
- } else {
- buf = static_cast<mrec_buf_t*>(
- mem_heap_alloc(heap, sizeof *buf));
+ error = DB_CORRUPTION;
+ goto err_exit;
+ } else {
+ buf = static_cast<mrec_buf_t*>(
+ mem_heap_alloc(heap, sizeof *buf));
+ }
+ }
- for (;;) {
- const mrec_t* mrec;
- dtuple_t* dtuple;
- ulint n_ext;
- big_rec_t* big_rec;
- rec_t* rec;
- btr_cur_t cursor;
- mtr_t mtr;
+ for (;;) {
+ const mrec_t* mrec;
+ ulint n_ext;
+ mtr_t mtr;
+
+ if (stage != NULL) {
+ stage->inc();
+ }
+
+ if (row_buf != NULL) {
+ if (n_rows >= row_buf->n_tuples) {
+ break;
+ }
+
+ /* Convert merge tuple record from
+ row buffer to data tuple record */
+ row_merge_mtuple_to_dtuple(
+ index, dtuple, &row_buf->tuples[n_rows]);
+
+ n_ext = dtuple_get_n_ext(dtuple);
+ n_rows++;
+ /* BLOB pointers must be copied from dtuple */
+ mrec = NULL;
+ } else {
b = row_merge_read_rec(block, buf, b, index,
fd, &foffs, &mrec, offsets,
crypt_data, crypt_block, space);
@@ -2803,155 +3559,76 @@ row_merge_insert_index_tuples(
break;
}
- dict_index_t* old_index
- = dict_table_get_first_index(old_table);
-
- if (dict_index_is_clust(index)
- && dict_index_is_online_ddl(old_index)) {
- error = row_log_table_get_error(old_index);
- if (error != DB_SUCCESS) {
- break;
- }
- }
-
dtuple = row_rec_to_index_entry_low(
mrec, index, offsets, &n_ext, tuple_heap);
+ }
- if (!n_ext) {
- /* There are no externally stored columns. */
- } else {
- ut_ad(dict_index_is_clust(index));
- /* Off-page columns can be fetched safely
- when concurrent modifications to the table
- are disabled. (Purge can process delete-marked
- records, but row_merge_read_clustered_index()
- would have skipped them.)
-
- When concurrent modifications are enabled,
- row_merge_read_clustered_index() will
- only see rows from transactions that were
- committed before the ALTER TABLE started
- (REPEATABLE READ).
-
- Any modifications after the
- row_merge_read_clustered_index() scan
- will go through row_log_table_apply().
- Any modifications to off-page columns
- will be tracked by
- row_log_table_blob_alloc() and
- row_log_table_blob_free(). */
- row_merge_copy_blobs(
- mrec, offsets,
- dict_table_zip_size(old_table),
- dtuple, tuple_heap);
- }
-
- ut_ad(dtuple_validate(dtuple));
- log_free_check();
-
- mtr_start(&mtr);
- /* Insert after the last user record. */
- btr_cur_open_at_index_side(
- false, index, BTR_MODIFY_LEAF,
- &cursor, 0, &mtr);
- page_cur_position(
- page_rec_get_prev(btr_cur_get_rec(&cursor)),
- btr_cur_get_block(&cursor),
- btr_cur_get_page_cur(&cursor));
- cursor.flag = BTR_CUR_BINARY;
-#ifdef UNIV_DEBUG
- /* Check that the records are inserted in order. */
- rec = btr_cur_get_rec(&cursor);
-
- if (!page_rec_is_infimum(rec)) {
- ulint* rec_offsets = rec_get_offsets(
- rec, index, offsets,
- ULINT_UNDEFINED, &tuple_heap);
- ut_ad(cmp_dtuple_rec(dtuple, rec, rec_offsets)
- > 0);
- }
-#endif /* UNIV_DEBUG */
- ulint* ins_offsets = NULL;
-
- error = btr_cur_optimistic_insert(
- BTR_NO_UNDO_LOG_FLAG | BTR_NO_LOCKING_FLAG
- | BTR_KEEP_SYS_FLAG | BTR_CREATE_FLAG,
- &cursor, &ins_offsets, &ins_heap,
- dtuple, &rec, &big_rec, 0, NULL, &mtr);
-
- if (error == DB_FAIL) {
- ut_ad(!big_rec);
- mtr_commit(&mtr);
- mtr_start(&mtr);
- btr_cur_open_at_index_side(
- false, index, BTR_MODIFY_TREE,
- &cursor, 0, &mtr);
- page_cur_position(
- page_rec_get_prev(btr_cur_get_rec(
- &cursor)),
- btr_cur_get_block(&cursor),
- btr_cur_get_page_cur(&cursor));
+ dict_index_t* old_index
+ = dict_table_get_first_index(old_table);
- error = btr_cur_pessimistic_insert(
- BTR_NO_UNDO_LOG_FLAG
- | BTR_NO_LOCKING_FLAG
- | BTR_KEEP_SYS_FLAG | BTR_CREATE_FLAG,
- &cursor, &ins_offsets, &ins_heap,
- dtuple, &rec, &big_rec, 0, NULL, &mtr);
+ if (dict_index_is_clust(index)
+ && dict_index_is_online_ddl(old_index)) {
+ error = row_log_table_get_error(old_index);
+ if (error != DB_SUCCESS) {
+ break;
}
+ }
- if (!dict_index_is_clust(index)) {
- page_update_max_trx_id(
- btr_cur_get_block(&cursor),
- btr_cur_get_page_zip(&cursor),
- trx_id, &mtr);
- }
+ if (!n_ext) {
+ /* There are no externally stored columns. */
+ } else {
+ ut_ad(dict_index_is_clust(index));
+ /* Off-page columns can be fetched safely
+ when concurrent modifications to the table
+ are disabled. (Purge can process delete-marked
+ records, but row_merge_read_clustered_index()
+ would have skipped them.)
+
+ When concurrent modifications are enabled,
+ row_merge_read_clustered_index() will
+ only see rows from transactions that were
+ committed before the ALTER TABLE started
+ (REPEATABLE READ).
+
+ Any modifications after the
+ row_merge_read_clustered_index() scan
+ will go through row_log_table_apply().
+ Any modifications to off-page columns
+ will be tracked by
+ row_log_table_blob_alloc() and
+ row_log_table_blob_free(). */
+ row_merge_copy_blobs(
+ mrec, offsets,
+ dict_table_page_size(old_table),
+ dtuple, tuple_heap);
+ }
- mtr_commit(&mtr);
+ ut_ad(dtuple_validate(dtuple));
- if (UNIV_LIKELY_NULL(big_rec)) {
- /* If the system crashes at this
- point, the clustered index record will
- contain a null BLOB pointer. This
- should not matter, because the copied
- table will be dropped on crash
- recovery anyway. */
-
- ut_ad(dict_index_is_clust(index));
- ut_ad(error == DB_SUCCESS);
- error = row_ins_index_entry_big_rec(
- dtuple, big_rec,
- ins_offsets, &ins_heap,
- index, NULL, __FILE__, __LINE__);
- dtuple_convert_back_big_rec(
- index, dtuple, big_rec);
- }
+ error = btr_bulk->insert(dtuple);
- if (error != DB_SUCCESS) {
- goto err_exit;
- }
+ if (error != DB_SUCCESS) {
+ goto err_exit;
+ }
- mem_heap_empty(tuple_heap);
- mem_heap_empty(ins_heap);
+ mem_heap_empty(tuple_heap);
- /* Increment innodb_onlineddl_pct_progress status variable */
- inserted_rows++;
- if(inserted_rows % 1000 == 0) {
- /* Update progress for each 1000 rows */
- curr_progress = (inserted_rows >= table_total_rows ||
- table_total_rows <= 0) ?
- pct_cost :
- ((pct_cost * inserted_rows) / table_total_rows);
+ /* Increment innodb_onlineddl_pct_progress status variable */
+ inserted_rows++;
+ if(inserted_rows % 1000 == 0) {
+ /* Update progress for each 1000 rows */
+ curr_progress = (inserted_rows >= table_total_rows ||
+ table_total_rows <= 0) ?
+ pct_cost :
+ ((pct_cost * inserted_rows) / table_total_rows);
- /* presenting 10.12% as 1012 integer */;
- onlineddl_pct_progress = (ulint) ((pct_progress + curr_progress) * 100);
- }
+ /* presenting 10.12% as 1012 integer */
+ onlineddl_pct_progress = (pct_progress + curr_progress) * 100;
}
}
err_exit:
mem_heap_free(tuple_heap);
- mem_heap_free(ins_heap);
mem_heap_free(heap);
DBUG_RETURN(error);
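On the caller's side (see row_merge_build_indexes() further down), each tuple inserted here goes through a single BtrBulk instance whose lifecycle brackets the whole call. A hedged outline of that order of operations, using only the BtrBulk calls visible in this patch, with error paths trimmed and sort_idx/trx/flush_observer assumed to be set up as the patch does:

/* Sketch of the caller-side BtrBulk lifecycle in this patch. */
BtrBulk	btr_bulk(sort_idx, trx->id, flush_observer);
btr_bulk.init();

dberr_t	err = row_merge_insert_index_tuples(
	trx->id, sort_idx, old_table, merge_files[i].fd, block, NULL,
	&btr_bulk, merge_files[i].n_rec, pct_progress, pct_cost,
	crypt_data, crypt_block, new_table->space, stage);

/* finish() must run even on error so the bulk loader releases its
pages; it folds its own completion status into the returned code. */
err = btr_bulk.finish(err);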
@@ -2959,8 +3636,7 @@ err_exit:
/*********************************************************************//**
Sets an exclusive lock on a table, for the duration of creating indexes.
-@return error code or DB_SUCCESS */
-UNIV_INTERN
+@return error code or DB_SUCCESS */
dberr_t
row_merge_lock_table(
/*=================*/
@@ -2979,6 +3655,9 @@ row_merge_lock_table(
heap = mem_heap_create(512);
trx->op_info = "setting table lock for creating or dropping index";
+ trx->ddl = true;
+ /* Trx for DDL should not be forced to rollback for now */
+ trx->in_innodb |= TRX_FORCE_ROLLBACK_DISABLE;
node = sel_node_create(heap);
thr = pars_complete_graph_for_exec(node, trx, heap);
@@ -3064,9 +3743,7 @@ row_merge_drop_index_dict(
ut_ad(mutex_own(&dict_sys->mutex));
ut_ad(trx->dict_operation_lock_mode == RW_X_LATCH);
ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
-#ifdef UNIV_SYNC_DEBUG
- ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_EX));
-#endif /* UNIV_SYNC_DEBUG */
+ ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
info = pars_info_create();
pars_info_add_ull_literal(info, "indexid", index_id);
@@ -3079,9 +3756,8 @@ row_merge_drop_index_dict(
DB_TOO_MANY_CONCURRENT_TRXS. */
trx->error_state = DB_SUCCESS;
- ut_print_timestamp(stderr);
- fprintf(stderr, " InnoDB: Error: row_merge_drop_index_dict "
- "failed with error code: %u.\n", (unsigned) error);
+ ib::error() << "row_merge_drop_index_dict failed with error "
+ << error;
}
trx->op_info = "";
@@ -3091,7 +3767,6 @@ row_merge_drop_index_dict(
Drop indexes that were created before an error occurred.
The data dictionary must have been locked exclusively by the caller,
because the transaction will not be committed. */
-UNIV_INTERN
void
row_merge_drop_indexes_dict(
/*========================*/
@@ -3131,9 +3806,7 @@ row_merge_drop_indexes_dict(
ut_ad(mutex_own(&dict_sys->mutex));
ut_ad(trx->dict_operation_lock_mode == RW_X_LATCH);
ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
-#ifdef UNIV_SYNC_DEBUG
- ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_EX));
-#endif /* UNIV_SYNC_DEBUG */
+ ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
/* It is possible that table->n_ref_count > 1 when
locked=TRUE. In this case, all code that should have an open
@@ -3147,15 +3820,18 @@ row_merge_drop_indexes_dict(
trx->op_info = "dropping indexes";
error = que_eval_sql(info, sql, FALSE, trx);
- if (error != DB_SUCCESS) {
+ switch (error) {
+ case DB_SUCCESS:
+ break;
+ default:
/* Even though we ensure that DDL transactions are WAIT
and DEADLOCK free, we could encounter other errors e.g.,
DB_TOO_MANY_CONCURRENT_TRXS. */
+ ib::error() << "row_merge_drop_indexes_dict failed with error "
+ << error;
+ /* fall through */
+ case DB_TOO_MANY_CONCURRENT_TRXS:
trx->error_state = DB_SUCCESS;
-
- ut_print_timestamp(stderr);
- fprintf(stderr, " InnoDB: Error: row_merge_drop_indexes_dict "
- "failed with error code: %u.\n", (unsigned) error);
}
trx->op_info = "";
@@ -3165,7 +3841,6 @@ row_merge_drop_indexes_dict(
Drop indexes that were created before an error occurred.
The data dictionary must have been locked exclusively by the caller,
because the transaction will not be committed. */
-UNIV_INTERN
void
row_merge_drop_indexes(
/*===================*/
@@ -3181,16 +3856,14 @@ row_merge_drop_indexes(
ut_ad(mutex_own(&dict_sys->mutex));
ut_ad(trx->dict_operation_lock_mode == RW_X_LATCH);
ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
-#ifdef UNIV_SYNC_DEBUG
- ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_EX));
-#endif /* UNIV_SYNC_DEBUG */
+ ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
index = dict_table_get_first_index(table);
ut_ad(dict_index_is_clust(index));
ut_ad(dict_index_get_online_status(index) == ONLINE_INDEX_COMPLETE);
/* the caller should have an open handle to the table */
- ut_ad(table->n_ref_count >= 1);
+ ut_ad(table->get_ref_count() >= 1);
/* It is possible that table->n_ref_count > 1 when
locked=TRUE. In this case, all code that should have an open
@@ -3199,7 +3872,7 @@ row_merge_drop_indexes(
A concurrent purge will be prevented by dict_operation_lock. */
- if (!locked && table->n_ref_count > 1) {
+ if (!locked && table->get_ref_count() > 1) {
/* We will have to drop the indexes later, when the
table is guaranteed to be no longer in use. Mark the
indexes as incomplete and corrupted, so that other
@@ -3215,7 +3888,7 @@ row_merge_drop_indexes(
case ONLINE_INDEX_ABORTED_DROPPED:
continue;
case ONLINE_INDEX_COMPLETE:
- if (*index->name != TEMP_INDEX_PREFIX) {
+ if (index->is_committed()) {
/* Do nothing to already
published indexes. */
} else if (index->type & DICT_FTS) {
@@ -3268,7 +3941,7 @@ row_merge_drop_indexes(
continue;
case ONLINE_INDEX_CREATION:
rw_lock_x_lock(dict_index_get_lock(index));
- ut_ad(*index->name == TEMP_INDEX_PREFIX);
+ ut_ad(!index->is_committed());
row_log_abort_sec(index);
drop_aborted:
rw_lock_x_unlock(dict_index_get_lock(index));
@@ -3312,7 +3985,7 @@ row_merge_drop_indexes(
ut_ad(!dict_index_is_clust(index));
- if (*index->name == TEMP_INDEX_PREFIX) {
+ if (!index->is_committed()) {
/* If it is FTS index, drop from table->fts
and also drop its auxiliary tables */
if (index->type & DICT_FTS) {
@@ -3351,7 +4024,6 @@ row_merge_drop_indexes(
/*********************************************************************//**
Drop all partially created indexes during crash recovery. */
-UNIV_INTERN
void
row_merge_drop_temp_indexes(void)
/*=============================*/
@@ -3403,9 +4075,8 @@ row_merge_drop_temp_indexes(void)
DB_TOO_MANY_CONCURRENT_TRXS. */
trx->error_state = DB_SUCCESS;
- ut_print_timestamp(stderr);
- fprintf(stderr, " InnoDB: Error: row_merge_drop_temp_indexes "
- "failed with error code: %u.\n", (unsigned) error);
+ ib::error() << "row_merge_drop_temp_indexes failed with error"
+ << error;
}
trx_commit_for_mysql(trx);
@@ -3418,10 +4089,8 @@ row_merge_drop_temp_indexes(void)
UNIV_PFS_IO defined, register the file descriptor with Performance Schema.
-@param[in] path location for creating temporary merge files.
@return File descriptor */
-UNIV_INTERN
int
-row_merge_file_create_low(
- const char* path)
+row_merge_file_create_low(void)
{
int fd;
#ifdef UNIV_PFS_IO
@@ -3430,20 +4099,19 @@ row_merge_file_create_low(
performance schema */
struct PSI_file_locker* locker = NULL;
PSI_file_locker_state state;
- register_pfs_file_open_begin(&state, locker, innodb_file_temp_key,
+ register_pfs_file_open_begin(&state, locker, innodb_temp_file_key,
PSI_FILE_OPEN,
"Innodb Merge Temp File",
__FILE__, __LINE__);
#endif
- fd = innobase_mysql_tmpfile(path);
+ fd = innobase_mysql_tmpfile();
#ifdef UNIV_PFS_IO
register_pfs_file_open_end(locker, fd);
#endif
if (fd < 0) {
- ib_logf(IB_LOG_LEVEL_ERROR,
- "Cannot create temporary merge file");
- return (-1);
+ ib::error() << "Cannot create temporary merge file";
+ return(-1);
}
return(fd);
}
@@ -3451,15 +4119,12 @@ row_merge_file_create_low(
/** Create a merge file in the given location.
@param[out] merge_file merge file structure
-@param[in] path location for creating temporary file
@return file descriptor, or -1 on failure */
-UNIV_INTERN
int
row_merge_file_create(
- merge_file_t* merge_file,
- const char* path)
+ merge_file_t* merge_file)
{
- merge_file->fd = row_merge_file_create_low(path);
+ merge_file->fd = row_merge_file_create_low();
merge_file->offset = 0;
merge_file->n_rec = 0;
@@ -3475,7 +4140,6 @@ row_merge_file_create(
/*********************************************************************//**
Destroy a merge file. And de-register the file from Performance Schema
if UNIV_PFS_IO is defined. */
-UNIV_INTERN
void
row_merge_file_destroy_low(
/*=======================*/
@@ -3497,7 +4161,6 @@ row_merge_file_destroy_low(
}
/*********************************************************************//**
Destroy a merge file. */
-UNIV_INTERN
void
row_merge_file_destroy(
/*===================*/
@@ -3515,8 +4178,7 @@ row_merge_file_destroy(
Rename an index in the dictionary that was created. The data
dictionary must have been locked exclusively by the caller, because
the transaction will not be committed.
-@return DB_SUCCESS if all OK */
-UNIV_INTERN
+@return DB_SUCCESS if all OK */
dberr_t
row_merge_rename_index_to_add(
/*==========================*/
@@ -3554,10 +4216,8 @@ row_merge_rename_index_to_add(
DB_TOO_MANY_CONCURRENT_TRXS. */
trx->error_state = DB_SUCCESS;
- ut_print_timestamp(stderr);
- fprintf(stderr,
- " InnoDB: Error: row_merge_rename_index_to_add "
- "failed with error code: %u.\n", (unsigned) err);
+ ib::error() << "row_merge_rename_index_to_add failed with"
+ " error " << err;
}
trx->op_info = "";
@@ -3569,8 +4229,7 @@ row_merge_rename_index_to_add(
Rename an index in the dictionary that is to be dropped. The data
dictionary must have been locked exclusively by the caller, because
the transaction will not be committed.
-@return DB_SUCCESS if all OK */
-UNIV_INTERN
+@return DB_SUCCESS if all OK */
dberr_t
row_merge_rename_index_to_drop(
/*===========================*/
@@ -3611,10 +4270,8 @@ row_merge_rename_index_to_drop(
DB_TOO_MANY_CONCURRENT_TRXS. */
trx->error_state = DB_SUCCESS;
- ut_print_timestamp(stderr);
- fprintf(stderr,
- " InnoDB: Error: row_merge_rename_index_to_drop "
- "failed with error code: %u.\n", (unsigned) err);
+ ib::error() << "row_merge_rename_index_to_drop failed with"
+ " error " << err;
}
trx->op_info = "";
@@ -3626,8 +4283,7 @@ row_merge_rename_index_to_drop(
Provide a new pathname for a table that is being renamed if it belongs to
a file-per-table tablespace. The caller is responsible for freeing the
memory allocated for the return value.
-@return new pathname of tablespace file, or NULL if space = 0 */
-UNIV_INTERN
+@return new pathname of tablespace file, or NULL if space = 0 */
char*
row_make_new_pathname(
/*==================*/
@@ -3637,14 +4293,14 @@ row_make_new_pathname(
char* new_path;
char* old_path;
- ut_ad(table->space != TRX_SYS_SPACE);
+ ut_ad(!is_system_tablespace(table->space));
old_path = fil_space_get_first_path(table->space);
ut_a(old_path);
new_path = os_file_make_new_pathname(old_path, new_name);
- mem_free(old_path);
+ ut_free(old_path);
return(new_path);
}
@@ -3653,8 +4309,7 @@ row_make_new_pathname(
Rename the tables in the data dictionary. The data dictionary must
have been locked exclusively by the caller, because the transaction
will not be committed.
-@return error code or DB_SUCCESS */
-UNIV_INTERN
+@return error code or DB_SUCCESS */
dberr_t
row_merge_rename_tables_dict(
/*=========================*/
@@ -3682,8 +4337,8 @@ row_merge_rename_tables_dict(
info = pars_info_create();
- pars_info_add_str_literal(info, "new_name", new_table->name);
- pars_info_add_str_literal(info, "old_name", old_table->name);
+ pars_info_add_str_literal(info, "new_name", new_table->name.m_name);
+ pars_info_add_str_literal(info, "old_name", old_table->name.m_name);
pars_info_add_str_literal(info, "tmp_name", tmp_name);
err = que_eval_sql(info,
@@ -3695,10 +4350,11 @@ row_merge_rename_tables_dict(
" WHERE NAME = :new_name;\n"
"END;\n", FALSE, trx);
- /* Update SYS_TABLESPACES and SYS_DATAFILES if the old
- table is in a non-system tablespace where space > 0. */
+ /* Update SYS_TABLESPACES and SYS_DATAFILES if the old table
+ being renamed is in a single-table tablespace, whose file must
+ be implicitly renamed along with the table. */
if (err == DB_SUCCESS
- && old_table->space != TRX_SYS_SPACE
+ && dict_table_is_file_per_table(old_table)
&& !old_table->ibd_file_missing) {
/* Make pathname to update SYS_DATAFILES. */
char* tmp_path = row_make_new_pathname(old_table, tmp_name);
@@ -3721,19 +4377,22 @@ row_merge_rename_tables_dict(
" WHERE SPACE = :old_space;\n"
"END;\n", FALSE, trx);
- mem_free(tmp_path);
+ ut_free(tmp_path);
}
- /* Update SYS_TABLESPACES and SYS_DATAFILES if the new
- table is in a non-system tablespace where space > 0. */
- if (err == DB_SUCCESS && new_table->space != TRX_SYS_SPACE) {
+ /* Update SYS_TABLESPACES and SYS_DATAFILES if the new table
+ being renamed is in a single-table tablespace, whose file must
+ be implicitly renamed along with the table. */
+ if (err == DB_SUCCESS
+ && dict_table_is_file_per_table(new_table)) {
/* Make pathname to update SYS_DATAFILES. */
char* old_path = row_make_new_pathname(
- new_table, old_table->name);
+ new_table, old_table->name.m_name);
info = pars_info_create();
- pars_info_add_str_literal(info, "old_name", old_table->name);
+ pars_info_add_str_literal(info, "old_name",
+ old_table->name.m_name);
pars_info_add_str_literal(info, "old_path", old_path);
pars_info_add_int4_literal(info, "new_space",
(lint) new_table->space);
@@ -3749,7 +4408,7 @@ row_merge_rename_tables_dict(
" WHERE SPACE = :new_space;\n"
"END;\n", FALSE, trx);
- mem_free(old_path);
+ ut_free(old_path);
}
if (err == DB_SUCCESS && dict_table_is_discarded(new_table)) {
@@ -3762,22 +4421,27 @@ row_merge_rename_tables_dict(
return(err);
}
-/*********************************************************************//**
-Create and execute a query graph for creating an index.
-@return DB_SUCCESS or error code */
+/** Create and execute a query graph for creating an index.
+@param[in,out] trx trx
+@param[in,out] table table
+@param[in,out] index index
+@param[in] add_v new virtual columns added along with the ADD INDEX call
+@return DB_SUCCESS or error code */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
dberr_t
row_merge_create_index_graph(
-/*=========================*/
- trx_t* trx, /*!< in: trx */
- dict_table_t* table, /*!< in: table */
- dict_index_t* index) /*!< in: index */
+ trx_t* trx,
+ dict_table_t* table,
+ dict_index_t* index,
+ const dict_add_v_col_t* add_v)
{
ind_node_t* node; /*!< Index creation node */
mem_heap_t* heap; /*!< Memory heap */
que_thr_t* thr; /*!< Query thread */
dberr_t err;
+ DBUG_ENTER("row_merge_create_index_graph");
+
ut_ad(trx);
ut_ad(table);
ut_ad(index);
@@ -3785,7 +4449,7 @@ row_merge_create_index_graph(
heap = mem_heap_create(512);
index->table = table;
- node = ind_create_graph_create(index, heap, false);
+ node = ind_create_graph_create(index, heap, add_v);
thr = pars_complete_graph_for_exec(node, trx, heap);
ut_a(thr == que_fork_start_command(
@@ -3797,41 +4461,48 @@ row_merge_create_index_graph(
que_graph_free((que_t*) que_node_get_parent(thr));
- return(err);
+ DBUG_RETURN(err);
}
-/*********************************************************************//**
-Create the index and load in to the dictionary.
-@return index, or NULL on error */
-UNIV_INTERN
+/** Create the index and load in to the dictionary.
+@param[in,out] trx trx (sets error_state)
+@param[in,out] table the index is on this table
+@param[in] index_def the index definition
+@param[in] add_v new virtual columns added along with
+ the ADD INDEX call
+@param[in] col_names column names if columns are renamed
+ or NULL
+@return index, or NULL on error */
dict_index_t*
row_merge_create_index(
-/*===================*/
- trx_t* trx, /*!< in/out: trx (sets error_state) */
- dict_table_t* table, /*!< in: the index is on this table */
+ trx_t* trx,
+ dict_table_t* table,
const index_def_t* index_def,
- /*!< in: the index definition */
+ const dict_add_v_col_t* add_v,
const char** col_names)
- /*! in: column names if columns are
- renamed or NULL */
{
dict_index_t* index;
dberr_t err;
ulint n_fields = index_def->n_fields;
ulint i;
+ DBUG_ENTER("row_merge_create_index");
+
ut_ad(!srv_read_only_mode);
/* Create the index prototype, using the passed in def, this is not
a persistent operation. We pass 0 as the space id, and determine at
a lower level the space id where to store the table. */
- index = dict_mem_index_create(table->name, index_def->name,
+ index = dict_mem_index_create(table->name.m_name, index_def->name,
0, index_def->ind_type, n_fields);
ut_a(index);
+ index->set_committed(index_def->rebuild);
+
for (i = 0; i < n_fields; i++) {
+ const char* name;
index_field_t* ifield = &index_def->fields[i];
const char * col_name;
@@ -3852,21 +4523,37 @@ row_merge_create_index(
dict_table_get_col_name(table, ifield->col_no);
}
- dict_mem_index_add_field(
- index,
- col_name,
- ifield->prefix_len);
+ if (ifield->is_v_col) {
+ if (ifield->col_no >= table->n_v_def) {
+ ut_ad(ifield->col_no < table->n_v_def
+ + add_v->n_v_col);
+ ut_ad(ifield->col_no >= table->n_v_def);
+ name = add_v->v_col_name[
+ ifield->col_no - table->n_v_def];
+ } else {
+ name = dict_table_get_v_col_name(
+ table, ifield->col_no);
+ }
+ } else {
+ name = dict_table_get_col_name(table, ifield->col_no);
+ }
+
+ dict_mem_index_add_field(index, name, ifield->prefix_len);
}
/* Add the index to SYS_INDEXES, using the index prototype. */
- err = row_merge_create_index_graph(trx, table, index);
+ err = row_merge_create_index_graph(trx, table, index, add_v);
if (err == DB_SUCCESS) {
- index = dict_table_get_index_on_name(table, index_def->name);
+ index = dict_table_get_index_on_name(table, index_def->name,
+ index_def->rebuild);
ut_a(index);
+ index->parser = index_def->parser;
+ index->is_ngram = index_def->is_ngram;
+
/* Note the id of the transaction that created this
index, we use it to restrict readers from accessing
this index, to ensure read consistency. */
@@ -3875,12 +4562,11 @@ row_merge_create_index(
index = NULL;
}
- return(index);
+ DBUG_RETURN(index);
}
/*********************************************************************//**
Check if a transaction can use an index. */
-UNIV_INTERN
ibool
row_merge_is_index_usable(
/*======================*/
@@ -3895,8 +4581,11 @@ row_merge_is_index_usable(
return(!dict_index_is_corrupted(index)
&& (dict_table_is_temporary(index->table)
- || !trx->read_view
- || read_view_sees_trx_id(trx->read_view, index->trx_id)));
+ || index->trx_id == 0
+ || !MVCC::is_view_active(trx->read_view)
+ || trx->read_view->changes_visible(
+ index->trx_id,
+ index->table->name)));
}
/*********************************************************************//**
@@ -3904,8 +4593,7 @@ Drop a table. The caller must have ensured that the background stats
thread is not processing the table. This can be done by calling
dict_stats_wait_bg_to_stop_using_table() after locking the dictionary and
before calling this function.
-@return DB_SUCCESS or error code */
-UNIV_INTERN
+@return DB_SUCCESS or error code */
dberr_t
row_merge_drop_table(
/*=================*/
@@ -3915,49 +4603,84 @@ row_merge_drop_table(
ut_ad(!srv_read_only_mode);
/* There must be no open transactions on the table. */
- ut_a(table->n_ref_count == 0);
+ ut_a(table->get_ref_count() == 0);
- return(row_drop_table_for_mysql(table->name, trx, false, false, false));
+ return(row_drop_table_for_mysql(table->name.m_name,
+ trx, false, false));
}
-/*********************************************************************//**
-Build indexes on a table by reading a clustered index,
-creating a temporary file containing index entries, merge sorting
-these index entries and inserting sorted index entries to indexes.
-@return DB_SUCCESS or error code */
-UNIV_INTERN
+/** Write an MLOG_INDEX_LOAD record to indicate in the redo-log
+that redo-logging of individual index pages was disabled, and
+the flushing of such pages to the data files was completed.
+@param[in] index an index tree on which redo logging was disabled */
+static
+void
+row_merge_write_redo(
+ const dict_index_t* index)
+{
+ mtr_t mtr;
+ byte* log_ptr;
+
+ ut_ad(!dict_table_is_temporary(index->table));
+ mtr.start();
+ log_ptr = mlog_open(&mtr, 11 + 8);
+ log_ptr = mlog_write_initial_log_record_low(
+ MLOG_INDEX_LOAD,
+ index->space, index->page, log_ptr, &mtr);
+ mach_write_to_8(log_ptr, index->id);
+ mlog_close(&mtr, log_ptr + 8);
+ mtr.commit();
+}
+
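The mlog_open(&mtr, 11 + 8) above reserves the worst case for the record: up to 11 bytes for the generic initial header (type byte plus compressed space and page ids) and exactly 8 bytes for the index id, which is the entire MLOG_INDEX_LOAD payload. A hedged reader-side sketch of that payload (mach_read_from_8() is the standard InnoDB accessor; the surrounding recovery parse loop is assumed):

/* Hypothetical recovery-side parse of the MLOG_INDEX_LOAD body:
after the initial header, the body is one 8-byte index id. */
const byte*	ptr = body;		/* start of the record body */
index_id_t	id = mach_read_from_8(ptr);
ptr += 8;				/* end of the MLOG_INDEX_LOAD body */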
+/** Build indexes on a table by reading a clustered index, creating a temporary
+file containing index entries, merge sorting these index entries and inserting
+sorted index entries to indexes.
+@param[in] trx transaction
+@param[in] old_table table where rows are read from
+@param[in] new_table table where indexes are created; identical to
+old_table unless creating a PRIMARY KEY
+@param[in] online true if creating indexes online
+@param[in] indexes indexes to be created
+@param[in] key_numbers MySQL key numbers
+@param[in] n_indexes size of indexes[]
+@param[in,out] table MySQL table, for reporting erroneous key value
+if applicable
+@param[in] add_cols default values of added columns, or NULL
+@param[in] col_map mapping of old column numbers to new ones, or
+NULL if old_table == new_table
+@param[in] add_autoinc number of added AUTO_INCREMENT columns, or
+ULINT_UNDEFINED if none is added
+@param[in,out] sequence autoinc sequence
+@param[in] skip_pk_sort whether the new PRIMARY KEY will follow
+existing order
+@param[in,out] stage performance schema accounting object, used by
+ALTER TABLE. stage->begin_phase_read_pk() will be called at the beginning of
+this function and it will be passed to other functions for further accounting.
+@param[in] add_v new virtual columns added along with indexes
+@return DB_SUCCESS or error code */
dberr_t
row_merge_build_indexes(
-/*====================*/
- trx_t* trx, /*!< in: transaction */
- dict_table_t* old_table, /*!< in: table where rows are
- read from */
- dict_table_t* new_table, /*!< in: table where indexes are
- created; identical to old_table
- unless creating a PRIMARY KEY */
- bool online, /*!< in: true if creating indexes
- online */
- dict_index_t** indexes, /*!< in: indexes to be created */
- const ulint* key_numbers, /*!< in: MySQL key numbers */
- ulint n_indexes, /*!< in: size of indexes[] */
- struct TABLE* table, /*!< in/out: MySQL table, for
- reporting erroneous key value
- if applicable */
- const dtuple_t* add_cols, /*!< in: default values of
- added columns, or NULL */
- const ulint* col_map, /*!< in: mapping of old column
- numbers to new ones, or NULL
- if old_table == new_table */
- ulint add_autoinc, /*!< in: number of added
- AUTO_INCREMENT column, or
- ULINT_UNDEFINED if none is added */
- ib_sequence_t& sequence) /*!< in: autoinc instance if
- add_autoinc != ULINT_UNDEFINED */
+ trx_t* trx,
+ dict_table_t* old_table,
+ dict_table_t* new_table,
+ bool online,
+ dict_index_t** indexes,
+ const ulint* key_numbers,
+ ulint n_indexes,
+ struct TABLE* table,
+ const dtuple_t* add_cols,
+ const ulint* col_map,
+ ulint add_autoinc,
+ ib_sequence_t& sequence,
+ bool skip_pk_sort,
+ ut_stage_alter_t* stage,
+ const dict_add_v_col_t* add_v)
{
merge_file_t* merge_files;
row_merge_block_t* block;
+ ut_new_pfx_t block_pfx;
+ ut_new_pfx_t crypt_pfx;
row_merge_block_t* crypt_block;
- ulint block_size;
ulint i;
ulint j;
dberr_t error;
@@ -3965,7 +4688,7 @@ row_merge_build_indexes(
dict_index_t* fts_sort_idx = NULL;
fts_psort_t* psort_info = NULL;
fts_psort_t* merge_info = NULL;
- ib_int64_t sig_count = 0;
+ int64_t sig_count = 0;
bool fts_psort_initiated = false;
fil_space_crypt_t * crypt_data = NULL;
@@ -3981,12 +4704,18 @@ row_merge_build_indexes(
ut_ad((old_table == new_table) == !col_map);
ut_ad(!add_cols || col_map);
+ stage->begin_phase_read_pk(skip_pk_sort && new_table != old_table
+ ? n_indexes - 1
+ : n_indexes);
+
/* Allocate memory for merge file data structure and initialize
fields */
- block_size = 3 * srv_sort_buf_size;
- block = static_cast<row_merge_block_t*>(
- os_mem_alloc_large(&block_size));
+ ut_allocator<row_merge_block_t> alloc(mem_key_row_merge_sort);
+
+ /* This will allocate "3 * srv_sort_buf_size" elements of type
+ row_merge_block_t. The latter is defined as byte. */
+ block = alloc.allocate_large(3 * srv_sort_buf_size, &block_pfx);
if (block == NULL) {
DBUG_RETURN(DB_OUT_OF_MEMORY);
@@ -4003,7 +4732,7 @@ row_merge_build_indexes(
crypt_data && crypt_data->encryption == FIL_SPACE_ENCRYPTION_DEFAULT)) {
crypt_block = static_cast<row_merge_block_t*>(
- os_mem_alloc_large(&block_size));
+ alloc.allocate_large(3 * srv_sort_buf_size, &crypt_pfx));
if (crypt_block == NULL) {
DBUG_RETURN(DB_OUT_OF_MEMORY);
@@ -4013,10 +4742,36 @@ row_merge_build_indexes(
crypt_data = NULL;
}
- trx_start_if_not_started_xa(trx);
+ trx_start_if_not_started_xa(trx, true);
+
+ /* Check if we need a flush observer to flush dirty pages.
+ Since we disable redo logging in bulk load, we should flush
+ dirty pages before the online log apply, because the online log
+ apply enables redo logging (we can do further optimization here).
+ 1. online add index: flush dirty pages right before row_log_apply().
+ 2. table rebuild: flush dirty pages before row_log_table_apply().
+
+ We use bulk load to create all types of indexes except spatial index,
+ for which redo logging is enabled. If we create only spatial indexes,
+ we don't need to flush dirty pages at all. */
+ bool need_flush_observer = (old_table != new_table);
+
+ for (i = 0; i < n_indexes; i++) {
+ if (!dict_index_is_spatial(indexes[i])) {
+ need_flush_observer = true;
+ }
+ }
+
+ FlushObserver* flush_observer = NULL;
+ if (need_flush_observer) {
+ flush_observer = UT_NEW_NOKEY(
+ FlushObserver(new_table->space, trx, stage));
+
+ trx_set_flush_observer(trx, flush_observer);
+ }
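The observer registered here is what later forces the redo-skipped bulk-load pages to disk; this patch drives it with interrupted() on failure, flush() to wait for the writes, and UT_DELETE() at the end (see func_exit below). Gathered from the surrounding hunks, the lifecycle in one place is:

/* Outline of the FlushObserver usage in this patch; not standalone
code, just the call order collected for reference. */
FlushObserver*	observer = UT_NEW_NOKEY(
	FlushObserver(new_table->space, trx, stage));
trx_set_flush_observer(trx, observer);

/* ... bulk load runs; dirty pages are tracked, not redo-logged ... */

if (error != DB_SUCCESS) {
	observer->interrupted();	/* mark tracked pages as garbage */
}
observer->flush();			/* wait for tracked pages to hit disk */
UT_DELETE(observer);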
merge_files = static_cast<merge_file_t*>(
- mem_alloc(n_indexes * sizeof *merge_files));
+ ut_malloc_nokey(n_indexes * sizeof *merge_files));
/* Initialize all the merge file descriptors, so that we
don't call row_merge_file_destroy() on uninitialized
@@ -4030,7 +4785,6 @@ row_merge_build_indexes(
total_dynamic_cost = COST_BUILD_INDEX_DYNAMIC * n_indexes;
for (i = 0; i < n_indexes; i++) {
-
if (indexes[i]->type & DICT_FTS) {
ibool opt_doc_id_size = FALSE;
@@ -4041,18 +4795,24 @@ row_merge_build_indexes(
fts_sort_idx = row_merge_create_fts_sort_index(
indexes[i], old_table, &opt_doc_id_size);
- row_merge_dup_t* dup = static_cast<row_merge_dup_t*>(
- ut_malloc(sizeof *dup));
+ row_merge_dup_t* dup
+ = static_cast<row_merge_dup_t*>(
+ ut_malloc_nokey(sizeof *dup));
dup->index = fts_sort_idx;
dup->table = table;
dup->col_map = col_map;
dup->n_dup = 0;
- row_fts_psort_info_init(
- trx, dup, new_table, opt_doc_id_size,
- &psort_info, &merge_info);
+ /* This can fail e.g. if temporary files can't be
+ created */
+ if (!row_fts_psort_info_init(
+ trx, dup, new_table, opt_doc_id_size,
+ &psort_info, &merge_info)) {
+ error = DB_CORRUPTION;
+ goto func_exit;
+ }
- /* "We need to ensure that we free the resources
+ /* We need to ensure that we free the resources
allocated */
fts_psort_initiated = true;
}
@@ -4081,13 +4841,14 @@ row_merge_build_indexes(
/* Read clustered index of the table and create files for
secondary index entries for merge sort */
-
error = row_merge_read_clustered_index(
trx, table, old_table, new_table, online, indexes,
fts_sort_idx, psort_info, merge_files, key_numbers,
- n_indexes, add_cols, col_map,
- add_autoinc, sequence, block, &tmpfd,
- pct_cost, crypt_data, crypt_block);
+ n_indexes, add_cols, add_v, col_map, add_autoinc,
+ sequence, block, skip_pk_sort, &tmpfd, stage,
+ pct_cost, crypt_data, crypt_block);
+
+ stage->end_phase_read_pk();
pct_progress += pct_cost;
@@ -4114,6 +4875,10 @@ row_merge_build_indexes(
for (i = 0; i < n_indexes; i++) {
dict_index_t* sort_idx = indexes[i];
+ if (dict_index_is_spatial(sort_idx)) {
+ continue;
+ }
+
if (indexes[i]->type & DICT_FTS) {
os_event_t fts_parallel_merge_event;
@@ -4163,11 +4928,10 @@ wait_again:
}
if (!all_exit) {
- ib_logf(IB_LOG_LEVEL_ERROR,
- "Not all child merge threads"
- " exited when creating FTS"
- " index '%s'",
- indexes[i]->name);
+ ib::error() << "Not all child merge"
+ " threads exited when creating"
+ " FTS index '"
+ << indexes[i]->name << "'";
}
} else {
/* This cannot report duplicates; an
@@ -4180,7 +4944,7 @@ wait_again:
#ifdef FTS_INTERNAL_DIAG_PRINT
DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Insert\n");
#endif
- } else if (merge_files[i].fd != -1) {
+ } else if (merge_files[i].fd >= 0) {
char buf[3 * NAME_LEN];
char *bufend;
row_merge_dup_t dup = {
@@ -4194,8 +4958,7 @@ wait_again:
bufend = innobase_convert_name(buf, sizeof buf,
indexes[i]->name, strlen(indexes[i]->name),
- trx ? trx->mysql_thd : NULL,
- FALSE);
+ trx ? trx->mysql_thd : NULL);
buf[bufend - buf]='\0';
@@ -4207,7 +4970,7 @@ wait_again:
trx, &dup, &merge_files[i],
block, &tmpfd, true,
pct_progress, pct_cost,
- crypt_data, crypt_block, new_table->space);
+ crypt_data, crypt_block, new_table->space, stage);
pct_progress += pct_cost;
@@ -4220,6 +4983,10 @@ wait_again:
os_thread_sleep(20000000);); /* 20 sec */
if (error == DB_SUCCESS) {
+ BtrBulk btr_bulk(sort_idx, trx->id,
+ flush_observer);
+ btr_bulk.init();
+
pct_cost = (COST_BUILD_INDEX_STATIC +
(total_dynamic_cost * merge_files[i].offset /
total_index_blocks)) /
@@ -4233,9 +5000,13 @@ wait_again:
error = row_merge_insert_index_tuples(
trx->id, sort_idx, old_table,
- merge_files[i].fd, block,
+ merge_files[i].fd, block, NULL,
+ &btr_bulk,
merge_files[i].n_rec, pct_progress, pct_cost,
- crypt_data, crypt_block, new_table->space);
+ crypt_data, crypt_block, new_table->space, stage);
+
+ error = btr_bulk.finish(error);
+
pct_progress += pct_cost;
sql_print_information("InnoDB: Online DDL : "
@@ -4257,9 +5028,14 @@ wait_again:
ut_ad(sort_idx->online_status
== ONLINE_INDEX_COMPLETE);
} else {
+ ut_ad(need_flush_observer);
sql_print_information("InnoDB: Online DDL : Start applying row log");
+
+ flush_observer->flush();
+ row_merge_write_redo(indexes[i]);
+
DEBUG_SYNC_C("row_log_apply_before");
- error = row_log_apply(trx, sort_idx, table);
+ error = row_log_apply(trx, sort_idx, table, stage);
DEBUG_SYNC_C("row_log_apply_after");
sql_print_information("InnoDB: Online DDL : End of applying row log");
}
@@ -4272,19 +5048,13 @@ wait_again:
}
if (indexes[i]->type & DICT_FTS && fts_enable_diag_print) {
- char* name = (char*) indexes[i]->name;
-
- if (*name == TEMP_INDEX_PREFIX) {
- name++;
- }
-
- ut_print_timestamp(stderr);
- fprintf(stderr, " InnoDB: Finished building "
- "full-text index %s\n", name);
+ ib::info() << "Finished building full-text index "
+ << indexes[i]->name;
}
}
func_exit:
+
DBUG_EXECUTE_IF(
"ib_build_indexes_too_many_concurrent_trxs",
error = DB_TOO_MANY_CONCURRENT_TRXS;
@@ -4306,11 +5076,12 @@ func_exit:
dict_mem_index_free(fts_sort_idx);
}
- mem_free(merge_files);
- os_mem_free_large(block, block_size);
+ ut_free(merge_files);
+
+ alloc.deallocate_large(block, &block_pfx);
if (crypt_block) {
- os_mem_free_large(crypt_block, block_size);
+ alloc.deallocate_large(crypt_block, &crypt_pfx);
}
DICT_TF2_FLAG_UNSET(new_table, DICT_TF2_FTS_ADD_DOC_ID);
@@ -4320,7 +5091,7 @@ func_exit:
as aborted. */
for (i = 0; i < n_indexes; i++) {
ut_ad(!(indexes[i]->type & DICT_FTS));
- ut_ad(*indexes[i]->name == TEMP_INDEX_PREFIX);
+ ut_ad(!indexes[i]->is_committed());
ut_ad(!dict_index_is_clust(indexes[i]));
/* Completed indexes should be dropped as
@@ -4346,12 +5117,42 @@ func_exit:
/* fall through */
case ONLINE_INDEX_ABORTED_DROPPED:
case ONLINE_INDEX_ABORTED:
- MONITOR_MUTEX_INC(
- &dict_sys->mutex,
+ MONITOR_ATOMIC_INC(
MONITOR_BACKGROUND_DROP_INDEX);
}
}
}
+ DBUG_EXECUTE_IF("ib_index_crash_after_bulk_load", DBUG_SUICIDE(););
+
+ if (flush_observer != NULL) {
+ ut_ad(need_flush_observer);
+
+ DBUG_EXECUTE_IF("ib_index_build_fail_before_flush",
+ error = DB_FAIL;
+ );
+
+ if (error != DB_SUCCESS) {
+ flush_observer->interrupted();
+ }
+
+ flush_observer->flush();
+
+ UT_DELETE(flush_observer);
+
+ if (trx_is_interrupted(trx)) {
+ error = DB_INTERRUPTED;
+ }
+
+ if (error == DB_SUCCESS && old_table != new_table) {
+ for (const dict_index_t* index
+ = dict_table_get_first_index(new_table);
+ index != NULL;
+ index = dict_table_get_next_index(index)) {
+ row_merge_write_redo(index);
+ }
+ }
+ }
+
DBUG_RETURN(error);
}