diff options
Diffstat (limited to 'storage/innobase/row/row0log.cc')
-rw-r--r-- | storage/innobase/row/row0log.cc | 861 |
1 files changed, 567 insertions, 294 deletions
diff --git a/storage/innobase/row/row0log.cc b/storage/innobase/row/row0log.cc index 5e32663ad32..df396ac1a7d 100644 --- a/storage/innobase/row/row0log.cc +++ b/storage/innobase/row/row0log.cc @@ -36,9 +36,14 @@ Created 2011-05-26 Marko Makela #include "row0ext.h" #include "data0data.h" #include "que0que.h" +#include "srv0mon.h" #include "handler0alter.h" +#include "ut0new.h" +#include "ut0stage.h" +#include "trx0rec.h" -#include<map> +#include <algorithm> +#include <map> ulint onlineddl_rowlog_rows; ulint onlineddl_rowlog_pct_used; @@ -63,22 +68,15 @@ enum row_op { ROW_OP_DELETE }; -#ifdef UNIV_DEBUG -/** Write information about the applied record to the error log */ -# define ROW_LOG_APPLY_PRINT -#endif /* UNIV_DEBUG */ - -#ifdef ROW_LOG_APPLY_PRINT -/** When set, write information about the applied record to the error log */ -static bool row_log_apply_print; -#endif /* ROW_LOG_APPLY_PRINT */ - /** Size of the modification log entry header, in bytes */ #define ROW_LOG_HEADER_SIZE 2/*op, extra_size*/ /** Log block for modifications during online ALTER TABLE */ struct row_log_buf_t { byte* block; /*!< file block buffer */ + ut_new_pfx_t block_pfx; /*!< opaque descriptor of "block". Set + by ut_allocator::allocate_large() and fed to + ut_allocator::deallocate_large(). */ mrec_buf_t buf; /*!< buffer for accessing a record that spans two blocks */ ulint blocks; /*!< current position in blocks */ @@ -87,14 +85,13 @@ struct row_log_buf_t { the start of the row_log_table log; 0 for row_log_online_op() and row_log_apply(). */ - ulint size; /*!< allocated size of block */ }; /** Tracks BLOB allocation during online ALTER TABLE */ class row_log_table_blob_t { public: /** Constructor (declaring a BLOB freed) - @param offset_arg row_log_t::tail::total */ + @param offset_arg row_log_t::tail::total */ #ifdef UNIV_DEBUG row_log_table_blob_t(ulonglong offset_arg) : old_offset (0), free_offset (offset_arg), @@ -105,7 +102,7 @@ public: #endif /* UNIV_DEBUG */ /** Declare a BLOB freed again. - @param offset_arg row_log_t::tail::total */ + @param offset_arg row_log_t::tail::total */ #ifdef UNIV_DEBUG void blob_free(ulonglong offset_arg) #else /* UNIV_DEBUG */ @@ -119,14 +116,14 @@ public: offset = BLOB_FREED; } /** Declare a freed BLOB reused. - @param offset_arg row_log_t::tail::total */ + @param offset_arg row_log_t::tail::total */ void blob_alloc(ulonglong offset_arg) { ut_ad(free_offset <= offset_arg); ut_d(old_offset = offset); offset = offset_arg; } /** Determine if a BLOB was freed at a given log position - @param offset_arg row_log_t::head::total after the log record + @param offset_arg row_log_t::head::total after the log record @return true if freed */ bool is_freed(ulonglong offset_arg) const { /* This is supposed to be the offset at the end of the @@ -156,7 +153,12 @@ If a page number maps to 0, it is an off-page column that has been freed. If a page number maps to a nonzero number, the number is a byte offset into the index->online_log, indicating that the page is safe to access when applying log records starting from that offset. */ -typedef std::map<ulint, row_log_table_blob_t> page_no_map; +typedef std::map< + ulint, + row_log_table_blob_t, + std::less<ulint>, + ut_allocator<std::pair<const ulint, row_log_table_blob_t> > > + page_no_map; /** @brief Buffer for logging modifications during online index creation @@ -198,13 +200,16 @@ struct row_log_t { or by index->lock X-latch only */ row_log_buf_t head; /*!< reader context; protected by MDL only; modifiable by row_log_apply_ops() */ - const char* path; /*!< where to create temporary file during - log operation */ + ulint n_old_col; + /*!< number of non-virtual column in + old table */ + ulint n_old_vcol; + /*!< number of virtual column in old table */ }; /** Create the file or online log if it does not exist. -@param[in,out] log online rebuild log -@return file descriptor. */ +@param[in,out] log online rebuild log +@return true if success, false if not */ static MY_ATTRIBUTE((warn_unused_result)) int row_log_tmpfile( @@ -212,7 +217,14 @@ row_log_tmpfile( { DBUG_ENTER("row_log_tmpfile"); if (log->fd < 0) { - log->fd = row_merge_file_create_low(log->path); + log->fd = row_merge_file_create_low(); + DBUG_EXECUTE_IF("row_log_tmpfile_fail", + if (log->fd > 0) + row_merge_file_destroy_low(log->fd); + log->fd = -1;); + if (log->fd >= 0) { + MONITOR_ATOMIC_INC(MONITOR_ALTER_TABLE_LOG_FILES); + } } DBUG_RETURN(log->fd); @@ -228,13 +240,15 @@ row_log_block_allocate( { DBUG_ENTER("row_log_block_allocate"); if (log_buf.block == NULL) { - log_buf.size = srv_sort_buf_size; - log_buf.block = (byte*) os_mem_alloc_large(&log_buf.size); - DBUG_EXECUTE_IF("simulate_row_log_allocation_failure", - if (log_buf.block) - os_mem_free_large(log_buf.block, log_buf.size); - log_buf.block = NULL;); - if (!log_buf.block) { + DBUG_EXECUTE_IF( + "simulate_row_log_allocation_failure", + DBUG_RETURN(false); + ); + + log_buf.block = ut_allocator<byte>(mem_key_row_log_buf) + .allocate_large(srv_sort_buf_size, &log_buf.block_pfx); + + if (log_buf.block == NULL) { DBUG_RETURN(false); } } @@ -250,7 +264,8 @@ row_log_block_free( { DBUG_ENTER("row_log_block_free"); if (log_buf.block != NULL) { - os_mem_free_large(log_buf.block, log_buf.size); + ut_allocator<byte>(mem_key_row_log_buf).deallocate_large( + log_buf.block, &log_buf.block_pfx); log_buf.block = NULL; } DBUG_VOID_RETURN; @@ -258,7 +273,6 @@ row_log_block_free( /******************************************************//** Logs an operation to a secondary index that is (or was) being created. */ -UNIV_INTERN void row_log_online_op( /*==============*/ @@ -276,10 +290,8 @@ row_log_online_op( ut_ad(dtuple_validate(tuple)); ut_ad(dtuple_get_n_fields(tuple) == dict_index_get_n_fields(index)); -#ifdef UNIV_SYNC_DEBUG - ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_SHARED) - || rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX)); -#endif /* UNIV_SYNC_DEBUG */ + ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_S) + || rw_lock_own(dict_index_get_lock(index), RW_LOCK_X)); if (dict_index_is_corrupted(index)) { return; @@ -292,7 +304,7 @@ row_log_online_op( extra_size+1 (and reserve 0 as the end-of-chunk marker). */ size = rec_get_converted_size_temp( - index, tuple->fields, tuple->n_fields, &extra_size); + index, tuple->fields, tuple->n_fields, NULL, &extra_size); ut_ad(size >= extra_size); ut_ad(size <= sizeof log->tail.buf); @@ -340,14 +352,15 @@ row_log_online_op( } rec_convert_dtuple_to_temp( - b + extra_size, index, tuple->fields, tuple->n_fields); + b + extra_size, index, tuple->fields, tuple->n_fields, NULL); b += size; if (mrec_size >= avail_size) { + dberr_t err; + IORequest request(IORequest::WRITE); const os_offset_t byte_offset = (os_offset_t) log->tail.blocks * srv_sort_buf_size; - ibool ret; if (byte_offset + srv_sort_buf_size >= srv_online_max_size) { goto write_failed; @@ -360,6 +373,7 @@ row_log_online_op( memcpy(log->tail.block + log->tail.bytes, log->tail.buf, avail_size); } + UNIV_MEM_ASSERT_RW(log->tail.block, srv_sort_buf_size); if (row_log_tmpfile(log) < 0) { @@ -367,12 +381,13 @@ row_log_online_op( goto err_exit; } - ret = os_file_write( + err = os_file_write( + request, "(modification log)", OS_FILE_FROM_FD(log->fd), log->tail.block, byte_offset, srv_sort_buf_size); log->tail.blocks++; - if (!ret) { + if (err != DB_SUCCESS) { write_failed: /* We set the flag directly instead of invoking dict_set_corrupted_index_cache_only(index) here, @@ -396,7 +411,6 @@ err_exit: /******************************************************//** Gets the error status of the online index rebuild log. @return DB_SUCCESS or error code */ -UNIV_INTERN dberr_t row_log_table_get_error( /*====================*/ @@ -460,10 +474,11 @@ row_log_table_close_func( ut_ad(mutex_own(&log->mutex)); if (size >= avail) { + dberr_t err; + IORequest request(IORequest::WRITE); const os_offset_t byte_offset = (os_offset_t) log->tail.blocks * srv_sort_buf_size; - ibool ret; if (byte_offset + srv_sort_buf_size >= srv_online_max_size) { goto write_failed; @@ -476,6 +491,7 @@ row_log_table_close_func( memcpy(log->tail.block + log->tail.bytes, log->tail.buf, avail); } + UNIV_MEM_ASSERT_RW(log->tail.block, srv_sort_buf_size); if (row_log_tmpfile(log) < 0) { @@ -483,12 +499,13 @@ row_log_table_close_func( goto err_exit; } - ret = os_file_write( + err = os_file_write( + request, "(modification log)", OS_FILE_FROM_FD(log->fd), log->tail.block, byte_offset, srv_sort_buf_size); log->tail.blocks++; - if (!ret) { + if (err != DB_SUCCESS) { write_failed: log->error = DB_ONLINE_LOG_TOO_BIG; } @@ -521,12 +538,12 @@ err_exit: /******************************************************//** Logs a delete operation to a table that is being rebuilt. This will be merged in row_log_table_apply_delete(). */ -UNIV_INTERN void row_log_table_delete( /*=================*/ const rec_t* rec, /*!< in: clustered index leaf page record, page X-latched */ + const dtuple_t* ventry, /*!< in: dtuple holding virtual column info */ dict_index_t* index, /*!< in/out: clustered index, S-latched or X-latched */ const ulint* offsets,/*!< in: rec_get_offsets(rec,index) */ @@ -546,10 +563,9 @@ row_log_table_delete( ut_ad(rec_offs_validate(rec, index, offsets)); ut_ad(rec_offs_n_fields(offsets) == dict_index_get_n_fields(index)); ut_ad(rec_offs_size(offsets) <= sizeof index->online_log->tail.buf); -#ifdef UNIV_SYNC_DEBUG - ut_ad(rw_lock_own(&index->lock, RW_LOCK_SHARED) - || rw_lock_own(&index->lock, RW_LOCK_EX)); -#endif /* UNIV_SYNC_DEBUG */ + ut_ad(rw_lock_own_flagged( + &index->lock, + RW_LOCK_FLAG_S | RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX)); if (dict_index_is_corrupted(index) || !dict_index_is_online_ddl(index) @@ -617,7 +633,7 @@ row_log_table_delete( ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field( old_pk, old_pk->n_fields - 1)->len); old_pk_size = rec_get_converted_size_temp( - new_index, old_pk->fields, old_pk->n_fields, + new_index, old_pk->fields, old_pk->n_fields, NULL, &old_pk_extra_size); ut_ad(old_pk_extra_size < 0x100); @@ -645,6 +661,13 @@ row_log_table_delete( } } + /* Check if we need to log virtual column data */ + if (ventry->n_v_fields > 0) { + ulint v_extra; + mrec_size += rec_get_converted_size_temp( + index, NULL, 0, ventry, &v_extra); + } + if (byte* b = row_log_table_open(index->online_log, mrec_size, &avail_size)) { *b++ = ROW_T_DELETE; @@ -656,7 +679,7 @@ row_log_table_delete( rec_convert_dtuple_to_temp( b + old_pk_extra_size, new_index, - old_pk->fields, old_pk->n_fields); + old_pk->fields, old_pk->n_fields, NULL); b += old_pk_size; @@ -687,6 +710,13 @@ row_log_table_delete( b += ext_size; } + /* log virtual columns */ + if (ventry->n_v_fields > 0) { + rec_convert_dtuple_to_temp( + b, new_index, NULL, 0, ventry); + b += mach_read_from_2(b); + } + row_log_table_close( index->online_log, b, mrec_size, avail_size); } @@ -704,6 +734,10 @@ row_log_table_low_redundant( const rec_t* rec, /*!< in: clustered index leaf page record in ROW_FORMAT=REDUNDANT, page X-latched */ + const dtuple_t* ventry, /*!< in: dtuple holding virtual + column info or NULL */ + const dtuple_t* o_ventry,/*!< in: old dtuple holding virtual + column info or NULL */ dict_index_t* index, /*!< in/out: clustered index, S-latched or X-latched */ bool insert, /*!< in: true if insert, @@ -723,16 +757,22 @@ row_log_table_low_redundant( ulint avail_size; mem_heap_t* heap = NULL; dtuple_t* tuple; + ulint num_v = ventry ? dtuple_get_n_v_fields(ventry) : 0; ut_ad(!page_is_comp(page_align(rec))); ut_ad(dict_index_get_n_fields(index) == rec_get_n_fields_old(rec)); - ut_ad(dict_tf_is_valid(index->table->flags)); + ut_ad(dict_tf2_is_valid(index->table->flags, index->table->flags2)); ut_ad(!dict_table_is_comp(index->table)); /* redundant row format */ ut_ad(dict_index_is_clust(new_index)); heap = mem_heap_create(DTUPLE_EST_ALLOC(index->n_fields)); - tuple = dtuple_create(heap, index->n_fields); + tuple = dtuple_create_with_vcol(heap, index->n_fields, num_v); dict_index_copy_types(tuple, index, index->n_fields); + + if (num_v) { + dict_table_copy_v_types(tuple, index->table); + } + dtuple_set_n_fields_cmp(tuple, dict_index_get_n_unique(index)); if (rec_get_1byte_offs_flag(rec)) { @@ -764,10 +804,23 @@ row_log_table_low_redundant( } size = rec_get_converted_size_temp( - index, tuple->fields, tuple->n_fields, &extra_size); + index, tuple->fields, tuple->n_fields, ventry, &extra_size); mrec_size = ROW_LOG_HEADER_SIZE + size + (extra_size >= 0x80); + if (ventry && ventry->n_v_fields > 0) { + ulint v_extra = 0; + mrec_size += rec_get_converted_size_temp( + index, NULL, 0, ventry, &v_extra); + + if (o_ventry) { + mrec_size += rec_get_converted_size_temp( + index, NULL, 0, ventry, &v_extra); + } + } else if (index->table->n_v_cols) { + mrec_size += 2; + } + if (insert || index->online_log->same_pk) { ut_ad(!old_pk); old_pk_extra_size = old_pk_size = 0; @@ -781,7 +834,7 @@ row_log_table_low_redundant( old_pk_size = rec_get_converted_size_temp( new_index, old_pk->fields, old_pk->n_fields, - &old_pk_extra_size); + ventry, &old_pk_extra_size); ut_ad(old_pk_extra_size < 0x100); mrec_size += 1/*old_pk_extra_size*/ + old_pk_size; } @@ -795,7 +848,8 @@ row_log_table_low_redundant( rec_convert_dtuple_to_temp( b + old_pk_extra_size, new_index, - old_pk->fields, old_pk->n_fields); + old_pk->fields, old_pk->n_fields, + ventry); b += old_pk_size; } @@ -808,9 +862,28 @@ row_log_table_low_redundant( } rec_convert_dtuple_to_temp( - b + extra_size, index, tuple->fields, tuple->n_fields); + b + extra_size, index, tuple->fields, tuple->n_fields, + ventry); b += size; + if (ventry && ventry->n_v_fields > 0) { + rec_convert_dtuple_to_temp( + b, new_index, NULL, 0, ventry); + b += mach_read_from_2(b); + + if (o_ventry) { + rec_convert_dtuple_to_temp( + b, new_index, NULL, 0, o_ventry); + b += mach_read_from_2(b); + } + } else if (index->table->n_v_cols) { + /* The table contains virtual columns, but nothing + has changed for them, so just mark a 2 bytes length + field */ + mach_write_to_2(b, 2); + b += 2; + } + row_log_table_close( index->online_log, b, mrec_size, avail_size); } @@ -820,12 +893,15 @@ row_log_table_low_redundant( /******************************************************//** Logs an insert or update to a table that is being rebuilt. */ -static MY_ATTRIBUTE((nonnull(1,2,3))) +static MY_ATTRIBUTE((nonnull(1,2,4))) void row_log_table_low( /*==============*/ const rec_t* rec, /*!< in: clustered index leaf page record, page X-latched */ + const dtuple_t* ventry, /*!< in: dtuple holding virtual column info */ + const dtuple_t* o_ventry,/*!< in: dtuple holding old virtual column + info */ dict_index_t* index, /*!< in/out: clustered index, S-latched or X-latched */ const ulint* offsets,/*!< in: rec_get_offsets(rec,index) */ @@ -839,18 +915,19 @@ row_log_table_low( ulint extra_size; ulint mrec_size; ulint avail_size; - const dict_index_t* new_index = dict_table_get_first_index( - index->online_log->table); + const dict_index_t* new_index; + + new_index = dict_table_get_first_index(index->online_log->table); + ut_ad(dict_index_is_clust(index)); ut_ad(dict_index_is_clust(new_index)); ut_ad(!dict_index_is_online_ddl(new_index)); ut_ad(rec_offs_validate(rec, index, offsets)); ut_ad(rec_offs_n_fields(offsets) == dict_index_get_n_fields(index)); ut_ad(rec_offs_size(offsets) <= sizeof index->online_log->tail.buf); -#ifdef UNIV_SYNC_DEBUG - ut_ad(rw_lock_own(&index->lock, RW_LOCK_SHARED) - || rw_lock_own(&index->lock, RW_LOCK_EX)); -#endif /* UNIV_SYNC_DEBUG */ + ut_ad(rw_lock_own_flagged( + &index->lock, + RW_LOCK_FLAG_S | RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX)); ut_ad(fil_page_get_type(page_align(rec)) == FIL_PAGE_INDEX); ut_ad(page_is_leaf(page_align(rec))); ut_ad(!page_is_comp(page_align(rec)) == !rec_offs_comp(offsets)); @@ -863,7 +940,8 @@ row_log_table_low( if (!rec_offs_comp(offsets)) { row_log_table_low_redundant( - rec, index, insert, old_pk, new_index); + rec, ventry, o_ventry, index, insert, + old_pk, new_index); return; } @@ -877,6 +955,22 @@ row_log_table_low( mrec_size = ROW_LOG_HEADER_SIZE + (extra_size >= 0x80) + rec_offs_size(offsets) - omit_size; + if (ventry && ventry->n_v_fields > 0) { + ulint v_extra = 0; + mrec_size += rec_get_converted_size_temp( + index, NULL, 0, ventry, &v_extra); + + if (o_ventry) { + mrec_size += rec_get_converted_size_temp( + index, NULL, 0, ventry, &v_extra); + } + } else if (index->table->n_v_cols) { + /* Always leave 2 bytes length marker for virtual column + data logging even if there is none of them is indexed if table + has virtual columns */ + mrec_size += 2; + } + if (insert || index->online_log->same_pk) { ut_ad(!old_pk); old_pk_extra_size = old_pk_size = 0; @@ -890,7 +984,7 @@ row_log_table_low( old_pk_size = rec_get_converted_size_temp( new_index, old_pk->fields, old_pk->n_fields, - &old_pk_extra_size); + old_pk, &old_pk_extra_size); ut_ad(old_pk_extra_size < 0x100); mrec_size += 1/*old_pk_extra_size*/ + old_pk_size; } @@ -904,7 +998,8 @@ row_log_table_low( rec_convert_dtuple_to_temp( b + old_pk_extra_size, new_index, - old_pk->fields, old_pk->n_fields); + old_pk->fields, old_pk->n_fields, + NULL); b += old_pk_size; } @@ -921,6 +1016,24 @@ row_log_table_low( memcpy(b, rec, rec_offs_data_size(offsets)); b += rec_offs_data_size(offsets); + if (ventry && ventry->n_v_fields > 0) { + rec_convert_dtuple_to_temp( + b, new_index, NULL, 0, ventry); + b += mach_read_from_2(b); + + if (o_ventry) { + rec_convert_dtuple_to_temp( + b, new_index, NULL, 0, o_ventry); + b += mach_read_from_2(b); + } + } else if (index->table->n_v_cols) { + /* The table contains virtual columns, but nothing + has changed for them, so just mark a 2 bytes length + field */ + mach_write_to_2(b, 2); + b += 2; + } + row_log_table_close( index->online_log, b, mrec_size, avail_size); } @@ -929,7 +1042,6 @@ row_log_table_low( /******************************************************//** Logs an update to a table that is being rebuilt. This will be merged in row_log_table_apply_update(). */ -UNIV_INTERN void row_log_table_update( /*=================*/ @@ -938,16 +1050,21 @@ row_log_table_update( dict_index_t* index, /*!< in/out: clustered index, S-latched or X-latched */ const ulint* offsets,/*!< in: rec_get_offsets(rec,index) */ - const dtuple_t* old_pk) /*!< in: row_log_table_get_pk() + const dtuple_t* old_pk, /*!< in: row_log_table_get_pk() before the update */ + const dtuple_t* new_v_row,/*!< in: dtuple contains the new virtual + columns */ + const dtuple_t* old_v_row)/*!< in: dtuple contains the old virtual + columns */ { - row_log_table_low(rec, index, offsets, false, old_pk); + row_log_table_low(rec, new_v_row, old_v_row, index, offsets, + false, old_pk); } /** Gets the old table column of a PRIMARY KEY column. -@param table old table (before ALTER TABLE) -@param col_map mapping of old column numbers to new ones -@param col_no column position in the new table +@param table old table (before ALTER TABLE) +@param col_map mapping of old column numbers to new ones +@param col_no column position in the new table @return old table column, or NULL if this is an added column */ static const dict_col_t* @@ -967,21 +1084,22 @@ row_log_table_get_pk_old_col( } /** Maps an old table column of a PRIMARY KEY column. -@param col old table column (before ALTER TABLE) -@param ifield clustered index field in the new table (after ALTER TABLE) -@param dfield clustered index tuple field in the new table -@param heap memory heap for allocating dfield contents -@param rec clustered index leaf page record in the old table -@param offsets rec_get_offsets(rec) -@param i rec field corresponding to col -@param zip_size compressed page size of the old table, or 0 for uncompressed -@param max_len maximum length of dfield -@retval DB_INVALID_NULL if a NULL value is encountered -@retval DB_TOO_BIG_INDEX_COL if the maximum prefix length is exceeded */ +@param[in] col old table column (before ALTER TABLE) +@param[in] ifield clustered index field in the new table (after +ALTER TABLE) +@param[in,out] dfield clustered index tuple field in the new table +@param[in,out] heap memory heap for allocating dfield contents +@param[in] rec clustered index leaf page record in the old +table +@param[in] offsets rec_get_offsets(rec) +@param[in] i rec field corresponding to col +@param[in] page_size page size of the old table +@param[in] max_len maximum length of dfield +@retval DB_INVALID_NULL if a NULL value is encountered +@retval DB_TOO_BIG_INDEX_COL if the maximum prefix length is exceeded */ static dberr_t row_log_table_get_pk_col( -/*=====================*/ const dict_col_t* col, const dict_field_t* ifield, dfield_t* dfield, @@ -989,14 +1107,12 @@ row_log_table_get_pk_col( const rec_t* rec, const ulint* offsets, ulint i, - ulint zip_size, + const page_size_t& page_size, ulint max_len) { const byte* field; ulint len; - ut_ad(ut_is_2pow(zip_size)); - field = rec_get_nth_field(rec, offsets, i, &len); if (len == UNIV_SQL_NULL) { @@ -1018,7 +1134,7 @@ row_log_table_get_pk_col( mem_heap_alloc(heap, field_len)); len = btr_copy_externally_stored_field_prefix( - blob_field, field_len, zip_size, field, len, NULL); + blob_field, field_len, page_size, field, len); if (len >= max_len + 1) { return(DB_TOO_BIG_INDEX_COL); } @@ -1036,7 +1152,6 @@ Constructs the old PRIMARY KEY and DB_TRX_ID,DB_ROLL_PTR of a table that is being rebuilt. @return tuple of PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR in the rebuilt table, or NULL if the PRIMARY KEY definition does not change */ -UNIV_INTERN const dtuple_t* row_log_table_get_pk( /*=================*/ @@ -1055,10 +1170,9 @@ row_log_table_get_pk( ut_ad(dict_index_is_clust(index)); ut_ad(dict_index_is_online_ddl(index)); ut_ad(!offsets || rec_offs_validate(rec, index, offsets)); -#ifdef UNIV_SYNC_DEBUG - ut_ad(rw_lock_own(&index->lock, RW_LOCK_SHARED) - || rw_lock_own(&index->lock, RW_LOCK_EX)); -#endif /* UNIV_SYNC_DEBUG */ + ut_ad(rw_lock_own_flagged( + &index->lock, + RW_LOCK_FLAG_S | RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX)); ut_ad(log); ut_ad(log->table); @@ -1131,7 +1245,9 @@ row_log_table_get_pk( dtuple_set_n_fields_cmp(tuple, new_n_uniq); const ulint max_len = DICT_MAX_FIELD_LEN_BY_FORMAT(new_table); - const ulint zip_size = dict_table_zip_size(index->table); + + const page_size_t& page_size + = dict_table_page_size(index->table); for (ulint new_i = 0; new_i < new_n_uniq; new_i++) { dict_field_t* ifield; @@ -1158,7 +1274,7 @@ row_log_table_get_pk( log->error = row_log_table_get_pk_col( col, ifield, dfield, *heap, - rec, offsets, i, zip_size, max_len); + rec, offsets, i, page_size, max_len); if (log->error != DB_SUCCESS) { err_exit: @@ -1227,22 +1343,21 @@ func_exit: /******************************************************//** Logs an insert to a table that is being rebuilt. This will be merged in row_log_table_apply_insert(). */ -UNIV_INTERN void row_log_table_insert( /*=================*/ const rec_t* rec, /*!< in: clustered index leaf page record, page X-latched */ + const dtuple_t* ventry, /*!< in: dtuple holding virtual column info */ dict_index_t* index, /*!< in/out: clustered index, S-latched or X-latched */ const ulint* offsets)/*!< in: rec_get_offsets(rec,index) */ { - row_log_table_low(rec, index, offsets, true, NULL); + row_log_table_low(rec, ventry, NULL, index, offsets, true, NULL); } /******************************************************//** Notes that a BLOB is being freed during online ALTER TABLE. */ -UNIV_INTERN void row_log_table_blob_free( /*====================*/ @@ -1251,9 +1366,9 @@ row_log_table_blob_free( { ut_ad(dict_index_is_clust(index)); ut_ad(dict_index_is_online_ddl(index)); -#ifdef UNIV_SYNC_DEBUG - ut_ad(rw_lock_own(&index->lock, RW_LOCK_EX)); -#endif /* UNIV_SYNC_DEBUG */ + ut_ad(rw_lock_own_flagged( + &index->lock, + RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX)); ut_ad(page_no != FIL_NULL); if (index->online_log->error != DB_SUCCESS) { @@ -1262,8 +1377,8 @@ row_log_table_blob_free( page_no_map* blobs = index->online_log->blobs; - if (!blobs) { - index->online_log->blobs = blobs = new page_no_map(); + if (blobs == NULL) { + index->online_log->blobs = blobs = UT_NEW_NOKEY(page_no_map()); } #ifdef UNIV_DEBUG @@ -1287,7 +1402,6 @@ row_log_table_blob_free( /******************************************************//** Notes that a BLOB is being allocated during online ALTER TABLE. */ -UNIV_INTERN void row_log_table_blob_alloc( /*=====================*/ @@ -1296,9 +1410,11 @@ row_log_table_blob_alloc( { ut_ad(dict_index_is_clust(index)); ut_ad(dict_index_is_online_ddl(index)); -#ifdef UNIV_SYNC_DEBUG - ut_ad(rw_lock_own(&index->lock, RW_LOCK_EX)); -#endif /* UNIV_SYNC_DEBUG */ + + ut_ad(rw_lock_own_flagged( + &index->lock, + RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX)); + ut_ad(page_no != FIL_NULL); if (index->online_log->error != DB_SUCCESS) { @@ -1335,6 +1451,7 @@ row_log_table_apply_convert_mrec( reason of failure */ { dtuple_t* row; + ulint num_v = dict_table_get_n_v_cols(log->table); *error = DB_SUCCESS; @@ -1348,7 +1465,8 @@ row_log_table_apply_convert_mrec( dfield_get_type(dtuple_get_nth_field(row, i))); } } else { - row = dtuple_create(heap, dict_table_get_n_cols(log->table)); + row = dtuple_create_with_vcol( + heap, dict_table_get_n_cols(log->table), num_v); dict_table_copy_types(row, log->table); } @@ -1368,6 +1486,7 @@ row_log_table_apply_convert_mrec( const dict_col_t* col = dict_field_get_col(ind_field); + ulint col_no = log->col_map[dict_col_get_no(col)]; @@ -1376,8 +1495,9 @@ row_log_table_apply_convert_mrec( continue; } - dfield_t* dfield + dfield_t* dfield = dtuple_get_nth_field(row, col_no); + ulint len; const byte* data; @@ -1408,8 +1528,8 @@ row_log_table_apply_convert_mrec( data = btr_rec_copy_externally_stored_field( mrec, offsets, - dict_table_zip_size(index->table), - i, &len, heap, NULL); + dict_table_page_size(index->table), + i, &len, heap); ut_a(data); dfield_set_data(dfield, data, len); blob_done: @@ -1468,6 +1588,14 @@ blob_done: dfield_get_type(dfield))); } + /* read the virtual column data if any */ + if (num_v) { + byte* b = const_cast<byte*>(mrec) + + rec_offs_data_size(offsets); + trx_undo_read_v_cols(log->table, b, row, false, + &(log->col_map[log->n_old_col])); + } + return(row); } @@ -1497,14 +1625,15 @@ row_log_table_apply_insert_low( ut_ad(dtuple_validate(row)); ut_ad(trx_id); -#ifdef ROW_LOG_APPLY_PRINT - if (row_log_apply_print) { - fprintf(stderr, "table apply insert " - IB_ID_FMT " " IB_ID_FMT "\n", - index->table->id, index->id); - dtuple_print(stderr, row); +#ifdef UNIV_DEBUG + { + rec_printer p(row); + DBUG_PRINT("ib_alter_table", + ("insert table %llu (index %llu): %s", + index->table->id, index->id, + p.str().c_str())); } -#endif /* ROW_LOG_APPLY_PRINT */ +#endif static const ulint flags = (BTR_CREATE_FLAG @@ -1515,7 +1644,8 @@ row_log_table_apply_insert_low( entry = row_build_index_entry(row, NULL, index, heap); error = row_ins_clust_index_entry_low( - flags, BTR_MODIFY_TREE, index, index->n_uniq, entry, 0, thr); + flags, BTR_MODIFY_TREE, index, index->n_uniq, + entry, 0, thr, false); switch (error) { case DB_SUCCESS: @@ -1541,8 +1671,8 @@ row_log_table_apply_insert_low( entry = row_build_index_entry(row, NULL, index, heap); error = row_ins_sec_index_entry_low( flags, BTR_MODIFY_TREE, - index, offsets_heap, heap, entry, trx_id, thr); - + index, offsets_heap, heap, entry, trx_id, thr, + false); /* Report correct index name for duplicate key error. */ if (error == DB_DUPLICATE_KEY) { thr_get_trx(thr)->error_key_num = n_index; @@ -1608,12 +1738,14 @@ row_log_table_apply_insert( /******************************************************//** Deletes a record from a table that is being rebuilt. @return DB_SUCCESS or error code */ -static MY_ATTRIBUTE((nonnull(1, 2, 4, 5), warn_unused_result)) +static MY_ATTRIBUTE((nonnull(1, 2, 5), warn_unused_result)) dberr_t row_log_table_apply_delete_low( /*===========================*/ btr_pcur_t* pcur, /*!< in/out: B-tree cursor, will be trashed */ + const dtuple_t* ventry, /*!< in: dtuple holding + virtual column info */ const ulint* offsets, /*!< in: offsets on pcur */ const row_ext_t* save_ext, /*!< in: saved external field info, or NULL */ @@ -1628,20 +1760,26 @@ row_log_table_apply_delete_low( ut_ad(dict_index_is_clust(index)); -#ifdef ROW_LOG_APPLY_PRINT - if (row_log_apply_print) { - fprintf(stderr, "table apply delete " - IB_ID_FMT " " IB_ID_FMT "\n", - index->table->id, index->id); - rec_print_new(stderr, btr_pcur_get_rec(pcur), offsets); +#ifdef UNIV_DEBUG + { + rec_printer p(btr_pcur_get_rec(pcur), offsets); + DBUG_PRINT("ib_alter_table", + ("delete table %llu (index %llu): %s", + index->table->id, index->id, + p.str().c_str())); } -#endif /* ROW_LOG_APPLY_PRINT */ +#endif + if (dict_table_get_next_index(index)) { /* Build a row template for purging secondary index entries. */ row = row_build( ROW_COPY_DATA, index, btr_pcur_get_rec(pcur), offsets, NULL, NULL, NULL, save_ext ? NULL : &ext, heap); + if (ventry) { + dtuple_copy_v_fields(row, ventry); + } + if (!save_ext) { save_ext = ext; } @@ -1650,7 +1788,7 @@ row_log_table_apply_delete_low( } btr_cur_pessimistic_delete(&error, FALSE, btr_pcur_get_btr_cur(pcur), - BTR_CREATE_FLAG, RB_NONE, mtr); + BTR_CREATE_FLAG, false, mtr); mtr_commit(mtr); if (error != DB_SUCCESS) { @@ -1665,8 +1803,10 @@ row_log_table_apply_delete_low( const dtuple_t* entry = row_build_index_entry( row, save_ext, index, heap); mtr_start(mtr); + mtr->set_named_space(index->space); btr_pcur_open(index, entry, PAGE_CUR_LE, - BTR_MODIFY_TREE, pcur, mtr); + BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE, + pcur, mtr); #ifdef UNIV_DEBUG switch (btr_pcur_get_btr_cur(pcur)->flag) { case BTR_CUR_DELETE_REF: @@ -1696,7 +1836,7 @@ flag_ok: btr_cur_pessimistic_delete(&error, FALSE, btr_pcur_get_btr_cur(pcur), - BTR_CREATE_FLAG, RB_NONE, mtr); + BTR_CREATE_FLAG, false, mtr); mtr_commit(mtr); } @@ -1720,8 +1860,9 @@ row_log_table_apply_delete( that can be emptied */ mem_heap_t* heap, /*!< in/out: memory heap */ const row_log_t* log, /*!< in: online log */ - const row_ext_t* save_ext) /*!< in: saved external field + const row_ext_t* save_ext, /*!< in: saved external field info, or NULL */ + ulint ext_size) /*!< in: external field size */ { dict_table_t* new_table = log->table; dict_index_t* index = dict_table_get_first_index(new_table); @@ -1729,15 +1870,20 @@ row_log_table_apply_delete( mtr_t mtr; btr_pcur_t pcur; ulint* offsets; + ulint num_v = new_table->n_v_cols; ut_ad(rec_offs_n_fields(moffsets) == dict_index_get_n_unique(index) + 2); ut_ad(!rec_offs_any_extern(moffsets)); /* Convert the row to a search tuple. */ - old_pk = dtuple_create(heap, index->n_uniq); + old_pk = dtuple_create_with_vcol(heap, index->n_uniq, num_v); dict_index_copy_types(old_pk, index, index->n_uniq); + if (num_v) { + dict_table_copy_v_types(old_pk, index->table); + } + for (ulint i = 0; i < index->n_uniq; i++) { ulint len; const void* field; @@ -1748,8 +1894,10 @@ row_log_table_apply_delete( } mtr_start(&mtr); + mtr.set_named_space(index->space); btr_pcur_open(index, old_pk, PAGE_CUR_LE, - BTR_MODIFY_TREE, &pcur, &mtr); + BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE, + &pcur, &mtr); #ifdef UNIV_DEBUG switch (btr_pcur_get_btr_cur(&pcur)->flag) { case BTR_CUR_DELETE_REF: @@ -1821,7 +1969,15 @@ all_done: } } - return(row_log_table_apply_delete_low(&pcur, offsets, save_ext, + if (num_v) { + byte* b = (byte*)mrec + rec_offs_data_size(moffsets) + + ext_size; + trx_undo_read_v_cols(log->table, b, old_pk, false, + &(log->col_map[log->n_old_col])); + } + + return(row_log_table_apply_delete_low(&pcur, old_pk, + offsets, save_ext, heap, &mtr)); } @@ -1891,6 +2047,7 @@ row_log_table_apply_update( } mtr_start(&mtr); + mtr.set_named_space(index->space); btr_pcur_open(index, old_pk, PAGE_CUR_LE, BTR_MODIFY_TREE, &pcur, &mtr); #ifdef UNIV_DEBUG @@ -1961,7 +2118,7 @@ func_exit: mtr_commit(&mtr); } func_exit_committed: - ut_ad(mtr.state == MTR_COMMITTED); + ut_ad(mtr.has_committed()); if (error != DB_SUCCESS) { /* Report the erroneous row using the new @@ -2032,13 +2189,13 @@ func_exit_committed: /* Some BLOBs are missing, so we are interpreting this ROW_T_UPDATE as ROW_T_DELETE (see *1). */ error = row_log_table_apply_delete_low( - &pcur, cur_offsets, NULL, heap, &mtr); + &pcur, old_pk, cur_offsets, NULL, heap, &mtr); goto func_exit_committed; } dtuple_t* entry = row_build_index_entry( row, NULL, index, heap); - const upd_t* update = row_upd_build_difference_binary( + upd_t* update = row_upd_build_difference_binary( index, entry, btr_pcur_get_rec(&pcur), cur_offsets, false, NULL, heap); @@ -2070,8 +2227,8 @@ func_exit_committed: } error = row_log_table_apply_delete_low( - &pcur, cur_offsets, NULL, heap, &mtr); - ut_ad(mtr.state == MTR_COMMITTED); + &pcur, old_pk, cur_offsets, NULL, heap, &mtr); + ut_ad(mtr.has_committed()); if (error == DB_SUCCESS) { error = row_log_table_apply_insert_low( @@ -2091,15 +2248,18 @@ func_exit_committed: ROW_COPY_DATA, index, btr_pcur_get_rec(&pcur), cur_offsets, NULL, NULL, NULL, &old_ext, heap); ut_ad(old_row); -#ifdef ROW_LOG_APPLY_PRINT - if (row_log_apply_print) { - fprintf(stderr, "table apply update " - IB_ID_FMT " " IB_ID_FMT "\n", - index->table->id, index->id); - dtuple_print(stderr, old_row); - dtuple_print(stderr, row); - } -#endif /* ROW_LOG_APPLY_PRINT */ + +#ifdef UNIV_DEBUG + { + rec_printer old(old_row); + rec_printer new_row(row); + DBUG_PRINT("ib_alter_table", + ("update table %llu (index %llu): %s to %s", + index->table->id, index->id, + old.str().c_str(), + new_row.str().c_str())); + } +#endif } else { old_row = NULL; old_ext = NULL; @@ -2118,9 +2278,8 @@ func_exit_committed: if (big_rec) { if (error == DB_SUCCESS) { error = btr_store_big_rec_extern_fields( - index, btr_pcur_get_block(&pcur), - btr_pcur_get_rec(&pcur), cur_offsets, - big_rec, &mtr, BTR_STORE_UPDATE); + &pcur, update, cur_offsets, big_rec, &mtr, + BTR_STORE_UPDATE); } dtuple_big_rec_free(big_rec); @@ -2142,6 +2301,10 @@ func_exit_committed: continue; } + if (dict_index_has_virtual(index)) { + dtuple_copy_v_fields(old_row, old_pk); + } + mtr_commit(&mtr); entry = row_build_index_entry(old_row, old_ext, index, heap); @@ -2151,6 +2314,7 @@ func_exit_committed: } mtr_start(&mtr); + mtr.set_named_space(index->space); if (ROW_FOUND != row_search_index_entry( index, entry, BTR_MODIFY_TREE, &pcur, &mtr)) { @@ -2161,7 +2325,7 @@ func_exit_committed: btr_cur_pessimistic_delete( &error, FALSE, btr_pcur_get_btr_cur(&pcur), - BTR_CREATE_FLAG, RB_NONE, &mtr); + BTR_CREATE_FLAG, false, &mtr); if (error != DB_SUCCESS) { break; @@ -2174,7 +2338,7 @@ func_exit_committed: BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG, BTR_MODIFY_TREE, index, offsets_heap, heap, - entry, trx_id, thr); + entry, trx_id, thr, false); /* Report correct index name for duplicate key error. */ if (error == DB_DUPLICATE_KEY) { @@ -2182,6 +2346,7 @@ func_exit_committed: } mtr_start(&mtr); + mtr.set_named_space(index->space); } goto func_exit; @@ -2259,6 +2424,10 @@ row_log_table_apply_op( next_mrec = mrec + rec_offs_data_size(offsets); + if (log->table->n_v_cols) { + next_mrec += mach_read_from_2(next_mrec); + } + if (next_mrec > mrec_end) { return(NULL); } else { @@ -2293,6 +2462,10 @@ row_log_table_apply_op( rec_offs_set_n_fields(offsets, new_index->n_uniq + 2); rec_init_offsets_temp(mrec, new_index, offsets); next_mrec = mrec + rec_offs_data_size(offsets) + ext_size; + if (log->table->n_v_cols) { + next_mrec += mach_read_from_2(next_mrec); + } + if (next_mrec > mrec_end) { return(NULL); } @@ -2325,7 +2498,7 @@ row_log_table_apply_op( *error = row_log_table_apply_delete( thr, new_trx_id_col, mrec, offsets, offsets_heap, heap, - log, ext); + log, ext, ext_size); break; case ROW_T_UPDATE: @@ -2336,6 +2509,7 @@ row_log_table_apply_op( definition of the columns belonging to PRIMARY KEY is not changed, the log will only contain DB_TRX_ID,new_row. */ + ulint num_v = new_index->table->n_v_cols; if (dup->index->online_log->same_pk) { ut_ad(new_index->n_uniq == dup->index->n_uniq); @@ -2364,9 +2538,14 @@ row_log_table_apply_op( return(NULL); } - old_pk = dtuple_create(heap, new_index->n_uniq); + old_pk = dtuple_create_with_vcol( + heap, new_index->n_uniq, num_v); dict_index_copy_types( old_pk, new_index, old_pk->n_fields); + if (num_v) { + dict_table_copy_v_types( + old_pk, new_index->table); + } /* Copy the PRIMARY KEY fields from mrec to old_pk. */ for (ulint i = 0; i < new_index->n_uniq; i++) { @@ -2404,10 +2583,16 @@ row_log_table_apply_op( /* Copy the PRIMARY KEY fields and DB_TRX_ID, DB_ROLL_PTR from mrec to old_pk. */ - old_pk = dtuple_create(heap, new_index->n_uniq + 2); + old_pk = dtuple_create_with_vcol( + heap, new_index->n_uniq + 2, num_v); dict_index_copy_types(old_pk, new_index, old_pk->n_fields); + if (num_v) { + dict_table_copy_v_types( + old_pk, new_index->table); + } + for (ulint i = 0; i < dict_index_get_n_unique(new_index) + 2; i++) { @@ -2454,6 +2639,31 @@ row_log_table_apply_op( } } + /* Read virtual column info from log */ + if (num_v) { + ulint o_v_size = 0; + ulint n_v_size = 0; + n_v_size = mach_read_from_2(next_mrec); + next_mrec += n_v_size; + if (next_mrec > mrec_end) { + return(NULL); + } + + /* if there is more than 2 bytes length info */ + if (n_v_size > 2) { + trx_undo_read_v_cols( + log->table, const_cast<byte*>( + next_mrec), old_pk, false, + &(log->col_map[log->n_old_col])); + o_v_size = mach_read_from_2(next_mrec); + } + + next_mrec += o_v_size; + if (next_mrec > mrec_end) { + return(NULL); + } + } + ut_ad(next_mrec <= mrec_end); log->head.total += next_mrec - mrec_start; dtuple_set_n_fields_cmp(old_pk, new_index->n_uniq); @@ -2479,16 +2689,74 @@ row_log_table_apply_op( return(next_mrec); } -/******************************************************//** -Applies operations to a table was rebuilt. +#ifdef HAVE_PSI_STAGE_INTERFACE +/** Estimate how much an ALTER TABLE progress should be incremented per +one block of log applied. +For the other phases of ALTER TABLE we increment the progress with 1 per +page processed. +@return amount of abstract units to add to work_completed when one block +of log is applied. +*/ +inline +ulint +row_log_progress_inc_per_block() +{ + /* We must increment the progress once per page (as in + univ_page_size, usually 16KiB). One block here is srv_sort_buf_size + (usually 1MiB). */ + const ulint pages_per_block = std::max( + static_cast<unsigned long>( + srv_sort_buf_size / univ_page_size.physical()), + 1UL); + + /* Multiply by an artificial factor of 6 to even the pace with + the rest of the ALTER TABLE phases, they process page_size amount + of data faster. */ + return(pages_per_block * 6); +} + +/** Estimate how much work is to be done by the log apply phase +of an ALTER TABLE for this index. +@param[in] index index whose log to assess +@return work to be done by log-apply in abstract units +*/ +ulint +row_log_estimate_work( + const dict_index_t* index) +{ + if (index == NULL || index->online_log == NULL) { + return(0); + } + + const row_log_t* l = index->online_log; + const ulint bytes_left = + static_cast<ulint>(l->tail.total - l->head.total); + const ulint blocks_left = bytes_left / srv_sort_buf_size; + + return(blocks_left * row_log_progress_inc_per_block()); +} +#else /* HAVE_PSI_STAGE_INTERFACE */ +inline +ulint +row_log_progress_inc_per_block() +{ + return(0); +} +#endif /* HAVE_PSI_STAGE_INTERFACE */ + +/** Applies operations to a table was rebuilt. +@param[in] thr query graph +@param[in,out] dup for reporting duplicate key errors +@param[in,out] stage performance schema accounting object, used by +ALTER TABLE. If not NULL, then stage->inc() will be called for each block +of log that is applied. @return DB_SUCCESS, or error code on failure */ static MY_ATTRIBUTE((nonnull, warn_unused_result)) dberr_t row_log_table_apply_ops( -/*====================*/ - que_thr_t* thr, /*!< in: query graph */ - row_merge_dup_t*dup) /*!< in/out: for reporting duplicate key - errors */ + que_thr_t* thr, + row_merge_dup_t* dup, + ut_stage_alter_t* stage) { dberr_t error; const mrec_t* mrec = NULL; @@ -2516,9 +2784,7 @@ row_log_table_apply_ops( ut_ad(dict_index_is_clust(index)); ut_ad(dict_index_is_online_ddl(index)); ut_ad(trx->mysql_thd); -#ifdef UNIV_SYNC_DEBUG - ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX)); -#endif /* UNIV_SYNC_DEBUG */ + ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X)); ut_ad(!dict_index_is_online_ddl(new_index)); ut_ad(trx_id_col > 0); ut_ad(trx_id_col != ULINT_UNDEFINED); @@ -2527,7 +2793,7 @@ row_log_table_apply_ops( UNIV_MEM_INVALID(&mrec_end, sizeof mrec_end); - offsets = static_cast<ulint*>(ut_malloc(i * sizeof *offsets)); + offsets = static_cast<ulint*>(ut_malloc_nokey(i * sizeof *offsets)); offsets[0] = i; offsets[1] = dict_index_get_n_fields(index); @@ -2537,11 +2803,11 @@ row_log_table_apply_ops( next_block: ut_ad(has_index_lock); -#ifdef UNIV_SYNC_DEBUG - ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX)); -#endif /* UNIV_SYNC_DEBUG */ + ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X)); ut_ad(index->online_log->head.bytes == 0); + stage->inc(row_log_progress_inc_per_block()); + if (trx_is_interrupted(trx)) { goto interrupted; } @@ -2562,8 +2828,8 @@ next_block: if (UNIV_UNLIKELY(index->online_log->head.blocks > index->online_log->tail.blocks)) { unexpected_eof: - fprintf(stderr, "InnoDB: unexpected end of temporary file" - " for table %s\n", index->table_name); + ib::error() << "Unexpected end of temporary file for table " + << index->table->name; corruption: error = DB_CORRUPTION; goto func_exit; @@ -2574,11 +2840,13 @@ corruption: if (index->online_log->head.blocks) { #ifdef HAVE_FTRUNCATE /* Truncate the file in order to save space. */ - if (index->online_log->fd != -1 + if (index->online_log->fd > 0 && ftruncate(index->online_log->fd, 0) == -1) { - fprintf(stderr, "InnoDB: Error: Truncate of file " - "\'%s\' failed with error %d:%s\n", - index->name + 1, errno, strerror(errno)); + ib::error() + << "\'" << index->name + 1 + << "\' failed with error " + << errno << ":" << strerror(errno); + goto corruption; } #endif /* HAVE_FTRUNCATE */ @@ -2602,7 +2870,6 @@ all_done: } } else { os_offset_t ofs; - ibool success; ofs = (os_offset_t) index->online_log->head.blocks * srv_sort_buf_size; @@ -2620,14 +2887,19 @@ all_done: goto func_exit; } - success = os_file_read_no_error_handling( + IORequest request; + + dberr_t err = os_file_read_no_error_handling( + request, OS_FILE_FROM_FD(index->online_log->fd), index->online_log->head.block, ofs, - srv_sort_buf_size); + srv_sort_buf_size, + NULL); - if (!success) { - fprintf(stderr, "InnoDB: unable to read temporary file" - " for table %s\n", index->table_name); + if (err != DB_SUCCESS) { + ib::error() + << "Unable to read temporary file" + " for table " << index->table_name; goto corruption; } @@ -2636,14 +2908,6 @@ all_done: posix_fadvise(index->online_log->fd, ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED); #endif /* POSIX_FADV_DONTNEED */ -#if 0 //def FALLOC_FL_PUNCH_HOLE - /* Try to deallocate the space for the file on disk. - This should work on ext4 on Linux 2.6.39 and later, - and be ignored when the operation is unsupported. */ - fallocate(index->online_log->fd, - FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, - ofs, srv_sort_buf_size); -#endif /* FALLOC_FL_PUNCH_HOLE */ next_mrec = index->online_log->head.block; next_mrec_end = next_mrec + srv_sort_buf_size; @@ -2828,27 +3092,31 @@ func_exit: return(error); } -/******************************************************//** -Apply the row_log_table log to a table upon completing rebuild. +/** Apply the row_log_table log to a table upon completing rebuild. +@param[in] thr query graph +@param[in] old_table old table +@param[in,out] table MySQL table (for reporting duplicates) +@param[in,out] stage performance schema accounting object, used by +ALTER TABLE. stage->begin_phase_log_table() will be called initially and then +stage->inc() will be called for each block of log that is applied. @return DB_SUCCESS, or error code on failure */ -UNIV_INTERN dberr_t row_log_table_apply( -/*================*/ - que_thr_t* thr, /*!< in: query graph */ - dict_table_t* old_table, - /*!< in: old table */ - struct TABLE* table) /*!< in/out: MySQL table - (for reporting duplicates) */ + que_thr_t* thr, + dict_table_t* old_table, + struct TABLE* table, + ut_stage_alter_t* stage) { dberr_t error; dict_index_t* clust_index; thr_get_trx(thr)->error_key_num = 0; + DBUG_EXECUTE_IF("innodb_trx_duplicates", + thr_get_trx(thr)->duplicates = TRX_DUP_REPLACE;); -#ifdef UNIV_SYNC_DEBUG - ut_ad(!rw_lock_own(&dict_operation_lock, RW_LOCK_SHARED)); -#endif /* UNIV_SYNC_DEBUG */ + stage->begin_phase_log_table(); + + ut_ad(!rw_lock_own(dict_operation_lock, RW_LOCK_S)); clust_index = dict_table_get_first_index(old_table); rw_lock_x_lock(dict_index_get_lock(clust_index)); @@ -2867,7 +3135,7 @@ row_log_table_apply( clust_index->online_log->col_map, 0 }; - error = row_log_table_apply_ops(thr, &dup); + error = row_log_table_apply_ops(thr, &dup, stage); ut_ad(error != DB_SUCCESS || clust_index->online_log->head.total @@ -2875,6 +3143,9 @@ row_log_table_apply( } rw_lock_x_unlock(dict_index_get_lock(clust_index)); + DBUG_EXECUTE_IF("innodb_trx_duplicates", + thr_get_trx(thr)->duplicates = 0;); + return(error); } @@ -2882,7 +3153,6 @@ row_log_table_apply( Allocate the row log for an index and flag the index for online creation. @retval true if success, false if not */ -UNIV_INTERN bool row_log_allocate( /*=============*/ @@ -2894,9 +3164,8 @@ row_log_allocate( const dtuple_t* add_cols, /*!< in: default values of added columns, or NULL */ - const ulint* col_map,/*!< in: mapping of old column + const ulint* col_map)/*!< in: mapping of old column numbers to new ones, or NULL if !table */ - const char* path) /*!< in: where to create temporary file */ { row_log_t* log; DBUG_ENTER("row_log_allocate"); @@ -2907,17 +3176,17 @@ row_log_allocate( ut_ad(same_pk || table); ut_ad(!table || col_map); ut_ad(!add_cols || col_map); -#ifdef UNIV_SYNC_DEBUG - ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX)); -#endif /* UNIV_SYNC_DEBUG */ - log = (row_log_t*) ut_malloc(sizeof *log); - if (!log) { + ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X)); + + log = static_cast<row_log_t*>(ut_malloc_nokey(sizeof *log)); + + if (log == NULL) { DBUG_RETURN(false); } log->fd = -1; - mutex_create(index_online_log_key, &log->mutex, - SYNC_INDEX_ONLINE_LOG); + mutex_create(LATCH_ID_INDEX_ONLINE_LOG, &log->mutex); + log->blobs = NULL; log->table = table; log->same_pk = same_pk; @@ -2930,7 +3199,9 @@ row_log_allocate( log->tail.block = log->head.block = NULL; log->head.blocks = log->head.bytes = 0; log->head.total = 0; - log->path = path; + log->n_old_col = index->table->n_cols; + log->n_old_vcol = index->table->n_v_cols; + dict_index_set_online_status(index, ONLINE_INDEX_CREATION); index->online_log = log; @@ -2944,7 +3215,6 @@ row_log_allocate( /******************************************************//** Free the row log for an index that was being created online. */ -UNIV_INTERN void row_log_free( /*=========*/ @@ -2952,31 +3222,30 @@ row_log_free( { MONITOR_ATOMIC_DEC(MONITOR_ONLINE_CREATE_INDEX); - delete log->blobs; + UT_DELETE(log->blobs); row_log_block_free(log->tail); row_log_block_free(log->head); row_merge_file_destroy_low(log->fd); mutex_free(&log->mutex); ut_free(log); - log = 0; + log = NULL; } /******************************************************//** Get the latest transaction ID that has invoked row_log_online_op() during online creation. @return latest transaction ID, or 0 if nothing was logged */ -UNIV_INTERN trx_id_t row_log_get_max_trx( /*================*/ dict_index_t* index) /*!< in: index, must be locked */ { ut_ad(dict_index_get_online_status(index) == ONLINE_INDEX_CREATION); -#ifdef UNIV_SYNC_DEBUG - ut_ad((rw_lock_own(dict_index_get_lock(index), RW_LOCK_SHARED) + + ut_ad((rw_lock_own(dict_index_get_lock(index), RW_LOCK_S) && mutex_own(&index->online_log->mutex)) - || rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX)); -#endif /* UNIV_SYNC_DEBUG */ + || rw_lock_own(dict_index_get_lock(index), RW_LOCK_X)); + return(index->online_log->max_trx); } @@ -3003,14 +3272,27 @@ row_log_apply_op_low( ulint* offsets = NULL; ut_ad(!dict_index_is_clust(index)); -#ifdef UNIV_SYNC_DEBUG - ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX) + + ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X) == has_index_lock); -#endif /* UNIV_SYNC_DEBUG */ + ut_ad(!dict_index_is_corrupted(index)); ut_ad(trx_id != 0 || op == ROW_OP_DELETE); +#ifdef UNIV_DEBUG + { + rec_printer p(entry); + DBUG_PRINT("ib_create_index", + ("%s %s index %llu,%lu: %s", + op == ROW_OP_INSERT ? "insert" : "delete", + has_index_lock ? "locked" : "unlocked", + index->id, trx_id, + p.str().c_str())); + } +#endif + mtr_start(&mtr); + mtr.set_named_space(index->space); /* We perform the pessimistic variant of the operations if we already hold index->lock exclusively. First, search the @@ -3067,6 +3349,7 @@ row_log_apply_op_low( Lock the index tree exclusively. */ mtr_commit(&mtr); mtr_start(&mtr); + mtr.set_named_space(index->space); btr_cur_search_to_nth_level( index, 0, entry, PAGE_CUR_LE, BTR_MODIFY_TREE, &cursor, 0, @@ -3083,11 +3366,11 @@ row_log_apply_op_low( /* As there are no externally stored fields in a secondary index record, the parameter - rb_ctx = RB_NONE will be ignored. */ + rollback=false will be ignored. */ btr_cur_pessimistic_delete( error, FALSE, &cursor, - BTR_CREATE_FLAG, RB_NONE, &mtr); + BTR_CREATE_FLAG, false, &mtr); break; case ROW_OP_INSERT: if (exists) { @@ -3169,6 +3452,7 @@ insert_the_rec: Lock the index tree exclusively. */ mtr_commit(&mtr); mtr_start(&mtr); + mtr.set_named_space(index->space); btr_cur_search_to_nth_level( index, 0, entry, PAGE_CUR_LE, BTR_MODIFY_TREE, &cursor, 0, @@ -3238,10 +3522,9 @@ row_log_apply_op( /* Online index creation is only used for secondary indexes. */ ut_ad(!dict_index_is_clust(index)); -#ifdef UNIV_SYNC_DEBUG - ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX) + + ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X) == has_index_lock); -#endif /* UNIV_SYNC_DEBUG */ if (dict_index_is_corrupted(index)) { *error = DB_INDEX_CORRUPT; @@ -3315,34 +3598,28 @@ corrupted: /* Online index creation is only implemented for secondary indexes, which never contain off-page columns. */ ut_ad(n_ext == 0); -#ifdef ROW_LOG_APPLY_PRINT - if (row_log_apply_print) { - fprintf(stderr, "apply " IB_ID_FMT " " TRX_ID_FMT " %u %u ", - index->id, trx_id, - unsigned (op), unsigned (has_index_lock)); - for (const byte* m = mrec - data_size; m < mrec; m++) { - fprintf(stderr, "%02x", *m); - } - putc('\n', stderr); - } -#endif /* ROW_LOG_APPLY_PRINT */ + row_log_apply_op_low(index, dup, error, offsets_heap, has_index_lock, op, trx_id, entry); return(mrec); } -/******************************************************//** -Applies operations to a secondary index that was being created. +/** Applies operations to a secondary index that was being created. +@param[in] trx transaction (for checking if the operation was +interrupted) +@param[in,out] index index +@param[in,out] dup for reporting duplicate key errors +@param[in,out] stage performance schema accounting object, used by +ALTER TABLE. If not NULL, then stage->inc() will be called for each block +of log that is applied. @return DB_SUCCESS, or error code on failure */ static MY_ATTRIBUTE((nonnull)) dberr_t row_log_apply_ops( -/*==============*/ - trx_t* trx, /*!< in: transaction (for checking if - the operation was interrupted) */ - dict_index_t* index, /*!< in/out: index */ - row_merge_dup_t*dup) /*!< in/out: for reporting duplicate key - errors */ + const trx_t* trx, + dict_index_t* index, + row_merge_dup_t* dup, + ut_stage_alter_t* stage) { dberr_t error; const mrec_t* mrec = NULL; @@ -3357,14 +3634,12 @@ row_log_apply_ops( + dict_index_get_n_fields(index); ut_ad(dict_index_is_online_ddl(index)); - ut_ad(*index->name == TEMP_INDEX_PREFIX); -#ifdef UNIV_SYNC_DEBUG - ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX)); -#endif /* UNIV_SYNC_DEBUG */ + ut_ad(!index->is_committed()); + ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X)); ut_ad(index->online_log); UNIV_MEM_INVALID(&mrec_end, sizeof mrec_end); - offsets = static_cast<ulint*>(ut_malloc(i * sizeof *offsets)); + offsets = static_cast<ulint*>(ut_malloc_nokey(i * sizeof *offsets)); offsets[0] = i; offsets[1] = dict_index_get_n_fields(index); @@ -3374,11 +3649,11 @@ row_log_apply_ops( next_block: ut_ad(has_index_lock); -#ifdef UNIV_SYNC_DEBUG - ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX)); -#endif /* UNIV_SYNC_DEBUG */ + ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X)); ut_ad(index->online_log->head.bytes == 0); + stage->inc(row_log_progress_inc_per_block()); + if (trx_is_interrupted(trx)) { goto interrupted; } @@ -3396,8 +3671,8 @@ next_block: if (UNIV_UNLIKELY(index->online_log->head.blocks > index->online_log->tail.blocks)) { unexpected_eof: - fprintf(stderr, "InnoDB: unexpected end of temporary file" - " for index %s\n", index->name + 1); + ib::error() << "Unexpected end of temporary file for index " + << index->name; corruption: error = DB_CORRUPTION; goto func_exit; @@ -3408,11 +3683,13 @@ corruption: if (index->online_log->head.blocks) { #ifdef HAVE_FTRUNCATE /* Truncate the file in order to save space. */ - if (index->online_log->fd != -1 + if (index->online_log->fd > 0 && ftruncate(index->online_log->fd, 0) == -1) { - fprintf(stderr, "InnoDB: Error: Truncate of file " - "\'%s\' failed with error %d:%s\n", - index->name + 1, errno, strerror(errno)); + ib::error() + << "\'" << index->name + 1 + << "\' failed with error " + << errno << ":" << strerror(errno); + goto corruption; } #endif /* HAVE_FTRUNCATE */ @@ -3434,7 +3711,6 @@ all_done: } } else { os_offset_t ofs; - ibool success; ofs = (os_offset_t) index->online_log->head.blocks * srv_sort_buf_size; @@ -3450,14 +3726,19 @@ all_done: goto func_exit; } - success = os_file_read_no_error_handling( + IORequest request; + + dberr_t err = os_file_read_no_error_handling( + request, OS_FILE_FROM_FD(index->online_log->fd), index->online_log->head.block, ofs, - srv_sort_buf_size); + srv_sort_buf_size, + NULL); - if (!success) { - fprintf(stderr, "InnoDB: unable to read temporary file" - " for index %s\n", index->name + 1); + if (err != DB_SUCCESS) { + ib::error() + << "Unable to read temporary file" + " for index " << index->name; goto corruption; } @@ -3466,14 +3747,6 @@ all_done: posix_fadvise(index->online_log->fd, ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED); #endif /* POSIX_FADV_DONTNEED */ -#if 0 //def FALLOC_FL_PUNCH_HOLE - /* Try to deallocate the space for the file on disk. - This should work on ext4 on Linux 2.6.39 and later, - and be ignored when the operation is unsupported. */ - fallocate(index->online_log->fd, - FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, - ofs, srv_sort_buf_size); -#endif /* FALLOC_FL_PUNCH_HOLE */ next_mrec = index->online_log->head.block; next_mrec_end = next_mrec + srv_sort_buf_size; @@ -3655,18 +3928,21 @@ func_exit: return(error); } -/******************************************************//** -Apply the row log to the index upon completing index creation. +/** Apply the row log to the index upon completing index creation. +@param[in] trx transaction (for checking if the operation was +interrupted) +@param[in,out] index secondary index +@param[in,out] table MySQL table (for reporting duplicates) +@param[in,out] stage performance schema accounting object, used by +ALTER TABLE. stage->begin_phase_log_index() will be called initially and then +stage->inc() will be called for each block of log that is applied. @return DB_SUCCESS, or error code on failure */ -UNIV_INTERN dberr_t row_log_apply( -/*==========*/ - trx_t* trx, /*!< in: transaction (for checking if - the operation was interrupted) */ - dict_index_t* index, /*!< in/out: secondary index */ - struct TABLE* table) /*!< in/out: MySQL table - (for reporting duplicates) */ + const trx_t* trx, + dict_index_t* index, + struct TABLE* table, + ut_stage_alter_t* stage) { dberr_t error; row_log_t* log; @@ -3676,12 +3952,14 @@ row_log_apply( ut_ad(dict_index_is_online_ddl(index)); ut_ad(!dict_index_is_clust(index)); + stage->begin_phase_log_index(); + log_free_check(); rw_lock_x_lock(dict_index_get_lock(index)); if (!dict_table_is_corrupted(index->table)) { - error = row_log_apply_ops(trx, index, &dup); + error = row_log_apply_ops(trx, index, &dup, stage); } else { error = DB_SUCCESS; } @@ -3702,11 +3980,6 @@ row_log_apply( log = index->online_log; index->online_log = NULL; - /* We could remove the TEMP_INDEX_PREFIX and update the data - dictionary to say that this index is complete, if we had - access to the .frm file here. If the server crashes before - all requested indexes have been created, this completed index - will be dropped. */ rw_lock_x_unlock(dict_index_get_lock(index)); row_log_free(log); |