diff options
Diffstat (limited to 'storage/innobase/row/row0merge.cc')
-rw-r--r-- | storage/innobase/row/row0merge.cc | 594 |
1 files changed, 317 insertions, 277 deletions
diff --git a/storage/innobase/row/row0merge.cc b/storage/innobase/row/row0merge.cc index 8c8f1674374..20a30fdfe05 100644 --- a/storage/innobase/row/row0merge.cc +++ b/storage/innobase/row/row0merge.cc @@ -24,7 +24,7 @@ New index creation routines using a merge sort Created 12/4/2005 Jan Lindstrom Completed by Sunny Bains and Marko Makela *******************************************************/ -#include <my_config.h> +#include <my_global.h> #include <log.h> #include <sql_class.h> #include <math.h> @@ -134,7 +134,7 @@ public: ut_ad(dict_index_is_spatial(m_index)); DBUG_EXECUTE_IF("row_merge_instrument_log_check_flush", - log_sys->check_flush_or_checkpoint = true; + log_sys.check_flush_or_checkpoint = true; ); for (idx_tuple_vec::iterator it = m_dtuple_vec->begin(); @@ -143,7 +143,7 @@ public: dtuple = *it; ut_ad(dtuple); - if (log_sys->check_flush_or_checkpoint) { + if (log_sys.check_flush_or_checkpoint) { if (scan_mtr->is_active()) { btr_pcur_move_to_prev_on_page(pcur); btr_pcur_store_position(pcur, scan_mtr); @@ -154,7 +154,7 @@ public: } mtr.start(); - mtr.set_named_space(m_index->space); + m_index->set_modified(mtr); ins_cur.index = m_index; rtr_init_rtr_info(&rtr_info, false, &ins_cur, m_index, @@ -165,7 +165,7 @@ public: PAGE_CUR_RTREE_INSERT, BTR_MODIFY_LEAF, &ins_cur, 0, __FILE__, __LINE__, - &mtr, 0); + &mtr); /* It need to update MBR in parent entry, so change search mode to BTR_MODIFY_TREE */ @@ -176,12 +176,12 @@ public: m_index, false); rtr_info_update_btr(&ins_cur, &rtr_info); mtr_start(&mtr); - mtr.set_named_space(m_index->space); + m_index->set_modified(mtr); btr_cur_search_to_nth_level( m_index, 0, dtuple, PAGE_CUR_RTREE_INSERT, BTR_MODIFY_TREE, &ins_cur, 0, - __FILE__, __LINE__, &mtr, 0); + __FILE__, __LINE__, &mtr); } error = btr_cur_optimistic_insert( @@ -192,7 +192,7 @@ public: ut_ad(!big_rec); mtr.commit(); mtr.start(); - mtr.set_named_space(m_index->space); + m_index->set_modified(mtr); rtr_clean_rtr_info(&rtr_info, true); rtr_init_rtr_info(&rtr_info, false, @@ -204,7 +204,7 @@ public: PAGE_CUR_RTREE_INSERT, BTR_MODIFY_TREE, &ins_cur, 0, - __FILE__, __LINE__, &mtr, 0); + __FILE__, __LINE__, &mtr); error = btr_cur_pessimistic_insert( flag, &ins_cur, &ins_offsets, @@ -220,7 +220,7 @@ public: if (error == DB_SUCCESS) { if (rtr_info.mbr_adj) { error = rtr_ins_enlarge_mbr( - &ins_cur, NULL, &mtr); + &ins_cur, &mtr); } if (error == DB_SUCCESS) { @@ -278,7 +278,7 @@ dberr_t row_merge_insert_index_tuples( dict_index_t* index, const dict_table_t* old_table, - int fd, + const pfs_os_file_t& fd, row_merge_block_t* block, const row_merge_buf_t* row_buf, BtrBulk* btr_bulk, @@ -549,7 +549,7 @@ row_merge_buf_add( mem_heap_alloc(buf->heap, n_fields * sizeof *entry->fields)); data_size = 0; - extra_size = UT_BITS_IN_BYTES(index->n_nullable); + extra_size = UT_BITS_IN_BYTES(unsigned(index->n_nullable)); ifield = dict_index_get_nth_field(index, 0); @@ -562,7 +562,7 @@ row_merge_buf_add( col = ifield->col; const dict_v_col_t* v_col = NULL; - if (dict_col_is_virtual(col)) { + if (col->is_virtual()) { v_col = reinterpret_cast<const dict_v_col_t*>(col); } @@ -571,7 +571,7 @@ row_merge_buf_add( /* Process the Doc ID column */ if (*doc_id > 0 && col_no == index->table->fts->doc_col - && !dict_col_is_virtual(col)) { + && !col->is_virtual()) { fts_write_doc_id((byte*) &write_doc_id, *doc_id); /* Note: field->data now points to a value on the @@ -590,7 +590,7 @@ row_merge_buf_add( field->type.len = ifield->col->len; } else { /* Use callback to get the virtual column value */ - if (dict_col_is_virtual(col)) { + if (col->is_virtual()) { dict_index_t* clust_index = dict_table_get_first_index(new_table); @@ -729,7 +729,7 @@ row_merge_buf_add( len = dfield_get_len(field); } } - } else if (!dict_col_is_virtual(col)) { + } else if (!col->is_virtual()) { /* Only non-virtual column are stored externally */ const byte* buf = row_ext_lookup(ext, col_no, &len); @@ -816,9 +816,9 @@ row_merge_buf_add( /* Record size can exceed page size while converting to redundant row format. But there is assert - ut_ad(size < UNIV_PAGE_SIZE) in rec_offs_data_size(). + ut_ad(size < srv_page_size) in rec_offs_data_size(). It may hit the assert before attempting to insert the row. */ - if (conv_heap != NULL && data_size > UNIV_PAGE_SIZE) { + if (conv_heap != NULL && data_size > srv_page_size) { *err = DB_TOO_BIG_RECORD; } @@ -1076,7 +1076,7 @@ row_merge_heap_create( bool row_merge_read( /*===========*/ - int fd, /*!< in: file descriptor */ + const pfs_os_file_t& fd, /*!< in: file descriptor */ ulint offset, /*!< in: offset where to read in number of row_merge_block_t elements */ @@ -1091,9 +1091,8 @@ row_merge_read( DBUG_EXECUTE_IF("row_merge_read_failure", DBUG_RETURN(FALSE);); IORequest request(IORequest::READ); - const bool success = DB_SUCCESS - == os_file_read_no_error_handling_int_fd( - request, fd, buf, ofs, srv_sort_buf_size); + const bool success = DB_SUCCESS == os_file_read_no_error_handling( + request, fd, buf, ofs, srv_sort_buf_size, 0); /* If encryption is enabled decrypt buffer */ if (success && log_tmp_is_encrypted()) { @@ -1127,7 +1126,7 @@ UNIV_INTERN bool row_merge_write( /*============*/ - int fd, /*!< in: file descriptor */ + const pfs_os_file_t& fd, /*!< in: file descriptor */ ulint offset, /*!< in: offset where to write, in number of row_merge_block_t elements */ const void* buf, /*!< in: data */ @@ -1156,7 +1155,7 @@ row_merge_write( } IORequest request(IORequest::WRITE); - const bool success = DB_SUCCESS == os_file_write_int_fd( + const bool success = DB_SUCCESS == os_file_write( request, "(merge)", fd, out_buf, ofs, buf_len); #ifdef POSIX_FADV_DONTNEED @@ -1178,7 +1177,7 @@ row_merge_read_rec( mrec_buf_t* buf, /*!< in/out: secondary buffer */ const byte* b, /*!< in: pointer to record */ const dict_index_t* index, /*!< in: index of the record */ - int fd, /*!< in: file descriptor */ + const pfs_os_file_t& fd, /*!< in: file descriptor */ ulint* foffs, /*!< in/out: file offset */ const mrec_t** mrec, /*!< out: pointer to merge record, or NULL on end of list @@ -1242,7 +1241,7 @@ err_exit: to the auxiliary buffer and handle this as a special case. */ - avail_size = &block[srv_sort_buf_size] - b; + avail_size = ulint(&block[srv_sort_buf_size] - b); ut_ad(avail_size < sizeof *buf); memcpy(*buf, b, avail_size); @@ -1297,7 +1296,7 @@ err_exit: /* The record spans two blocks. Copy it to buf. */ b -= extra_size + data_size; - avail_size = &block[srv_sort_buf_size] - b; + avail_size = ulint(&block[srv_sort_buf_size] - b); memcpy(*buf, b, avail_size); *mrec = *buf + extra_size; @@ -1336,7 +1335,7 @@ row_merge_write_rec_low( ulint e, /*!< in: encoded extra_size */ #ifndef DBUG_OFF ulint size, /*!< in: total size to write */ - int fd, /*!< in: file descriptor */ + const pfs_os_file_t& fd, /*!< in: file descriptor */ ulint foffs, /*!< in: file offset */ #endif /* !DBUG_OFF */ const mrec_t* mrec, /*!< in: record to write */ @@ -1365,7 +1364,7 @@ row_merge_write_rec_low( } memcpy(b, mrec - rec_offs_extra_size(offsets), rec_offs_size(offsets)); - DBUG_ASSERT(b + rec_offs_size(offsets) == end); + DBUG_SLOW_ASSERT(b + rec_offs_size(offsets) == end); DBUG_VOID_RETURN; } @@ -1379,7 +1378,7 @@ row_merge_write_rec( row_merge_block_t* block, /*!< in/out: file buffer */ mrec_buf_t* buf, /*!< in/out: secondary buffer */ byte* b, /*!< in: pointer to end of block */ - int fd, /*!< in: file descriptor */ + const pfs_os_file_t& fd, /*!< in: file descriptor */ ulint* foffs, /*!< in/out: file offset */ const mrec_t* mrec, /*!< in: record to write */ const rec_offs* offsets,/*!< in: offsets of mrec */ @@ -1408,7 +1407,7 @@ row_merge_write_rec( if (UNIV_UNLIKELY(b + size >= &block[srv_sort_buf_size])) { /* The record spans two blocks. Copy it to the temporary buffer first. */ - avail_size = &block[srv_sort_buf_size] - b; + avail_size = ulint(&block[srv_sort_buf_size] - b); row_merge_write_rec_low(buf[0], extra_size, size, fd, *foffs, @@ -1449,7 +1448,7 @@ row_merge_write_eof( /*================*/ row_merge_block_t* block, /*!< in/out: file buffer */ byte* b, /*!< in: pointer to end of block */ - int fd, /*!< in: file descriptor */ + const pfs_os_file_t& fd, /*!< in: file descriptor */ ulint* foffs, /*!< in/out: file offset */ row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */ ulint space) /*!< in: space id */ @@ -1472,7 +1471,7 @@ row_merge_write_eof( #ifdef UNIV_DEBUG_VALGRIND /* The rest of the block is uninitialized. Initialize it to avoid bogus warnings. */ - memset(b, 0xff, &block[srv_sort_buf_size] - b); + memset(b, 0xff, ulint(&block[srv_sort_buf_size] - b)); #endif /* UNIV_DEBUG_VALGRIND */ if (!row_merge_write(fd, (*foffs)++, block, crypt_block, space)) { @@ -1486,48 +1485,48 @@ row_merge_write_eof( /** Create a temporary file if it has not been created already. @param[in,out] tmpfd temporary file handle @param[in] path location for creating temporary file -@return file descriptor, or -1 on failure */ +@return true on success, false on error */ static MY_ATTRIBUTE((warn_unused_result)) -int +bool row_merge_tmpfile_if_needed( - int* tmpfd, + pfs_os_file_t* tmpfd, const char* path) { - if (*tmpfd < 0) { + if (*tmpfd == OS_FILE_CLOSED) { *tmpfd = row_merge_file_create_low(path); - if (*tmpfd >= 0) { + if (*tmpfd != OS_FILE_CLOSED) { MONITOR_ATOMIC_INC(MONITOR_ALTER_TABLE_SORT_FILES); } } - return(*tmpfd); + return(*tmpfd != OS_FILE_CLOSED); } /** Create a temporary file for merge sort if it was not created already. @param[in,out] file merge file structure @param[in] nrec number of records in the file @param[in] path location for creating temporary file -@return file descriptor, or -1 on failure */ +@return true on success, false on error */ static MY_ATTRIBUTE((warn_unused_result)) -int +bool row_merge_file_create_if_needed( merge_file_t* file, - int* tmpfd, + pfs_os_file_t* tmpfd, ulint nrec, const char* path) { - ut_ad(file->fd < 0 || *tmpfd >=0); - if (file->fd < 0 && row_merge_file_create(file, path) >= 0) { + ut_ad(file->fd == OS_FILE_CLOSED || *tmpfd != OS_FILE_CLOSED); + if (file->fd == OS_FILE_CLOSED && row_merge_file_create(file, path)!= OS_FILE_CLOSED) { MONITOR_ATOMIC_INC(MONITOR_ALTER_TABLE_SORT_FILES); - if (row_merge_tmpfile_if_needed(tmpfd, path) < 0) { - return(-1); + if (!row_merge_tmpfile_if_needed(tmpfd, path) ) { + return(false); } file->n_rec = nrec; } - ut_ad(file->fd < 0 || *tmpfd >=0); - return(file->fd); + ut_ad(file->fd == OS_FILE_CLOSED || *tmpfd != OS_FILE_CLOSED); + return(file->fd != OS_FILE_CLOSED); } /** Copy the merge data tuple from another merge data tuple. @@ -1657,7 +1656,7 @@ containing the index entries for the indexes to be built. @param[in] files temporary files @param[in] key_numbers MySQL key numbers to create @param[in] n_index number of indexes to create -@param[in] add_cols default values of added columns, or NULL +@param[in] defaults default values of added, changed columns, or NULL @param[in] add_v newly added virtual columns along with indexes @param[in] col_map mapping of old column numbers to new ones, or NULL if old_table == new_table @@ -1675,6 +1674,7 @@ stage->inc() will be called for each page read. @param[in,out] crypt_block crypted file buffer @param[in] eval_table mysql table used to evaluate virtual column value, see innobase_get_computed_value(). +@param[in] allow_not_null allow null to not-null conversion @return DB_SUCCESS or error */ static MY_ATTRIBUTE((warn_unused_result)) dberr_t @@ -1682,7 +1682,7 @@ row_merge_read_clustered_index( trx_t* trx, struct TABLE* table, const dict_table_t* old_table, - const dict_table_t* new_table, + dict_table_t* new_table, bool online, dict_index_t** index, dict_index_t* fts_sort_idx, @@ -1690,18 +1690,19 @@ row_merge_read_clustered_index( merge_file_t* files, const ulint* key_numbers, ulint n_index, - const dtuple_t* add_cols, + const dtuple_t* defaults, const dict_add_v_col_t* add_v, const ulint* col_map, ulint add_autoinc, ib_sequence_t& sequence, row_merge_block_t* block, bool skip_pk_sort, - int* tmpfd, + pfs_os_file_t* tmpfd, ut_stage_alter_t* stage, double pct_cost, row_merge_block_t* crypt_block, - struct TABLE* eval_table) + struct TABLE* eval_table, + bool allow_not_null) { dict_index_t* clust_index; /* Clustered index */ mem_heap_t* row_heap = NULL;/* Heap memory to create @@ -1734,11 +1735,17 @@ row_merge_read_clustered_index( double curr_progress = 0.0; ib_uint64_t read_rows = 0; ib_uint64_t table_total_rows = 0; + char new_sys_trx_start[8]; + char new_sys_trx_end[8]; + byte any_autoinc_data[8] = {0}; + bool vers_update_trt = false; DBUG_ENTER("row_merge_read_clustered_index"); ut_ad((old_table == new_table) == !col_map); - ut_ad(!add_cols || col_map); + ut_ad(!defaults || col_map); + ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE)); + ut_ad(trx->id); table_total_rows = dict_table_get_n_rows(old_table); if(table_total_rows == 0) { @@ -1834,9 +1841,27 @@ row_merge_read_clustered_index( based on that. */ clust_index = dict_table_get_first_index(old_table); + const ulint old_trx_id_col = DATA_TRX_ID - DATA_N_SYS_COLS + + ulint(old_table->n_cols); + ut_ad(old_table->cols[old_trx_id_col].mtype == DATA_SYS); + ut_ad(old_table->cols[old_trx_id_col].prtype + == (DATA_TRX_ID | DATA_NOT_NULL)); + ut_ad(old_table->cols[old_trx_id_col + 1].mtype == DATA_SYS); + ut_ad(old_table->cols[old_trx_id_col + 1].prtype + == (DATA_ROLL_PTR | DATA_NOT_NULL)); + const ulint new_trx_id_col = col_map + ? col_map[old_trx_id_col] : old_trx_id_col; btr_pcur_open_at_index_side( true, clust_index, BTR_SEARCH_LEAF, &pcur, true, 0, &mtr); + btr_pcur_move_to_next_user_rec(&pcur, &mtr); + if (rec_is_metadata(btr_pcur_get_rec(&pcur), clust_index)) { + ut_ad(btr_pcur_is_on_user_rec(&pcur)); + /* Skip the metadata pseudo-record. */ + } else { + ut_ad(!clust_index->is_instant()); + btr_pcur_move_to_prev_on_page(&pcur); + } if (old_table != new_table) { /* The table is being rebuilt. Identify the columns @@ -1888,6 +1913,10 @@ row_merge_read_clustered_index( prev_fields = NULL; } + mach_write_to_8(new_sys_trx_start, trx->id); + mach_write_to_8(new_sys_trx_end, TRX_ID_MAX); + uint64_t n_rows = 0; + /* Scan the clustered index. */ for (;;) { /* Do not continue if table pages are still encrypted */ @@ -1898,6 +1927,7 @@ row_merge_read_clustered_index( } const rec_t* rec; + trx_id_t rec_trx_id; rec_offs* offsets; dtuple_t* row; row_ext_t* ext; @@ -1927,15 +1957,6 @@ row_merge_read_clustered_index( } } -#ifdef DBUG_OFF -# define dbug_run_purge false -#else /* DBUG_OFF */ - bool dbug_run_purge = false; -#endif /* DBUG_OFF */ - DBUG_EXECUTE_IF( - "ib_purge_on_create_index_page_switch", - dbug_run_purge = true;); - /* Insert the cached spatial index rows. */ err = row_merge_spatial_rows( trx->id, sp_tuples, num_spatial, @@ -1949,8 +1970,8 @@ row_merge_read_clustered_index( goto scan_next; } - if (dbug_run_purge - || dict_index_get_lock(clust_index)->waiters) { + if (my_atomic_load32_explicit(&clust_index->lock.waiters, + MY_MEMORY_ORDER_RELAXED)) { /* There are waiters on the clustered index tree lock, likely the purge thread. Store and restore the cursor @@ -1971,18 +1992,6 @@ row_merge_read_clustered_index( btr_pcur_store_position(&pcur, &mtr); mtr_commit(&mtr); - if (dbug_run_purge) { - /* This is for testing - purposes only (see - DBUG_EXECUTE_IF above). We - signal the purge thread and - hope that the purge batch will - complete before we execute - btr_pcur_restore_position(). */ - trx_purge_run(); - os_thread_sleep(1000000); - } - /* Give the waiters a chance to proceed. */ os_thread_yield(); scan_next: @@ -2038,6 +2047,8 @@ end_of_index: if (online) { offsets = rec_get_offsets(rec, clust_index, NULL, true, ULINT_UNDEFINED, &row_heap); + rec_trx_id = row_get_rec_trx_id(rec, clust_index, + offsets); /* Perform a REPEATABLE READ. @@ -2058,33 +2069,45 @@ end_of_index: ONLINE_INDEX_COMPLETE state between the time the DML thread has updated the clustered index but has not yet accessed secondary index. */ - ut_ad(MVCC::is_view_active(trx->read_view)); + ut_ad(trx->read_view.is_open()); + ut_ad(rec_trx_id != trx->id); - if (!trx->read_view->changes_visible( - row_get_rec_trx_id( - rec, clust_index, offsets), - old_table->name)) { + if (!trx->read_view.changes_visible( + rec_trx_id, old_table->name)) { rec_t* old_vers; row_vers_build_for_consistent_read( rec, &mtr, clust_index, &offsets, - trx->read_view, &row_heap, + &trx->read_view, &row_heap, row_heap, &old_vers, NULL); - rec = old_vers; - - if (!rec) { + if (!old_vers) { continue; } + + /* The old version must necessarily be + in the "prehistory", because the + exclusive lock in + ha_innobase::prepare_inplace_alter_table() + forced the completion of any transactions + that accessed this table. */ + ut_ad(row_get_rec_trx_id(old_vers, clust_index, + offsets) < trx->id); + + rec = old_vers; + rec_trx_id = 0; } if (rec_get_deleted_flag( rec, dict_table_is_comp(old_table))) { /* In delete-marked records, DB_TRX_ID must - always refer to an existing undo log record. */ - ut_ad(row_get_rec_trx_id(rec, clust_index, - offsets)); + always refer to an existing undo log record. + Above, we did reset rec_trx_id = 0 + for rec = old_vers.*/ + ut_ad(rec == page_cur_get_rec(cur) + ? rec_trx_id + : !rec_trx_id); /* This record was deleted in the latest committed version, or it was deleted and then reinserted-by-update before purge @@ -2097,19 +2120,37 @@ end_of_index: rec, dict_table_is_comp(old_table))) { /* In delete-marked records, DB_TRX_ID must always refer to an existing undo log record. */ - ut_ad(rec_get_trx_id(rec, clust_index)); + ut_d(rec_trx_id = rec_get_trx_id(rec, clust_index)); + ut_ad(rec_trx_id); + /* This must be a purgeable delete-marked record, + and the transaction that delete-marked the record + must have been committed before this + !online ALTER TABLE transaction. */ + ut_ad(rec_trx_id < trx->id); /* Skip delete-marked records. Skipping delete-marked records will make the created indexes unuseable for transactions whose read views were created before the index - creation completed, but preserving the history - would make it tricky to detect duplicate - keys. */ + creation completed, but an attempt to preserve + the history would make it tricky to detect + duplicate keys. */ continue; } else { offsets = rec_get_offsets(rec, clust_index, NULL, true, ULINT_UNDEFINED, &row_heap); + /* This is a locking ALTER TABLE. + + If we are not rebuilding the table, the + DB_TRX_ID does not matter, as it is not being + written to any secondary indexes; see + if (old_table == new_table) below. + + If we are rebuilding the table, the + DB_TRX_ID,DB_ROLL_PTR should be reset, because + there will be no history available. */ + ut_ad(rec_get_trx_id(rec, clust_index) < trx->id); + rec_trx_id = 0; } /* When !online, we are holding a lock on old_table, preventing @@ -2121,19 +2162,35 @@ end_of_index: row = row_build_w_add_vcol(ROW_COPY_POINTERS, clust_index, rec, offsets, new_table, - add_cols, add_v, col_map, &ext, + defaults, add_v, col_map, &ext, row_heap); ut_ad(row); for (ulint i = 0; i < n_nonnull; i++) { - const dfield_t* field = &row->fields[nonnull[i]]; + dfield_t* field = &row->fields[nonnull[i]]; ut_ad(dfield_get_type(field)->prtype & DATA_NOT_NULL); if (dfield_is_null(field)) { - err = DB_INVALID_NULL; - trx->error_key_num = 0; - goto func_exit; + + Field* null_field = + table->field[nonnull[i]]; + + null_field->set_warning( + Sql_condition::WARN_LEVEL_WARN, + WARN_DATA_TRUNCATED, 1, + ulong(n_rows + 1)); + + if (!allow_not_null) { + err = DB_INVALID_NULL; + trx->error_key_num = 0; + goto func_exit; + } + + const dfield_t& default_field + = defaults->fields[nonnull[i]]; + + *field = default_field; } } @@ -2144,13 +2201,62 @@ end_of_index: doc_id = 0; } + ut_ad(row->fields[new_trx_id_col].type.mtype == DATA_SYS); + ut_ad(row->fields[new_trx_id_col].type.prtype + == (DATA_TRX_ID | DATA_NOT_NULL)); + ut_ad(row->fields[new_trx_id_col].len == DATA_TRX_ID_LEN); + ut_ad(row->fields[new_trx_id_col + 1].type.mtype == DATA_SYS); + ut_ad(row->fields[new_trx_id_col + 1].type.prtype + == (DATA_ROLL_PTR | DATA_NOT_NULL)); + ut_ad(row->fields[new_trx_id_col + 1].len == DATA_ROLL_PTR_LEN); + + if (old_table == new_table) { + /* Do not bother touching DB_TRX_ID,DB_ROLL_PTR + because they are not going to be written into + secondary indexes. */ + } else if (rec_trx_id < trx->id) { + /* Reset the DB_TRX_ID,DB_ROLL_PTR of old rows + for which history is not going to be + available after the rebuild operation. + This essentially mimics row_purge_reset_trx_id(). */ + row->fields[new_trx_id_col].data + = const_cast<byte*>(reset_trx_id); + row->fields[new_trx_id_col + 1].data + = const_cast<byte*>(reset_trx_id + + DATA_TRX_ID_LEN); + } + if (add_autoinc != ULINT_UNDEFINED) { ut_ad(add_autoinc < dict_table_get_n_user_cols(new_table)); + bool history_row = false; + if (new_table->versioned()) { + const dfield_t* dfield = dtuple_get_nth_field( + row, new_table->vers_end); + history_row = dfield->vers_history_row(); + } + dfield_t* dfield = dtuple_get_nth_field(row, add_autoinc); + + if (new_table->versioned()) { + if (history_row) { + if (dfield_get_type(dfield)->prtype & DATA_NOT_NULL) { + err = DB_UNSUPPORTED; + my_error(ER_UNSUPPORTED_EXTENSION, MYF(0), + old_table->name.m_name); + goto func_exit; + } + dfield_set_null(dfield); + } else { + // set not null + ulint len = dfield_get_type(dfield)->len; + dfield_set_data(dfield, any_autoinc_data, len); + } + } + if (dfield_is_null(dfield)) { goto write_buffers; } @@ -2196,10 +2302,26 @@ end_of_index: } } + if (old_table->versioned()) { + if (!new_table->versioned() + && clust_index->vers_history_row(rec, offsets)) { + continue; + } + } else if (new_table->versioned()) { + dfield_t* start = + dtuple_get_nth_field(row, new_table->vers_start); + dfield_t* end = + dtuple_get_nth_field(row, new_table->vers_end); + dfield_set_data(start, new_sys_trx_start, 8); + dfield_set_data(end, new_sys_trx_end, 8); + vers_update_trt = true; + } + write_buffers: /* Build all entries for all the indexes to be created in a single scan of the clustered index. */ + n_rows++; ulint s_idx_cnt = 0; bool skip_sort = skip_pk_sort && dict_index_is_clust(merge_buf[0]->index); @@ -2229,6 +2351,11 @@ write_buffers: continue; } + ut_ad(!row + || !dict_index_is_clust(buf->index) + || trx_id_check(row->fields[new_trx_id_col].data, + trx->id)); + merge_file_t* file = &files[k++]; if (UNIV_LIKELY @@ -2393,12 +2520,13 @@ write_buffers: err = row_merge_insert_index_tuples( index[i], old_table, - -1, NULL, buf, clust_btr_bulk, + OS_FILE_CLOSED, NULL, buf, + clust_btr_bulk, table_total_rows, curr_progress, pct_cost, crypt_block, - new_table->space); + new_table->space_id); if (row == NULL) { err = clust_btr_bulk->finish( @@ -2485,7 +2613,7 @@ write_buffers: we can insert directly into the index without temporary file if clustered index does not uses temporary file. */ - if (row == NULL && file->fd == -1 + if (row == NULL && file->fd == OS_FILE_CLOSED && !clust_temp_file) { DBUG_EXECUTE_IF( "row_merge_write_failure", @@ -2505,12 +2633,13 @@ write_buffers: err = row_merge_insert_index_tuples( index[i], old_table, - -1, NULL, buf, &btr_bulk, + OS_FILE_CLOSED, NULL, buf, + &btr_bulk, table_total_rows, curr_progress, pct_cost, crypt_block, - new_table->space); + new_table->space_id); err = btr_bulk.finish(err); @@ -2522,9 +2651,9 @@ write_buffers: break; } } else { - if (row_merge_file_create_if_needed( + if (!row_merge_file_create_if_needed( file, tmpfd, - buf->n_tuples, path) < 0) { + buf->n_tuples, path)) { err = DB_OUT_OF_MEMORY; trx->error_key_num = i; break; @@ -2544,7 +2673,7 @@ write_buffers: if (!row_merge_write( file->fd, file->offset++, block, crypt_block, - new_table->space)) { + new_table->space_id)) { err = DB_TEMP_FILE_WRITE_FAIL; trx->error_key_num = i; break; @@ -2582,6 +2711,10 @@ write_buffers: } if (row == NULL) { + if (old_table != new_table) { + new_table->stat_n_rows = n_rows; + } + goto all_done; } @@ -2736,6 +2869,15 @@ wait_again: } } + if (vers_update_trt) { + trx_mod_table_time_t& time = + trx->mod_tables + .insert(trx_mod_tables_t::value_type( + const_cast<dict_table_t*>(new_table), 0)) + .first->second; + time.set_versioned(0); + } + trx->op_info = ""; DBUG_RETURN(err); @@ -2792,10 +2934,10 @@ wait_again: @param[in,out] foffs1 offset of second source list in the file @param[in,out] of output file @param[in,out] stage performance schema accounting object, used by -@param[in,out] crypt_block encryption buffer -@param[in] space tablespace ID for encryption ALTER TABLE. If not NULL stage->inc() will be called for each record processed. +@param[in,out] crypt_block encryption buffer +@param[in] space tablespace ID for encryption @return DB_SUCCESS or error code */ static MY_ATTRIBUTE((warn_unused_result)) dberr_t @@ -2806,7 +2948,7 @@ row_merge_blocks( ulint* foffs0, ulint* foffs1, merge_file_t* of, - ut_stage_alter_t* stage, + ut_stage_alter_t* stage MY_ATTRIBUTE((unused)), row_merge_block_t* crypt_block, ulint space) { @@ -2914,10 +3056,10 @@ done1: @param[in,out] foffs0 input file offset @param[in,out] of output file @param[in,out] stage performance schema accounting object, used by -@param[in,out] crypt_block encryption buffer -@param[in] space tablespace ID for encryption ALTER TABLE. If not NULL stage->inc() will be called for each record processed. +@param[in,out] crypt_block encryption buffer +@param[in] space tablespace ID for encryption @return TRUE on success, FALSE on failure */ static MY_ATTRIBUTE((warn_unused_result)) ibool @@ -2927,7 +3069,7 @@ row_merge_blocks_copy( row_merge_block_t* block, ulint* foffs0, merge_file_t* of, - ut_stage_alter_t* stage, + ut_stage_alter_t* stage MY_ATTRIBUTE((unused)), row_merge_block_t* crypt_block, ulint space) { @@ -3018,7 +3160,7 @@ row_merge( const row_merge_dup_t* dup, merge_file_t* file, row_merge_block_t* block, - int* tmpfd, + pfs_os_file_t* tmpfd, ulint* num_run, ulint* run_offset, ut_stage_alter_t* stage, @@ -3160,7 +3302,7 @@ row_merge_sort( const row_merge_dup_t* dup, merge_file_t* file, row_merge_block_t* block, - int* tmpfd, + pfs_os_file_t* tmpfd, const bool update_progress, /*!< in: update progress status variable or not */ @@ -3369,7 +3511,7 @@ dberr_t row_merge_insert_index_tuples( dict_index_t* index, const dict_table_t* old_table, - int fd, + const pfs_os_file_t& fd, row_merge_block_t* block, const row_merge_buf_t* row_buf, BtrBulk* btr_bulk, @@ -3421,7 +3563,7 @@ row_merge_insert_index_tuples( } if (row_buf != NULL) { - ut_ad(fd == -1); + ut_ad(fd == OS_FILE_CLOSED); ut_ad(block == NULL); DBUG_EXECUTE_IF("row_merge_read_failure", error = DB_CORRUPTION; @@ -3907,7 +4049,7 @@ row_merge_drop_temp_indexes(void) /* Load the table definitions that contain partially defined indexes, so that the data dictionary information can be checked when accessing the tablename.ibd files. */ - trx = trx_allocate_for_background(); + trx = trx_create(); trx->op_info = "dropping partially created indexes"; row_mysql_lock_data_dictionary(trx); /* Ensure that this transaction will be rolled back and locks @@ -3930,7 +4072,7 @@ row_merge_drop_temp_indexes(void) trx_commit_for_mysql(trx); row_mysql_unlock_data_dictionary(trx); - trx_free_for_background(trx); + trx_free(trx); } @@ -3938,15 +4080,15 @@ row_merge_drop_temp_indexes(void) UNIV_PFS_IO defined, register the file descriptor with Performance Schema. @param[in] path location for creating temporary merge files, or NULL @return File descriptor */ -int +pfs_os_file_t row_merge_file_create_low( const char* path) { - int fd; #ifdef UNIV_PFS_IO /* This temp file open does not go through normal file APIs, add instrumentation to register with performance schema */ + struct PSI_file_locker* locker; PSI_file_locker_state state; if (!path) { path = mysql_tmpdir; @@ -3956,27 +4098,21 @@ row_merge_file_create_low( ut_malloc_nokey(strlen(path) + sizeof label)); strcpy(name, path); strcat(name, label); - PSI_file_locker* locker = PSI_FILE_CALL(get_thread_file_name_locker)( - &state, innodb_temp_file_key, PSI_FILE_OPEN, - path ? name : label, &locker); - if (locker != NULL) { - PSI_FILE_CALL(start_file_open_wait)(locker, - __FILE__, - __LINE__); - } + + register_pfs_file_open_begin( + &state, locker, innodb_temp_file_key, + PSI_FILE_CREATE, path ? name : label, __FILE__, __LINE__); + #endif - fd = innobase_mysql_tmpfile(path); + pfs_os_file_t fd = innobase_mysql_tmpfile(path); #ifdef UNIV_PFS_IO - if (locker != NULL) { - PSI_FILE_CALL(end_file_open_wait_and_bind_to_descriptor)( - locker, fd); - } + register_pfs_file_open_end(locker, fd, + (fd == OS_FILE_CLOSED)?NULL:&fd); ut_free(name); #endif - if (fd < 0) { + if (fd == OS_FILE_CLOSED) { ib::error() << "Cannot create temporary merge file"; - return(-1); } return(fd); } @@ -3985,8 +4121,8 @@ row_merge_file_create_low( /** Create a merge file in the given location. @param[out] merge_file merge file structure @param[in] path location for creating temporary file, or NULL -@return file descriptor, or -1 on failure */ -int +@return file descriptor, or OS_FILE_CLOSED on error */ +pfs_os_file_t row_merge_file_create( merge_file_t* merge_file, const char* path) @@ -3995,7 +4131,7 @@ row_merge_file_create( merge_file->offset = 0; merge_file->n_rec = 0; - if (merge_file->fd >= 0) { + if (merge_file->fd != OS_FILE_CLOSED) { if (srv_disable_sort_file_cache) { os_file_set_nocache(merge_file->fd, "row0merge.cc", "sort"); @@ -4010,26 +4146,11 @@ if UNIV_PFS_IO is defined. */ void row_merge_file_destroy_low( /*=======================*/ - int fd) /*!< in: merge file descriptor */ + const pfs_os_file_t& fd) /*!< in: merge file descriptor */ { -#ifdef UNIV_PFS_IO - struct PSI_file_locker* locker = NULL; - PSI_file_locker_state state; - locker = PSI_FILE_CALL(get_thread_file_descriptor_locker)( - &state, fd, PSI_FILE_CLOSE); - if (locker != NULL) { - PSI_FILE_CALL(start_file_wait)( - locker, 0, __FILE__, __LINE__); - } -#endif - if (fd >= 0) { - close(fd); + if (fd != OS_FILE_CLOSED) { + os_file_close(fd); } -#ifdef UNIV_PFS_IO - if (locker != NULL) { - PSI_FILE_CALL(end_file_wait)(locker, 0); - } -#endif } /*********************************************************************//** Destroy a merge file. */ @@ -4040,9 +4161,9 @@ row_merge_file_destroy( { ut_ad(!srv_read_only_mode); - if (merge_file->fd != -1) { + if (merge_file->fd != OS_FILE_CLOSED) { row_merge_file_destroy_low(merge_file->fd); - merge_file->fd = -1; + merge_file->fd = OS_FILE_CLOSED; } } @@ -4161,19 +4282,9 @@ row_make_new_pathname( dict_table_t* table, /*!< in: table to be renamed */ const char* new_name) /*!< in: new name */ { - char* new_path; - char* old_path; - - ut_ad(!is_system_tablespace(table->space)); - - old_path = fil_space_get_first_path(table->space); - ut_a(old_path); - - new_path = os_file_make_new_pathname(old_path, new_name); - - ut_free(old_path); - - return(new_path); + ut_ad(!is_system_tablespace(table->space_id)); + return os_file_make_new_pathname(table->space->chain.start->name, + new_name); } /*********************************************************************//** @@ -4225,8 +4336,7 @@ row_merge_rename_tables_dict( renamed is a single-table tablespace, which must be implicitly renamed along with the table. */ if (err == DB_SUCCESS - && dict_table_is_file_per_table(old_table) - && fil_space_get(old_table->space) != NULL) { + && old_table->space_id) { /* Make pathname to update SYS_DATAFILES. */ char* tmp_path = row_make_new_pathname(old_table, tmp_name); @@ -4235,7 +4345,7 @@ row_merge_rename_tables_dict( pars_info_add_str_literal(info, "tmp_name", tmp_name); pars_info_add_str_literal(info, "tmp_path", tmp_path); pars_info_add_int4_literal(info, "old_space", - (lint) old_table->space); + old_table->space_id); err = que_eval_sql(info, "PROCEDURE RENAME_OLD_SPACE () IS\n" @@ -4266,7 +4376,7 @@ row_merge_rename_tables_dict( old_table->name.m_name); pars_info_add_str_literal(info, "old_path", old_path); pars_info_add_int4_literal(info, "new_space", - (lint) new_table->space); + new_table->space_id); err = que_eval_sql(info, "PROCEDURE RENAME_NEW_SPACE () IS\n" @@ -4282,9 +4392,9 @@ row_merge_rename_tables_dict( ut_free(old_path); } - if (err == DB_SUCCESS && dict_table_is_discarded(new_table)) { + if (err == DB_SUCCESS && (new_table->flags2 & DICT_TF2_DISCARDED)) { err = row_import_update_discarded_flag( - trx, new_table->id, true, true); + trx, new_table->id, true); } trx->op_info = ""; @@ -4292,54 +4402,7 @@ row_merge_rename_tables_dict( return(err); } -/** Create and execute a query graph for creating an index. -@param[in,out] trx trx -@param[in,out] table table -@param[in,out] index index -@param[in] add_v new virtual columns added along with add index call -@return DB_SUCCESS or error code */ -MY_ATTRIBUTE((nonnull(1,2), warn_unused_result)) -static -dberr_t -row_merge_create_index_graph( - trx_t* trx, - dict_table_t* table, - dict_index_t*& index, - const dict_add_v_col_t* add_v) -{ - ind_node_t* node; /*!< Index creation node */ - mem_heap_t* heap; /*!< Memory heap */ - que_thr_t* thr; /*!< Query thread */ - dberr_t err; - - DBUG_ENTER("row_merge_create_index_graph"); - - ut_ad(trx); - ut_ad(table); - ut_ad(index); - - heap = mem_heap_create(512); - - index->table = table; - node = ind_create_graph_create(index, heap, add_v); - thr = pars_complete_graph_for_exec(node, trx, heap, NULL); - - ut_a(thr == que_fork_start_command( - static_cast<que_fork_t*>(que_node_get_parent(thr)))); - - que_run_threads(thr); - - err = trx->error_state; - - index = node->index; - - que_graph_free((que_t*) que_node_get_parent(thr)); - - DBUG_RETURN(err); -} - /** Create the index and load in to the dictionary. -@param[in,out] trx trx (sets error_state) @param[in,out] table the index is on this table @param[in] index_def the index definition @param[in] add_v new virtual columns added along with add @@ -4347,16 +4410,13 @@ row_merge_create_index_graph( @return index, or NULL on error */ dict_index_t* row_merge_create_index( - trx_t* trx, dict_table_t* table, const index_def_t* index_def, const dict_add_v_col_t* add_v) { dict_index_t* index; - dberr_t err; ulint n_fields = index_def->n_fields; ulint i; - bool has_new_v_col = false; DBUG_ENTER("row_merge_create_index"); @@ -4366,11 +4426,8 @@ row_merge_create_index( a persistent operation. We pass 0 as the space id, and determine at a lower level the space id where to store the table. */ - index = dict_mem_index_create(table->name.m_name, index_def->name, - 0, index_def->ind_type, n_fields); - - ut_a(index); - + index = dict_mem_index_create(table, index_def->name, + index_def->ind_type, n_fields); index->set_committed(index_def->rebuild); for (i = 0; i < n_fields; i++) { @@ -4384,7 +4441,7 @@ row_merge_create_index( ut_ad(ifield->col_no >= table->n_v_def); name = add_v->v_col_name[ ifield->col_no - table->n_v_def]; - has_new_v_col = true; + index->has_new_v_col = true; } else { name = dict_table_get_v_col_name( table, ifield->col_no); @@ -4396,26 +4453,6 @@ row_merge_create_index( dict_mem_index_add_field(index, name, ifield->prefix_len); } - ut_d(const dict_index_t* const index_template = index); - /* Add the index to SYS_INDEXES, using the index prototype. */ - err = row_merge_create_index_graph(trx, table, index, add_v); - - if (err == DB_SUCCESS) { - ut_ad(index != index_template); - index->parser = index_def->parser; - index->has_new_v_col = has_new_v_col; - /* Note the id of the transaction that created this - index, we use it to restrict readers from accessing - this index, to ensure read consistency. */ - ut_ad(index->trx_id == trx->id); - } else { - ut_ad(!index || index == index_template); - if (index) { - dict_mem_index_free(index); - } - index = NULL; - } - DBUG_RETURN(index); } @@ -4434,10 +4471,10 @@ row_merge_is_index_usable( } return(!index->is_corrupted() - && (dict_table_is_temporary(index->table) + && (index->table->is_temporary() || index->trx_id == 0 - || !MVCC::is_view_active(trx->read_view) - || trx->read_view->changes_visible( + || !trx->read_view.is_open() + || trx->read_view.changes_visible( index->trx_id, index->table->name))); } @@ -4469,7 +4506,7 @@ the flushing of such pages to the data files was completed. @param[in] index an index tree on which redo logging was disabled */ void row_merge_write_redo(const dict_index_t* index) { - ut_ad(!dict_table_is_temporary(index->table)); + ut_ad(!index->table->is_temporary()); ut_ad(!(index->type & (DICT_SPATIAL | DICT_FTS))); mtr_t mtr; @@ -4477,7 +4514,7 @@ void row_merge_write_redo(const dict_index_t* index) byte* log_ptr = mlog_open(&mtr, 11 + 8); log_ptr = mlog_write_initial_log_record_low( MLOG_INDEX_LOAD, - index->space, index->page, log_ptr, &mtr); + index->table->space_id, index->page, log_ptr, &mtr); mach_write_to_8(log_ptr, index->id); mlog_close(&mtr, log_ptr + 8); mtr.commit(); @@ -4496,7 +4533,7 @@ old_table unless creating a PRIMARY KEY @param[in] n_indexes size of indexes[] @param[in,out] table MySQL table, for reporting erroneous key value if applicable -@param[in] add_cols default values of added columns, or NULL +@param[in] defaults default values of added, changed columns, or NULL @param[in] col_map mapping of old column numbers to new ones, or NULL if old_table == new_table @param[in] add_autoinc number of added AUTO_INCREMENT columns, or @@ -4510,6 +4547,7 @@ this function and it will be passed to other functions for further accounting. @param[in] add_v new virtual columns added along with indexes @param[in] eval_table mysql table used to evaluate virtual column value, see innobase_get_computed_value(). +@param[in] allow_not_null allow the conversion from null to not-null @return DB_SUCCESS or error code */ dberr_t row_merge_build_indexes( @@ -4521,24 +4559,26 @@ row_merge_build_indexes( const ulint* key_numbers, ulint n_indexes, struct TABLE* table, - const dtuple_t* add_cols, + const dtuple_t* defaults, const ulint* col_map, ulint add_autoinc, ib_sequence_t& sequence, bool skip_pk_sort, ut_stage_alter_t* stage, const dict_add_v_col_t* add_v, - struct TABLE* eval_table) + struct TABLE* eval_table, + bool allow_not_null) { merge_file_t* merge_files; row_merge_block_t* block; ut_new_pfx_t block_pfx; + size_t block_size; ut_new_pfx_t crypt_pfx; row_merge_block_t* crypt_block = NULL; ulint i; ulint j; dberr_t error; - int tmpfd = -1; + pfs_os_file_t tmpfd = OS_FILE_CLOSED; dict_index_t* fts_sort_idx = NULL; fts_psort_t* psort_info = NULL; fts_psort_t* merge_info = NULL; @@ -4555,7 +4595,7 @@ row_merge_build_indexes( ut_ad(!srv_read_only_mode); ut_ad((old_table == new_table) == !col_map); - ut_ad(!add_cols || col_map); + ut_ad(!defaults || col_map); stage->begin_phase_read_pk(skip_pk_sort && new_table != old_table ? n_indexes - 1 @@ -4568,7 +4608,8 @@ row_merge_build_indexes( /* This will allocate "3 * srv_sort_buf_size" elements of type row_merge_block_t. The latter is defined as byte. */ - block = alloc.allocate_large(3 * srv_sort_buf_size, &block_pfx); + block_size = 3 * srv_sort_buf_size; + block = alloc.allocate_large(block_size, &block_pfx); if (block == NULL) { DBUG_RETURN(DB_OUT_OF_MEMORY); @@ -4579,7 +4620,7 @@ row_merge_build_indexes( if (log_tmp_is_encrypted()) { crypt_block = static_cast<row_merge_block_t*>( - alloc.allocate_large(3 * srv_sort_buf_size, + alloc.allocate_large(block_size, &crypt_pfx)); if (crypt_block == NULL) { @@ -4605,7 +4646,7 @@ row_merge_build_indexes( merge file descriptor */ for (i = 0; i < n_merge_files; i++) { - merge_files[i].fd = -1; + merge_files[i].fd = OS_FILE_CLOSED; merge_files[i].offset = 0; merge_files[i].n_rec = 0; } @@ -4635,6 +4676,7 @@ row_merge_build_indexes( created */ if (!row_fts_psort_info_init( trx, dup, new_table, opt_doc_id_size, + dict_table_page_size(old_table), &psort_info, &merge_info)) { error = DB_CORRUPTION; goto func_exit; @@ -4646,10 +4688,6 @@ row_merge_build_indexes( } } - /* Reset the MySQL row buffer that is used when reporting - duplicate keys. */ - innobase_rec_reset(table); - if (global_system_variables.log_warnings > 2) { sql_print_information("InnoDB: Online DDL : Start reading" " clustered index of the table" @@ -4676,9 +4714,9 @@ row_merge_build_indexes( error = row_merge_read_clustered_index( trx, table, old_table, new_table, online, indexes, fts_sort_idx, psort_info, merge_files, key_numbers, - n_indexes, add_cols, add_v, col_map, add_autoinc, + n_indexes, defaults, add_v, col_map, add_autoinc, sequence, block, skip_pk_sort, &tmpfd, stage, - pct_cost, crypt_block, eval_table); + pct_cost, crypt_block, eval_table, allow_not_null); stage->end_phase_read_pk(); @@ -4782,7 +4820,7 @@ wait_again: #ifdef FTS_INTERNAL_DIAG_PRINT DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Insert\n"); #endif - } else if (merge_files[k].fd >= 0) { + } else if (merge_files[k].fd != OS_FILE_CLOSED) { char buf[NAME_LEN + 1]; row_merge_dup_t dup = { sort_idx, table, col_map, 0}; @@ -4815,7 +4853,8 @@ wait_again: trx, &dup, &merge_files[k], block, &tmpfd, true, pct_progress, pct_cost, - crypt_block, new_table->space, stage); + crypt_block, new_table->space_id, + stage); pct_progress += pct_cost; @@ -4857,7 +4896,8 @@ wait_again: merge_files[k].fd, block, NULL, &btr_bulk, merge_files[k].n_rec, pct_progress, pct_cost, - crypt_block, new_table->space, stage); + crypt_block, new_table->space_id, + stage); error = btr_bulk.finish(error); @@ -4948,10 +4988,10 @@ func_exit: ut_free(merge_files); - alloc.deallocate_large(block, &block_pfx); + alloc.deallocate_large(block, &block_pfx, block_size); if (crypt_block) { - alloc.deallocate_large(crypt_block, &crypt_pfx); + alloc.deallocate_large(crypt_block, &crypt_pfx, block_size); } DICT_TF2_FLAG_UNSET(new_table, DICT_TF2_FTS_ADD_DOC_ID); |