Diffstat (limited to 'storage/innobase/row/row0merge.cc')
-rw-r--r-- storage/innobase/row/row0merge.cc | 168
1 file changed, 151 insertions(+), 17 deletions(-)
diff --git a/storage/innobase/row/row0merge.cc b/storage/innobase/row/row0merge.cc
index cb219e29d6e..7ebcdefdc3a 100644
--- a/storage/innobase/row/row0merge.cc
+++ b/storage/innobase/row/row0merge.cc
@@ -1,6 +1,6 @@
/*****************************************************************************
-Copyright (c) 2005, 2013, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2005, 2015, Oracle and/or its affiliates. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -235,22 +235,86 @@ row_merge_buf_free(
mem_heap_free(buf->heap);
}
-/******************************************************//**
-Insert a data tuple into a sort buffer.
-@return number of rows added, 0 if out of space */
+/** Convert the field data from compact to redundant format.
+@param[in] row_field field to copy from
+@param[out] field field to copy to
+@param[in] len length of the field data
+@param[in] zip_size compressed BLOB page size,
+ zero for uncompressed BLOBs
+@param[in,out] heap memory heap where the padded copy of the
+ field data is allocated */
+static
+void
+row_merge_buf_redundant_convert(
+ const dfield_t* row_field,
+ dfield_t* field,
+ ulint len,
+ ulint zip_size,
+ mem_heap_t* heap)
+{
+ ut_ad(DATA_MBMINLEN(field->type.mbminmaxlen) == 1);
+ ut_ad(DATA_MBMAXLEN(field->type.mbminmaxlen) > 1);
+
+ byte* buf = (byte*) mem_heap_alloc(heap, len);
+ ulint field_len = row_field->len;
+ ut_ad(field_len <= len);
+
+ if (row_field->ext) {
+ const byte* field_data = static_cast<byte*>(
+ dfield_get_data(row_field));
+ ulint ext_len;
+
+ ut_a(field_len >= BTR_EXTERN_FIELD_REF_SIZE);
+ ut_a(memcmp(field_data + field_len - BTR_EXTERN_FIELD_REF_SIZE,
+ field_ref_zero, BTR_EXTERN_FIELD_REF_SIZE));
+
+ byte* data = btr_copy_externally_stored_field(
+ &ext_len, field_data, zip_size, field_len, heap, NULL);
+
+ ut_ad(ext_len < len);
+
+ memcpy(buf, data, ext_len);
+ field_len = ext_len;
+ } else {
+ memcpy(buf, row_field->data, field_len);
+ }
+
+ memset(buf + field_len, 0x20, len - field_len);
+
+ dfield_set_data(field, buf, len);
+}
+
+/** Insert a data tuple into a sort buffer.
+@param[in,out] buf sort buffer
+@param[in] fts_index fts index to be created
+@param[in] old_table original table
+@param[in,out] psort_info parallel sort info
+@param[in] row table row
+@param[in] ext cache of externally stored
+ column prefixes, or NULL
+@param[in,out] doc_id Doc ID if we are creating
+ an FTS index
+@param[in,out] conv_heap memory heap where to allocate data when
+ converting to ROW_FORMAT=REDUNDANT, or NULL
+ when not to invoke
+ row_merge_buf_redundant_convert()
+@param[in,out] exceed_page set if the record size exceeds the page size
+ when converting to ROW_FORMAT=REDUNDANT
+@return number of rows added, 0 if out of space */
static
ulint
row_merge_buf_add(
-/*==============*/
- row_merge_buf_t* buf, /*!< in/out: sort buffer */
- dict_index_t* fts_index,/*!< in: fts index to be created */
- const dict_table_t* old_table,/*!< in: original table */
- fts_psort_t* psort_info, /*!< in: parallel sort info */
- const dtuple_t* row, /*!< in: table row */
- const row_ext_t* ext, /*!< in: cache of externally stored
- column prefixes, or NULL */
- doc_id_t* doc_id) /*!< in/out: Doc ID if we are
- creating FTS index */
+ row_merge_buf_t* buf,
+ dict_index_t* fts_index,
+ const dict_table_t* old_table,
+ fts_psort_t* psort_info,
+ const dtuple_t* row,
+ const row_ext_t* ext,
+ doc_id_t* doc_id,
+ mem_heap_t* conv_heap,
+ bool* exceed_page)
{
ulint i;
const dict_index_t* index;
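
The heart of the new helper is a copy-and-pad step: in ROW_FORMAT=REDUNDANT a CHAR(n) column in a multi-byte charset always occupies its full byte length, so a value that COMPACT format stored trimmed has to be space-padded back out. A minimal standalone sketch of that step, with illustrative names rather than the InnoDB API:

    #include <cstddef>
    #include <cstring>
    #include <vector>

    // Pad a COMPACT-format CHAR value out to its fixed REDUNDANT length.
    // 0x20 is the space byte that row_merge_buf_redundant_convert() uses.
    static std::vector<unsigned char>
    pad_to_fixed_len(const unsigned char* data, size_t field_len, size_t col_len)
    {
        std::vector<unsigned char> buf(col_len);
        std::memcpy(buf.data(), data, field_len);       // the actual bytes
        std::memset(buf.data() + field_len, 0x20,
                    col_len - field_len);               // trailing spaces
        return buf;
    }

Externally stored (BLOB) values take one extra step, fetching the full column through btr_copy_externally_stored_field() before padding, as the function above does.
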
@@ -400,6 +464,23 @@ row_merge_buf_add(
n_row_added = 1;
continue;
}
+
+ if (field->len != UNIV_SQL_NULL
+ && col->mtype == DATA_MYSQL
+ && col->len != field->len) {
+
+ if (conv_heap != NULL) {
+ row_merge_buf_redundant_convert(
+ row_field, field, col->len,
+ dict_table_zip_size(old_table),
+ conv_heap);
+ } else {
+ /* A field length mismatch should not
+ happen when rebuilding a table in
+ ROW_FORMAT=REDUNDANT. */
+ ut_ad(dict_table_is_comp(index->table));
+ }
+ }
}
len = dfield_get_len(field);
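
To make the trigger condition concrete: for a CHAR(3) column in utf8 (mbminlen = 1, mbmaxlen = 3), col->len is the fixed 9 bytes that REDUNDANT format reserves, while COMPACT format stores the ASCII value 'ab' in only 3 bytes. A self-contained check of that arithmetic, with all values assumed for illustration:

    #include <cassert>

    int main()
    {
        const unsigned n_chars   = 3;                  // CHAR(3)
        const unsigned mbmaxlen  = 3;                  // utf8
        const unsigned col_len   = n_chars * mbmaxlen; // 9: fixed in REDUNDANT
        const unsigned field_len = 3;                  // 'ab ' as stored by COMPACT
        assert(col_len != field_len);                  // conversion branch taken
    }
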
@@ -508,6 +589,14 @@ row_merge_buf_add(
of extra_size. */
data_size += (extra_size + 1) + ((extra_size + 1) >= 0x80);
+ /* The record size can exceed the page size while converting
+ to ROW_FORMAT=REDUNDANT, and rec_offs_data_size() contains
+ the assertion ut_ad(size < UNIV_PAGE_SIZE), which would fail
+ before we even attempt to insert the row. Flag the oversized
+ record here instead. */
+ if (conv_heap != NULL && data_size > UNIV_PAGE_SIZE) {
+ *exceed_page = true;
+ }
+
ut_ad(data_size < srv_sort_buf_size);
/* Reserve one byte for the end marker of row_merge_block_t. */
@@ -527,6 +616,10 @@ row_merge_buf_add(
dfield_dup(field++, buf->heap);
} while (--n_fields);
+ if (conv_heap != NULL) {
+ mem_heap_empty(conv_heap);
+ }
+
DBUG_RETURN(n_row_added);
}
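
Two details of the buffer path are easy to miss: conv_heap is emptied after every tuple so the padded copies never accumulate across rows, and the data_size guard flags oversized records up front instead of letting a debug build assert later. A rough model of the per-row scratch heap, using std::vector as a stand-in for InnoDB's mem_heap_t:

    #include <cstddef>
    #include <vector>

    // Per-row scratch allocator: alloc() while building one tuple, then
    // empty() before the next, mirroring mem_heap_alloc()/mem_heap_empty()
    // on conv_heap above.
    struct scratch_heap {
        std::vector<std::vector<unsigned char>> blocks;

        unsigned char* alloc(size_t n) {
            blocks.emplace_back(n);
            return blocks.back().data();
        }
        void empty() { blocks.clear(); }   // once per tuple added
    };
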
@@ -1209,6 +1302,7 @@ row_merge_read_clustered_index(
os_event_t fts_parallel_sort_event = NULL;
ibool fts_pll_sort = FALSE;
ib_int64_t sig_count = 0;
+ mem_heap_t* conv_heap = NULL;
DBUG_ENTER("row_merge_read_clustered_index");
ut_ad((old_table == new_table) == !col_map);
@@ -1304,6 +1398,11 @@ row_merge_read_clustered_index(
row_heap = mem_heap_create(sizeof(mrec_buf_t));
+ if (dict_table_is_comp(old_table)
+ && !dict_table_is_comp(new_table)) {
+ conv_heap = mem_heap_create(sizeof(mrec_buf_t));
+ }
+
/* Scan the clustered index. */
for (;;) {
const rec_t* rec;
@@ -1582,16 +1681,24 @@ write_buffers:
row_merge_buf_t* buf = merge_buf[i];
merge_file_t* file = &files[i];
ulint rows_added = 0;
+ bool exceed_page = false;
if (UNIV_LIKELY
(row && (rows_added = row_merge_buf_add(
buf, fts_index, old_table,
- psort_info, row, ext, &doc_id)))) {
+ psort_info, row, ext, &doc_id,
+ conv_heap, &exceed_page)))) {
/* If we are creating an FTS index,
a single row can generate multiple
records for the tokenized words */
file->n_rec += rows_added;
+
+ if (exceed_page) {
+ err = DB_TOO_BIG_RECORD;
+ break;
+ }
+
if (doc_id > max_doc_id) {
max_doc_id = doc_id;
}
@@ -1692,12 +1799,18 @@ write_buffers:
(!(rows_added = row_merge_buf_add(
buf, fts_index, old_table,
psort_info, row, ext,
- &doc_id)))) {
+ &doc_id, conv_heap,
+ &exceed_page)))) {
/* An empty buffer should have enough
room for at least one record. */
ut_error;
}
+ if (exceed_page) {
+ err = DB_TOO_BIG_RECORD;
+ break;
+ }
+
file->n_rec += rows_added;
}
}
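
Both call sites follow the same pattern: test the out-parameter immediately after row_merge_buf_add() and turn it into DB_TOO_BIG_RECORD, so the ALTER fails with a proper error rather than a debug assertion. Schematically, with simplified types that are not the real signatures:

    enum dberr_t { DB_SUCCESS, DB_TOO_BIG_RECORD };

    // Shape of the caller-side check added at both row_merge_buf_add()
    // call sites above.
    static dberr_t add_one_row(/* buf, row, ... */)
    {
        bool exceed_page = false;
        /* rows_added = row_merge_buf_add(..., conv_heap, &exceed_page); */
        if (exceed_page) {
            return DB_TOO_BIG_RECORD;   // surfaced to the user; ALTER aborts
        }
        return DB_SUCCESS;
    }
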
@@ -1722,6 +1835,10 @@ func_exit:
}
all_done:
+ if (conv_heap != NULL) {
+ mem_heap_free(conv_heap);
+ }
+
#ifdef FTS_INTERNAL_DIAG_PRINT
DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Scan Table\n");
#endif
@@ -2100,6 +2217,7 @@ row_merge(
/* Copy the last blocks, if there are any. */
while (foffs0 < ihalf) {
+
if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
return(DB_INTERRUPTED);
}
@@ -2116,6 +2234,7 @@ row_merge(
ut_ad(foffs0 == ihalf);
while (foffs1 < file->offset) {
+
if (trx_is_interrupted(trx)) {
return(DB_INTERRUPTED);
}
@@ -2175,6 +2294,7 @@ row_merge_sort(
{
const ulint half = file->offset / 2;
ulint num_runs;
+ ulint cur_run = 0;
ulint* run_offset;
dberr_t error = DB_SUCCESS;
DBUG_ENTER("row_merge_sort");
@@ -2198,11 +2318,19 @@ row_merge_sort(
of file marker). Thus, it must be at least one block. */
ut_ad(file->offset > 0);
+ thd_progress_init(trx->mysql_thd, num_runs);
+
/* Merge the runs until we have one big run */
do {
+ cur_run++;
+
error = row_merge(trx, dup, file, block, tmpfd,
&num_runs, run_offset);
+ /* Report the progress of the merge sort to MySQL
+ for the SHOW PROCESSLIST Progress field. */
+ thd_progress_report(trx->mysql_thd, cur_run, num_runs);
+
if (error != DB_SUCCESS) {
break;
}
@@ -2212,6 +2340,8 @@ row_merge_sort(
mem_free(run_offset);
+ thd_progress_end(trx->mysql_thd);
+
DBUG_RETURN(error);
}
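
The progress hooks bracket the whole sort: thd_progress_init() announces the expected number of merge passes, each pass reports through thd_progress_report(), and thd_progress_end() clears the state. A sketch of that bracket with the MariaDB service calls stubbed out; treat the prototypes as assumptions rather than the verified header:

    typedef void* MYSQL_THD;

    // Stubs standing in for MariaDB's progress-report service.
    static void thd_progress_init(MYSQL_THD, unsigned int) {}
    static void thd_progress_report(MYSQL_THD, unsigned long long,
                                    unsigned long long) {}
    static void thd_progress_end(MYSQL_THD) {}

    static void merge_sort_with_progress(MYSQL_THD thd, unsigned long num_runs)
    {
        unsigned long cur_run = 0;
        thd_progress_init(thd, num_runs);
        do {
            ++cur_run;
            /* one row_merge() pass; halving stands in for the real update */
            num_runs = (num_runs + 1) / 2;
            thd_progress_report(thd, cur_run, num_runs);
        } while (num_runs > 1);
        thd_progress_end(thd);
    }
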
@@ -3337,9 +3467,13 @@ row_merge_create_index(
for (i = 0; i < n_fields; i++) {
index_field_t* ifield = &index_def->fields[i];
+ const char* col_name = ifield->col_name
+ ? dict_table_get_col_name_for_mysql(table, ifield->col_name)
+ : dict_table_get_col_name(table, ifield->col_no);
dict_mem_index_add_field(
- index, dict_table_get_col_name(table, ifield->col_no),
+ index,
+ col_name,
ifield->prefix_len);
}