Diffstat (limited to 'storage/xtradb/row/row0log.cc')
-rw-r--r-- | storage/xtradb/row/row0log.cc | 3634
1 files changed, 3634 insertions, 0 deletions
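The patch below introduces the modification log for online ALTER TABLE. Its log records share a compact variable-length header: the extra_size field takes one byte for values below 0x80, and otherwise two bytes with the high bit of the first byte set (the code asserts extra_size < 0x8000). The following stand-alone encode/decode pair sketches that format; the function names are illustrative and this is not code from the patch itself:

#include <cassert>

/* Write extra_size in the one- or two-byte form used by the
log records in the patch below. */
static unsigned char* encode_extra_size(unsigned char* b, unsigned extra_size)
{
	if (extra_size < 0x80) {
		*b++ = (unsigned char) extra_size;
	} else {
		assert(extra_size < 0x8000);
		*b++ = (unsigned char) (0x80 | (extra_size >> 8));
		*b++ = (unsigned char) extra_size;
	}
	return b;
}

/* Read extra_size back; a first byte >= 0x80 signals the
two-byte form. */
static const unsigned char* decode_extra_size(const unsigned char* b,
					      unsigned* extra_size)
{
	*extra_size = *b++;
	if (*extra_size >= 0x80) {
		*extra_size = (*extra_size & 0x7f) << 8;
		*extra_size |= *b++;
	}
	return b;
}

The decoder mirrors the parsing in row_log_table_apply_op() near the end of this excerpt, which reads (extra_size & 0x7f) << 8 followed by the low byte.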
diff --git a/storage/xtradb/row/row0log.cc b/storage/xtradb/row/row0log.cc new file mode 100644 index 00000000000..1240cf7fcc5 --- /dev/null +++ b/storage/xtradb/row/row0log.cc @@ -0,0 +1,3634 @@ +/***************************************************************************** + +Copyright (c) 2011, 2014, Oracle and/or its affiliates. All Rights Reserved. + +This program is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., +51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA + +*****************************************************************************/ + +/**************************************************//** +@file row/row0log.cc +Modification log for online index creation and online table rebuild + +Created 2011-05-26 Marko Makela +*******************************************************/ + +#include "row0log.h" + +#ifdef UNIV_NONINL +#include "row0log.ic" +#endif + +#include "row0row.h" +#include "row0ins.h" +#include "row0upd.h" +#include "row0merge.h" +#include "row0ext.h" +#include "data0data.h" +#include "que0que.h" +#include "handler0alter.h" + +#include<map> + +/** Table row modification operations during online table rebuild. +Delete-marked records are not copied to the rebuilt table. */ +enum row_tab_op { + /** Insert a record */ + ROW_T_INSERT = 0x41, + /** Update a record in place */ + ROW_T_UPDATE, + /** Delete (purge) a record */ + ROW_T_DELETE +}; + +/** Index record modification operations during online index creation */ +enum row_op { + /** Insert a record */ + ROW_OP_INSERT = 0x61, + /** Delete a record */ + ROW_OP_DELETE +}; + +#ifdef UNIV_DEBUG +/** Write information about the applied record to the error log */ +# define ROW_LOG_APPLY_PRINT +#endif /* UNIV_DEBUG */ + +#ifdef ROW_LOG_APPLY_PRINT +/** When set, write information about the applied record to the error log */ +static bool row_log_apply_print; +#endif /* ROW_LOG_APPLY_PRINT */ + +/** Size of the modification log entry header, in bytes */ +#define ROW_LOG_HEADER_SIZE 2/*op, extra_size*/ + +/** Log block for modifications during online ALTER TABLE */ +struct row_log_buf_t { + byte* block; /*!< file block buffer */ + mrec_buf_t buf; /*!< buffer for accessing a record + that spans two blocks */ + ulint blocks; /*!< current position in blocks */ + ulint bytes; /*!< current position within block */ + ulonglong total; /*!< logical position, in bytes from + the start of the row_log_table log; + 0 for row_log_online_op() and + row_log_apply(). */ + ulint size; /*!< allocated size of block */ +}; + +/** Tracks BLOB allocation during online ALTER TABLE */ +class row_log_table_blob_t { +public: + /** Constructor (declaring a BLOB freed) + @param offset_arg row_log_t::tail::total */ +#ifdef UNIV_DEBUG + row_log_table_blob_t(ulonglong offset_arg) : + old_offset (0), free_offset (offset_arg), + offset (BLOB_FREED) {} +#else /* UNIV_DEBUG */ + row_log_table_blob_t() : + offset (BLOB_FREED) {} +#endif /* UNIV_DEBUG */ + + /** Declare a BLOB freed again. 
+ @param offset_arg row_log_t::tail::total */ +#ifdef UNIV_DEBUG + void blob_free(ulonglong offset_arg) +#else /* UNIV_DEBUG */ + void blob_free() +#endif /* UNIV_DEBUG */ + { + ut_ad(offset < offset_arg); + ut_ad(offset != BLOB_FREED); + ut_d(old_offset = offset); + ut_d(free_offset = offset_arg); + offset = BLOB_FREED; + } + /** Declare a freed BLOB reused. + @param offset_arg row_log_t::tail::total */ + void blob_alloc(ulonglong offset_arg) { + ut_ad(free_offset <= offset_arg); + ut_d(old_offset = offset); + offset = offset_arg; + } + /** Determine if a BLOB was freed at a given log position + @param offset_arg row_log_t::head::total after the log record + @return true if freed */ + bool is_freed(ulonglong offset_arg) const { + /* This is supposed to be the offset at the end of the + current log record. */ + ut_ad(offset_arg > 0); + /* We should never get anywhere close the magic value. */ + ut_ad(offset_arg < BLOB_FREED); + return(offset_arg < offset); + } +private: + /** Magic value for a freed BLOB */ + static const ulonglong BLOB_FREED = ~0ULL; +#ifdef UNIV_DEBUG + /** Old offset, in case a page was freed, reused, freed, ... */ + ulonglong old_offset; + /** Offset of last blob_free() */ + ulonglong free_offset; +#endif /* UNIV_DEBUG */ + /** Byte offset to the log file */ + ulonglong offset; +}; + +/** @brief Map of off-page column page numbers to 0 or log byte offsets. + +If there is no mapping for a page number, it is safe to access. +If a page number maps to 0, it is an off-page column that has been freed. +If a page number maps to a nonzero number, the number is a byte offset +into the index->online_log, indicating that the page is safe to access +when applying log records starting from that offset. */ +typedef std::map<ulint, row_log_table_blob_t> page_no_map; + +/** @brief Buffer for logging modifications during online index creation + +All modifications to an index that is being created will be logged by +row_log_online_op() to this buffer. + +All modifications to a table that is being rebuilt will be logged by +row_log_table_delete(), row_log_table_update(), row_log_table_insert() +to this buffer. + +When head.blocks == tail.blocks, the reader will access tail.block +directly. When also head.bytes == tail.bytes, both counts will be +reset to 0 and the file will be truncated. 
*/ +struct row_log_t { + int fd; /*!< file descriptor */ + ib_mutex_t mutex; /*!< mutex protecting error, + max_trx and tail */ + page_no_map* blobs; /*!< map of page numbers of off-page columns + that have been freed during table-rebuilding + ALTER TABLE (row_log_table_*); protected by + index->lock X-latch only */ + dict_table_t* table; /*!< table that is being rebuilt, + or NULL when this is a secondary + index that is being created online */ + bool same_pk;/*!< whether the definition of the PRIMARY KEY + has remained the same */ + const dtuple_t* add_cols; + /*!< default values of added columns, or NULL */ + const ulint* col_map;/*!< mapping of old column numbers to + new ones, or NULL if !table */ + dberr_t error; /*!< error that occurred during online + table rebuild */ + trx_id_t max_trx;/*!< biggest observed trx_id in + row_log_online_op(); + protected by mutex and index->lock S-latch, + or by index->lock X-latch only */ + row_log_buf_t tail; /*!< writer context; + protected by mutex and index->lock S-latch, + or by index->lock X-latch only */ + row_log_buf_t head; /*!< reader context; protected by MDL only; + modifiable by row_log_apply_ops() */ +}; + + +/** Allocate the memory for the log buffer. +@param[in,out] log_buf Buffer used for log operation +@return TRUE if success, false if not */ +static __attribute__((warn_unused_result)) +bool +row_log_block_allocate( + row_log_buf_t& log_buf) +{ + DBUG_ENTER("row_log_block_allocate"); + if (log_buf.block == NULL) { + log_buf.size = srv_sort_buf_size; + log_buf.block = (byte*) os_mem_alloc_large(&log_buf.size, + FALSE); + DBUG_EXECUTE_IF("simulate_row_log_allocation_failure", + if (log_buf.block) + os_mem_free_large(log_buf.block, log_buf.size); + log_buf.block = NULL;); + if (!log_buf.block) { + DBUG_RETURN(false); + } + } + DBUG_RETURN(true); +} + +/** Free the log buffer. +@param[in,out] log_buf Buffer used for log operation */ +static +void +row_log_block_free( + row_log_buf_t& log_buf) +{ + DBUG_ENTER("row_log_block_free"); + if (log_buf.block != NULL) { + os_mem_free_large(log_buf.block, log_buf.size); + log_buf.block = NULL; + } + DBUG_VOID_RETURN; +} + +/******************************************************//** +Logs an operation to a secondary index that is (or was) being created. */ +UNIV_INTERN +void +row_log_online_op( +/*==============*/ + dict_index_t* index, /*!< in/out: index, S or X latched */ + const dtuple_t* tuple, /*!< in: index tuple */ + trx_id_t trx_id) /*!< in: transaction ID for insert, + or 0 for delete */ +{ + byte* b; + ulint extra_size; + ulint size; + ulint mrec_size; + ulint avail_size; + row_log_t* log; + + ut_ad(dtuple_validate(tuple)); + ut_ad(dtuple_get_n_fields(tuple) == dict_index_get_n_fields(index)); +#ifdef UNIV_SYNC_DEBUG + ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_SHARED) + || rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX)); +#endif /* UNIV_SYNC_DEBUG */ + + if (dict_index_is_corrupted(index)) { + return; + } + + ut_ad(dict_index_is_online_ddl(index)); + + /* Compute the size of the record. This differs from + row_merge_buf_encode(), because here we do not encode + extra_size+1 (and reserve 0 as the end-of-chunk marker). */ + + size = rec_get_converted_size_temp( + index, tuple->fields, tuple->n_fields, &extra_size); + ut_ad(size >= extra_size); + ut_ad(size <= sizeof log->tail.buf); + + mrec_size = ROW_LOG_HEADER_SIZE + + (extra_size >= 0x80) + size + + (trx_id ? 
DATA_TRX_ID_LEN : 0); + + log = index->online_log; + mutex_enter(&log->mutex); + + if (trx_id > log->max_trx) { + log->max_trx = trx_id; + } + + if (!row_log_block_allocate(log->tail)) { + log->error = DB_OUT_OF_MEMORY; + goto err_exit; + } + + UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf); + + ut_ad(log->tail.bytes < srv_sort_buf_size); + avail_size = srv_sort_buf_size - log->tail.bytes; + + if (mrec_size > avail_size) { + b = log->tail.buf; + } else { + b = log->tail.block + log->tail.bytes; + } + + if (trx_id != 0) { + *b++ = ROW_OP_INSERT; + trx_write_trx_id(b, trx_id); + b += DATA_TRX_ID_LEN; + } else { + *b++ = ROW_OP_DELETE; + } + + if (extra_size < 0x80) { + *b++ = (byte) extra_size; + } else { + ut_ad(extra_size < 0x8000); + *b++ = (byte) (0x80 | (extra_size >> 8)); + *b++ = (byte) extra_size; + } + + rec_convert_dtuple_to_temp( + b + extra_size, index, tuple->fields, tuple->n_fields); + b += size; + + if (mrec_size >= avail_size) { + const os_offset_t byte_offset + = (os_offset_t) log->tail.blocks + * srv_sort_buf_size; + ibool ret; + + if (byte_offset + srv_sort_buf_size >= srv_online_max_size) { + goto write_failed; + } + + if (mrec_size == avail_size) { + ut_ad(b == &log->tail.block[srv_sort_buf_size]); + } else { + ut_ad(b == log->tail.buf + mrec_size); + memcpy(log->tail.block + log->tail.bytes, + log->tail.buf, avail_size); + } + UNIV_MEM_ASSERT_RW(log->tail.block, srv_sort_buf_size); + ret = os_file_write( + "(modification log)", + OS_FILE_FROM_FD(log->fd), + log->tail.block, byte_offset, srv_sort_buf_size); + log->tail.blocks++; + if (!ret) { +write_failed: + /* We set the flag directly instead of invoking + dict_set_corrupted_index_cache_only(index) here, + because the index is not "public" yet. */ + index->type |= DICT_CORRUPT; + } + UNIV_MEM_INVALID(log->tail.block, srv_sort_buf_size); + memcpy(log->tail.block, log->tail.buf + avail_size, + mrec_size - avail_size); + log->tail.bytes = mrec_size - avail_size; + } else { + log->tail.bytes += mrec_size; + ut_ad(b == log->tail.block + log->tail.bytes); + } + + UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf); +err_exit: + mutex_exit(&log->mutex); +} + +/******************************************************//** +Gets the error status of the online index rebuild log. +@return DB_SUCCESS or error code */ +UNIV_INTERN +dberr_t +row_log_table_get_error( +/*====================*/ + const dict_index_t* index) /*!< in: clustered index of a table + that is being rebuilt online */ +{ + ut_ad(dict_index_is_clust(index)); + ut_ad(dict_index_is_online_ddl(index)); + return(index->online_log->error); +} + +/******************************************************//** +Starts logging an operation to a table that is being rebuilt. 
+@return pointer to log, or NULL if no logging is necessary */ +static __attribute__((nonnull, warn_unused_result)) +byte* +row_log_table_open( +/*===============*/ + row_log_t* log, /*!< in/out: online rebuild log */ + ulint size, /*!< in: size of log record */ + ulint* avail) /*!< out: available size for log record */ +{ + mutex_enter(&log->mutex); + + UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf); + + if (log->error != DB_SUCCESS) { +err_exit: + mutex_exit(&log->mutex); + return(NULL); + } + + if (!row_log_block_allocate(log->tail)) { + log->error = DB_OUT_OF_MEMORY; + goto err_exit; + } + + ut_ad(log->tail.bytes < srv_sort_buf_size); + *avail = srv_sort_buf_size - log->tail.bytes; + + if (size > *avail) { + return(log->tail.buf); + } else { + return(log->tail.block + log->tail.bytes); + } +} + +/******************************************************//** +Stops logging an operation to a table that is being rebuilt. */ +static __attribute__((nonnull)) +void +row_log_table_close_func( +/*=====================*/ + row_log_t* log, /*!< in/out: online rebuild log */ +#ifdef UNIV_DEBUG + const byte* b, /*!< in: end of log record */ +#endif /* UNIV_DEBUG */ + ulint size, /*!< in: size of log record */ + ulint avail) /*!< in: available size for log record */ +{ + ut_ad(mutex_own(&log->mutex)); + + if (size >= avail) { + const os_offset_t byte_offset + = (os_offset_t) log->tail.blocks + * srv_sort_buf_size; + ibool ret; + + if (byte_offset + srv_sort_buf_size >= srv_online_max_size) { + goto write_failed; + } + + if (size == avail) { + ut_ad(b == &log->tail.block[srv_sort_buf_size]); + } else { + ut_ad(b == log->tail.buf + size); + memcpy(log->tail.block + log->tail.bytes, + log->tail.buf, avail); + } + UNIV_MEM_ASSERT_RW(log->tail.block, srv_sort_buf_size); + ret = os_file_write( + "(modification log)", + OS_FILE_FROM_FD(log->fd), + log->tail.block, byte_offset, srv_sort_buf_size); + log->tail.blocks++; + if (!ret) { +write_failed: + log->error = DB_ONLINE_LOG_TOO_BIG; + } + UNIV_MEM_INVALID(log->tail.block, srv_sort_buf_size); + memcpy(log->tail.block, log->tail.buf + avail, size - avail); + log->tail.bytes = size - avail; + } else { + log->tail.bytes += size; + ut_ad(b == log->tail.block + log->tail.bytes); + } + + log->tail.total += size; + UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf); + mutex_exit(&log->mutex); +} + +#ifdef UNIV_DEBUG +# define row_log_table_close(log, b, size, avail) \ + row_log_table_close_func(log, b, size, avail) +#else /* UNIV_DEBUG */ +# define row_log_table_close(log, b, size, avail) \ + row_log_table_close_func(log, size, avail) +#endif /* UNIV_DEBUG */ + +/******************************************************//** +Logs a delete operation to a table that is being rebuilt. +This will be merged in row_log_table_apply_delete(). 
*/ +UNIV_INTERN +void +row_log_table_delete( +/*=================*/ + const rec_t* rec, /*!< in: clustered index leaf page record, + page X-latched */ + dict_index_t* index, /*!< in/out: clustered index, S-latched + or X-latched */ + const ulint* offsets,/*!< in: rec_get_offsets(rec,index) */ + const byte* sys) /*!< in: DB_TRX_ID,DB_ROLL_PTR that should + be logged, or NULL to use those in rec */ +{ + ulint old_pk_extra_size; + ulint old_pk_size; + ulint ext_size = 0; + ulint mrec_size; + ulint avail_size; + mem_heap_t* heap = NULL; + const dtuple_t* old_pk; + row_ext_t* ext; + + ut_ad(dict_index_is_clust(index)); + ut_ad(rec_offs_validate(rec, index, offsets)); + ut_ad(rec_offs_n_fields(offsets) == dict_index_get_n_fields(index)); + ut_ad(rec_offs_size(offsets) <= sizeof index->online_log->tail.buf); +#ifdef UNIV_SYNC_DEBUG + ut_ad(rw_lock_own(&index->lock, RW_LOCK_SHARED) + || rw_lock_own(&index->lock, RW_LOCK_EX)); +#endif /* UNIV_SYNC_DEBUG */ + + if (dict_index_is_corrupted(index) + || !dict_index_is_online_ddl(index) + || index->online_log->error != DB_SUCCESS) { + return; + } + + dict_table_t* new_table = index->online_log->table; + dict_index_t* new_index = dict_table_get_first_index(new_table); + + ut_ad(dict_index_is_clust(new_index)); + ut_ad(!dict_index_is_online_ddl(new_index)); + + /* Create the tuple PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR in new_table. */ + if (index->online_log->same_pk) { + dtuple_t* tuple; + ut_ad(new_index->n_uniq == index->n_uniq); + + /* The PRIMARY KEY and DB_TRX_ID,DB_ROLL_PTR are in the first + fields of the record. */ + heap = mem_heap_create( + DATA_TRX_ID_LEN + + DTUPLE_EST_ALLOC(new_index->n_uniq + 2)); + old_pk = tuple = dtuple_create(heap, new_index->n_uniq + 2); + dict_index_copy_types(tuple, new_index, tuple->n_fields); + dtuple_set_n_fields_cmp(tuple, new_index->n_uniq); + + for (ulint i = 0; i < dtuple_get_n_fields(tuple); i++) { + ulint len; + const void* field = rec_get_nth_field( + rec, offsets, i, &len); + dfield_t* dfield = dtuple_get_nth_field( + tuple, i); + ut_ad(len != UNIV_SQL_NULL); + ut_ad(!rec_offs_nth_extern(offsets, i)); + dfield_set_data(dfield, field, len); + } + + if (sys) { + dfield_set_data( + dtuple_get_nth_field(tuple, + new_index->n_uniq), + sys, DATA_TRX_ID_LEN); + dfield_set_data( + dtuple_get_nth_field(tuple, + new_index->n_uniq + 1), + sys + DATA_TRX_ID_LEN, DATA_ROLL_PTR_LEN); + } + } else { + /* The PRIMARY KEY has changed. Translate the tuple. */ + old_pk = row_log_table_get_pk( + rec, index, offsets, NULL, &heap); + + if (!old_pk) { + ut_ad(index->online_log->error != DB_SUCCESS); + if (heap) { + goto func_exit; + } + return; + } + } + + ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field( + old_pk, old_pk->n_fields - 2)->len); + ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field( + old_pk, old_pk->n_fields - 1)->len); + old_pk_size = rec_get_converted_size_temp( + new_index, old_pk->fields, old_pk->n_fields, + &old_pk_extra_size); + ut_ad(old_pk_extra_size < 0x100); + + mrec_size = 4 + old_pk_size; + + /* Log enough prefix of the BLOB unless both the + old and new table are in COMPACT or REDUNDANT format, + which store the prefix in the clustered index record. */ + if (rec_offs_any_extern(offsets) + && (dict_table_get_format(index->table) >= UNIV_FORMAT_B + || dict_table_get_format(new_table) >= UNIV_FORMAT_B)) { + + /* Build a cache of those off-page column prefixes + that are referenced by secondary indexes. It can be + that none of the off-page columns are needed. 
*/ + row_build(ROW_COPY_DATA, index, rec, + offsets, NULL, NULL, NULL, &ext, heap); + if (ext) { + /* Log the row_ext_t, ext->ext and ext->buf */ + ext_size = ext->n_ext * ext->max_len + + sizeof(*ext) + + ext->n_ext * sizeof(ulint) + + (ext->n_ext - 1) * sizeof ext->len; + mrec_size += ext_size; + } + } + + if (byte* b = row_log_table_open(index->online_log, + mrec_size, &avail_size)) { + *b++ = ROW_T_DELETE; + *b++ = static_cast<byte>(old_pk_extra_size); + + /* Log the size of external prefix we saved */ + mach_write_to_2(b, ext_size); + b += 2; + + rec_convert_dtuple_to_temp( + b + old_pk_extra_size, new_index, + old_pk->fields, old_pk->n_fields); + + b += old_pk_size; + + if (ext_size) { + ulint cur_ext_size = sizeof(*ext) + + (ext->n_ext - 1) * sizeof ext->len; + + memcpy(b, ext, cur_ext_size); + b += cur_ext_size; + + /* Check if we need to col_map to adjust the column + number. If columns were added/removed/reordered, + adjust the column number. */ + if (const ulint* col_map = + index->online_log->col_map) { + for (ulint i = 0; i < ext->n_ext; i++) { + const_cast<ulint&>(ext->ext[i]) = + col_map[ext->ext[i]]; + } + } + + memcpy(b, ext->ext, ext->n_ext * sizeof(*ext->ext)); + b += ext->n_ext * sizeof(*ext->ext); + + ext_size -= cur_ext_size + + ext->n_ext * sizeof(*ext->ext); + memcpy(b, ext->buf, ext_size); + b += ext_size; + } + + row_log_table_close( + index->online_log, b, mrec_size, avail_size); + } + +func_exit: + mem_heap_free(heap); +} + +/******************************************************//** +Logs an insert or update to a table that is being rebuilt. */ +static +void +row_log_table_low_redundant( +/*========================*/ + const rec_t* rec, /*!< in: clustered index leaf + page record in ROW_FORMAT=REDUNDANT, + page X-latched */ + dict_index_t* index, /*!< in/out: clustered index, S-latched + or X-latched */ + bool insert, /*!< in: true if insert, + false if update */ + const dtuple_t* old_pk, /*!< in: old PRIMARY KEY value + (if !insert and a PRIMARY KEY + is being created) */ + const dict_index_t* new_index) + /*!< in: clustered index of the + new table, not latched */ +{ + ulint old_pk_size; + ulint old_pk_extra_size; + ulint size; + ulint extra_size; + ulint mrec_size; + ulint avail_size; + mem_heap_t* heap = NULL; + dtuple_t* tuple; + + ut_ad(!page_is_comp(page_align(rec))); + ut_ad(dict_index_get_n_fields(index) == rec_get_n_fields_old(rec)); + ut_ad(dict_tf_is_valid(index->table->flags)); + ut_ad(!dict_table_is_comp(index->table)); /* redundant row format */ + ut_ad(dict_index_is_clust(new_index)); + + heap = mem_heap_create(DTUPLE_EST_ALLOC(index->n_fields)); + tuple = dtuple_create(heap, index->n_fields); + dict_index_copy_types(tuple, index, index->n_fields); + dtuple_set_n_fields_cmp(tuple, dict_index_get_n_unique(index)); + + if (rec_get_1byte_offs_flag(rec)) { + for (ulint i = 0; i < index->n_fields; i++) { + dfield_t* dfield; + ulint len; + const void* field; + + dfield = dtuple_get_nth_field(tuple, i); + field = rec_get_nth_field_old(rec, i, &len); + + dfield_set_data(dfield, field, len); + } + } else { + for (ulint i = 0; i < index->n_fields; i++) { + dfield_t* dfield; + ulint len; + const void* field; + + dfield = dtuple_get_nth_field(tuple, i); + field = rec_get_nth_field_old(rec, i, &len); + + dfield_set_data(dfield, field, len); + + if (rec_2_is_field_extern(rec, i)) { + dfield_set_ext(dfield); + } + } + } + + size = rec_get_converted_size_temp( + index, tuple->fields, tuple->n_fields, &extra_size); + + mrec_size = ROW_LOG_HEADER_SIZE + size + 
(extra_size >= 0x80); + + if (insert || index->online_log->same_pk) { + ut_ad(!old_pk); + old_pk_extra_size = old_pk_size = 0; + } else { + ut_ad(old_pk); + ut_ad(old_pk->n_fields == 2 + old_pk->n_fields_cmp); + ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field( + old_pk, old_pk->n_fields - 2)->len); + ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field( + old_pk, old_pk->n_fields - 1)->len); + + old_pk_size = rec_get_converted_size_temp( + new_index, old_pk->fields, old_pk->n_fields, + &old_pk_extra_size); + ut_ad(old_pk_extra_size < 0x100); + mrec_size += 1/*old_pk_extra_size*/ + old_pk_size; + } + + if (byte* b = row_log_table_open(index->online_log, + mrec_size, &avail_size)) { + *b++ = insert ? ROW_T_INSERT : ROW_T_UPDATE; + + if (old_pk_size) { + *b++ = static_cast<byte>(old_pk_extra_size); + + rec_convert_dtuple_to_temp( + b + old_pk_extra_size, new_index, + old_pk->fields, old_pk->n_fields); + b += old_pk_size; + } + + if (extra_size < 0x80) { + *b++ = static_cast<byte>(extra_size); + } else { + ut_ad(extra_size < 0x8000); + *b++ = static_cast<byte>(0x80 | (extra_size >> 8)); + *b++ = static_cast<byte>(extra_size); + } + + rec_convert_dtuple_to_temp( + b + extra_size, index, tuple->fields, tuple->n_fields); + b += size; + + row_log_table_close( + index->online_log, b, mrec_size, avail_size); + } + + mem_heap_free(heap); +} + +/******************************************************//** +Logs an insert or update to a table that is being rebuilt. */ +static __attribute__((nonnull(1,2,3))) +void +row_log_table_low( +/*==============*/ + const rec_t* rec, /*!< in: clustered index leaf page record, + page X-latched */ + dict_index_t* index, /*!< in/out: clustered index, S-latched + or X-latched */ + const ulint* offsets,/*!< in: rec_get_offsets(rec,index) */ + bool insert, /*!< in: true if insert, false if update */ + const dtuple_t* old_pk) /*!< in: old PRIMARY KEY value (if !insert + and a PRIMARY KEY is being created) */ +{ + ulint omit_size; + ulint old_pk_size; + ulint old_pk_extra_size; + ulint extra_size; + ulint mrec_size; + ulint avail_size; + const dict_index_t* new_index = dict_table_get_first_index( + index->online_log->table); + ut_ad(dict_index_is_clust(index)); + ut_ad(dict_index_is_clust(new_index)); + ut_ad(!dict_index_is_online_ddl(new_index)); + ut_ad(rec_offs_validate(rec, index, offsets)); + ut_ad(rec_offs_n_fields(offsets) == dict_index_get_n_fields(index)); + ut_ad(rec_offs_size(offsets) <= sizeof index->online_log->tail.buf); +#ifdef UNIV_SYNC_DEBUG + ut_ad(rw_lock_own(&index->lock, RW_LOCK_SHARED) + || rw_lock_own(&index->lock, RW_LOCK_EX)); +#endif /* UNIV_SYNC_DEBUG */ + ut_ad(fil_page_get_type(page_align(rec)) == FIL_PAGE_INDEX); + ut_ad(page_is_leaf(page_align(rec))); + ut_ad(!page_is_comp(page_align(rec)) == !rec_offs_comp(offsets)); + + if (dict_index_is_corrupted(index) + || !dict_index_is_online_ddl(index) + || index->online_log->error != DB_SUCCESS) { + return; + } + + if (!rec_offs_comp(offsets)) { + row_log_table_low_redundant( + rec, index, insert, old_pk, new_index); + return; + } + + ut_ad(page_is_comp(page_align(rec))); + ut_ad(rec_get_status(rec) == REC_STATUS_ORDINARY); + + omit_size = REC_N_NEW_EXTRA_BYTES; + + extra_size = rec_offs_extra_size(offsets) - omit_size; + + mrec_size = ROW_LOG_HEADER_SIZE + + (extra_size >= 0x80) + rec_offs_size(offsets) - omit_size; + + if (insert || index->online_log->same_pk) { + ut_ad(!old_pk); + old_pk_extra_size = old_pk_size = 0; + } else { + ut_ad(old_pk); + ut_ad(old_pk->n_fields == 2 + old_pk->n_fields_cmp); + 
ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field( + old_pk, old_pk->n_fields - 2)->len); + ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field( + old_pk, old_pk->n_fields - 1)->len); + + old_pk_size = rec_get_converted_size_temp( + new_index, old_pk->fields, old_pk->n_fields, + &old_pk_extra_size); + ut_ad(old_pk_extra_size < 0x100); + mrec_size += 1/*old_pk_extra_size*/ + old_pk_size; + } + + if (byte* b = row_log_table_open(index->online_log, + mrec_size, &avail_size)) { + *b++ = insert ? ROW_T_INSERT : ROW_T_UPDATE; + + if (old_pk_size) { + *b++ = static_cast<byte>(old_pk_extra_size); + + rec_convert_dtuple_to_temp( + b + old_pk_extra_size, new_index, + old_pk->fields, old_pk->n_fields); + b += old_pk_size; + } + + if (extra_size < 0x80) { + *b++ = static_cast<byte>(extra_size); + } else { + ut_ad(extra_size < 0x8000); + *b++ = static_cast<byte>(0x80 | (extra_size >> 8)); + *b++ = static_cast<byte>(extra_size); + } + + memcpy(b, rec - rec_offs_extra_size(offsets), extra_size); + b += extra_size; + memcpy(b, rec, rec_offs_data_size(offsets)); + b += rec_offs_data_size(offsets); + + row_log_table_close( + index->online_log, b, mrec_size, avail_size); + } +} + +/******************************************************//** +Logs an update to a table that is being rebuilt. +This will be merged in row_log_table_apply_update(). */ +UNIV_INTERN +void +row_log_table_update( +/*=================*/ + const rec_t* rec, /*!< in: clustered index leaf page record, + page X-latched */ + dict_index_t* index, /*!< in/out: clustered index, S-latched + or X-latched */ + const ulint* offsets,/*!< in: rec_get_offsets(rec,index) */ + const dtuple_t* old_pk) /*!< in: row_log_table_get_pk() + before the update */ +{ + row_log_table_low(rec, index, offsets, false, old_pk); +} + +/** Gets the old table column of a PRIMARY KEY column. +@param table old table (before ALTER TABLE) +@param col_map mapping of old column numbers to new ones +@param col_no column position in the new table +@return old table column, or NULL if this is an added column */ +static +const dict_col_t* +row_log_table_get_pk_old_col( +/*=========================*/ + const dict_table_t* table, + const ulint* col_map, + ulint col_no) +{ + for (ulint i = 0; i < table->n_cols; i++) { + if (col_no == col_map[i]) { + return(dict_table_get_nth_col(table, i)); + } + } + + return(NULL); +} + +/** Maps an old table column of a PRIMARY KEY column. 
+@param col old table column (before ALTER TABLE) +@param ifield clustered index field in the new table (after ALTER TABLE) +@param dfield clustered index tuple field in the new table +@param heap memory heap for allocating dfield contents +@param rec clustered index leaf page record in the old table +@param offsets rec_get_offsets(rec) +@param i rec field corresponding to col +@param zip_size compressed page size of the old table, or 0 for uncompressed +@param max_len maximum length of dfield +@retval DB_INVALID_NULL if a NULL value is encountered +@retval DB_TOO_BIG_INDEX_COL if the maximum prefix length is exceeded */ +static +dberr_t +row_log_table_get_pk_col( +/*=====================*/ + const dict_col_t* col, + const dict_field_t* ifield, + dfield_t* dfield, + mem_heap_t* heap, + const rec_t* rec, + const ulint* offsets, + ulint i, + ulint zip_size, + ulint max_len) +{ + const byte* field; + ulint len; + + ut_ad(ut_is_2pow(zip_size)); + + field = rec_get_nth_field(rec, offsets, i, &len); + + if (len == UNIV_SQL_NULL) { + return(DB_INVALID_NULL); + } + + if (rec_offs_nth_extern(offsets, i)) { + ulint field_len = ifield->prefix_len; + byte* blob_field; + + if (!field_len) { + field_len = ifield->fixed_len; + if (!field_len) { + field_len = max_len + 1; + } + } + + blob_field = static_cast<byte*>( + mem_heap_alloc(heap, field_len)); + + len = btr_copy_externally_stored_field_prefix( + blob_field, field_len, zip_size, field, len); + if (len >= max_len + 1) { + return(DB_TOO_BIG_INDEX_COL); + } + + dfield_set_data(dfield, blob_field, len); + } else { + dfield_set_data(dfield, mem_heap_dup(heap, field, len), len); + } + + return(DB_SUCCESS); +} + +/******************************************************//** +Constructs the old PRIMARY KEY and DB_TRX_ID,DB_ROLL_PTR +of a table that is being rebuilt. +@return tuple of PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR in the rebuilt table, +or NULL if the PRIMARY KEY definition does not change */ +UNIV_INTERN +const dtuple_t* +row_log_table_get_pk( +/*=================*/ + const rec_t* rec, /*!< in: clustered index leaf page record, + page X-latched */ + dict_index_t* index, /*!< in/out: clustered index, S-latched + or X-latched */ + const ulint* offsets,/*!< in: rec_get_offsets(rec,index) */ + byte* sys, /*!< out: DB_TRX_ID,DB_ROLL_PTR for + row_log_table_delete(), or NULL */ + mem_heap_t** heap) /*!< in/out: memory heap where allocated */ +{ + dtuple_t* tuple = NULL; + row_log_t* log = index->online_log; + + ut_ad(dict_index_is_clust(index)); + ut_ad(dict_index_is_online_ddl(index)); + ut_ad(!offsets || rec_offs_validate(rec, index, offsets)); +#ifdef UNIV_SYNC_DEBUG + ut_ad(rw_lock_own(&index->lock, RW_LOCK_SHARED) + || rw_lock_own(&index->lock, RW_LOCK_EX)); +#endif /* UNIV_SYNC_DEBUG */ + + ut_ad(log); + ut_ad(log->table); + + if (log->same_pk) { + /* The PRIMARY KEY columns are unchanged. */ + if (sys) { + /* Store the DB_TRX_ID,DB_ROLL_PTR. */ + ulint trx_id_offs = index->trx_id_offset; + + if (!trx_id_offs) { + ulint pos = dict_index_get_sys_col_pos( + index, DATA_TRX_ID); + ulint len; + ut_ad(pos > 0); + + if (!offsets) { + offsets = rec_get_offsets( + rec, index, NULL, pos + 1, + heap); + } + + trx_id_offs = rec_get_nth_field_offs( + offsets, pos, &len); + ut_ad(len == DATA_TRX_ID_LEN); + } + + memcpy(sys, rec + trx_id_offs, + DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN); + } + + return(NULL); + } + + mutex_enter(&log->mutex); + + /* log->error is protected by log->mutex. 
*/ + if (log->error == DB_SUCCESS) { + dict_table_t* new_table = log->table; + dict_index_t* new_index + = dict_table_get_first_index(new_table); + const ulint new_n_uniq + = dict_index_get_n_unique(new_index); + + if (!*heap) { + ulint size = 0; + + if (!offsets) { + size += (1 + REC_OFFS_HEADER_SIZE + + index->n_fields) + * sizeof *offsets; + } + + for (ulint i = 0; i < new_n_uniq; i++) { + size += dict_col_get_min_size( + dict_index_get_nth_col(new_index, i)); + } + + *heap = mem_heap_create( + DTUPLE_EST_ALLOC(new_n_uniq + 2) + size); + } + + if (!offsets) { + offsets = rec_get_offsets(rec, index, NULL, + ULINT_UNDEFINED, heap); + } + + tuple = dtuple_create(*heap, new_n_uniq + 2); + dict_index_copy_types(tuple, new_index, tuple->n_fields); + dtuple_set_n_fields_cmp(tuple, new_n_uniq); + + const ulint max_len = DICT_MAX_FIELD_LEN_BY_FORMAT(new_table); + const ulint zip_size = dict_table_zip_size(index->table); + + for (ulint new_i = 0; new_i < new_n_uniq; new_i++) { + dict_field_t* ifield; + dfield_t* dfield; + ulint prtype; + ulint mbminmaxlen; + + ifield = dict_index_get_nth_field(new_index, new_i); + dfield = dtuple_get_nth_field(tuple, new_i); + + const ulint col_no + = dict_field_get_col(ifield)->ind; + + if (const dict_col_t* col + = row_log_table_get_pk_old_col( + index->table, log->col_map, col_no)) { + ulint i = dict_col_get_clust_pos(col, index); + + if (i == ULINT_UNDEFINED) { + ut_ad(0); + log->error = DB_CORRUPTION; + goto err_exit; + } + + log->error = row_log_table_get_pk_col( + col, ifield, dfield, *heap, + rec, offsets, i, zip_size, max_len); + + if (log->error != DB_SUCCESS) { +err_exit: + tuple = NULL; + goto func_exit; + } + + mbminmaxlen = col->mbminmaxlen; + prtype = col->prtype; + } else { + /* No matching column was found in the old + table, so this must be an added column. + Copy the default value. */ + ut_ad(log->add_cols); + + dfield_copy(dfield, dtuple_get_nth_field( + log->add_cols, col_no)); + mbminmaxlen = dfield->type.mbminmaxlen; + prtype = dfield->type.prtype; + } + + ut_ad(!dfield_is_ext(dfield)); + ut_ad(!dfield_is_null(dfield)); + + if (ifield->prefix_len) { + ulint len = dtype_get_at_most_n_mbchars( + prtype, mbminmaxlen, + ifield->prefix_len, + dfield_get_len(dfield), + static_cast<const char*>( + dfield_get_data(dfield))); + + ut_ad(len <= dfield_get_len(dfield)); + dfield_set_len(dfield, len); + } + } + + const byte* trx_roll = rec + + row_get_trx_id_offset(index, offsets); + + /* Copy the fields, because the fields will be updated + or the record may be moved somewhere else in the B-tree + as part of the upcoming operation. */ + if (sys) { + memcpy(sys, trx_roll, + DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN); + trx_roll = sys; + } else { + trx_roll = static_cast<const byte*>( + mem_heap_dup( + *heap, trx_roll, + DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)); + } + + dfield_set_data(dtuple_get_nth_field(tuple, new_n_uniq), + trx_roll, DATA_TRX_ID_LEN); + dfield_set_data(dtuple_get_nth_field(tuple, new_n_uniq + 1), + trx_roll + DATA_TRX_ID_LEN, DATA_ROLL_PTR_LEN); + } + +func_exit: + mutex_exit(&log->mutex); + return(tuple); +} + +/******************************************************//** +Logs an insert to a table that is being rebuilt. +This will be merged in row_log_table_apply_insert(). 
*/ +UNIV_INTERN +void +row_log_table_insert( +/*=================*/ + const rec_t* rec, /*!< in: clustered index leaf page record, + page X-latched */ + dict_index_t* index, /*!< in/out: clustered index, S-latched + or X-latched */ + const ulint* offsets)/*!< in: rec_get_offsets(rec,index) */ +{ + row_log_table_low(rec, index, offsets, true, NULL); +} + +/******************************************************//** +Notes that a BLOB is being freed during online ALTER TABLE. */ +UNIV_INTERN +void +row_log_table_blob_free( +/*====================*/ + dict_index_t* index, /*!< in/out: clustered index, X-latched */ + ulint page_no)/*!< in: starting page number of the BLOB */ +{ + ut_ad(dict_index_is_clust(index)); + ut_ad(dict_index_is_online_ddl(index)); +#ifdef UNIV_SYNC_DEBUG + ut_ad(rw_lock_own(&index->lock, RW_LOCK_EX)); +#endif /* UNIV_SYNC_DEBUG */ + ut_ad(page_no != FIL_NULL); + + if (index->online_log->error != DB_SUCCESS) { + return; + } + + page_no_map* blobs = index->online_log->blobs; + + if (!blobs) { + index->online_log->blobs = blobs = new page_no_map(); + } + +#ifdef UNIV_DEBUG + const ulonglong log_pos = index->online_log->tail.total; +#else +# define log_pos /* empty */ +#endif /* UNIV_DEBUG */ + + const page_no_map::value_type v(page_no, + row_log_table_blob_t(log_pos)); + + std::pair<page_no_map::iterator,bool> p = blobs->insert(v); + + if (!p.second) { + /* Update the existing mapping. */ + ut_ad(p.first->first == page_no); + p.first->second.blob_free(log_pos); + } +#undef log_pos +} + +/******************************************************//** +Notes that a BLOB is being allocated during online ALTER TABLE. */ +UNIV_INTERN +void +row_log_table_blob_alloc( +/*=====================*/ + dict_index_t* index, /*!< in/out: clustered index, X-latched */ + ulint page_no)/*!< in: starting page number of the BLOB */ +{ + ut_ad(dict_index_is_clust(index)); + ut_ad(dict_index_is_online_ddl(index)); +#ifdef UNIV_SYNC_DEBUG + ut_ad(rw_lock_own(&index->lock, RW_LOCK_EX)); +#endif /* UNIV_SYNC_DEBUG */ + ut_ad(page_no != FIL_NULL); + + if (index->online_log->error != DB_SUCCESS) { + return; + } + + /* Only track allocations if the same page has been freed + earlier. Double allocation without a free is not allowed. */ + if (page_no_map* blobs = index->online_log->blobs) { + page_no_map::iterator p = blobs->find(page_no); + + if (p != blobs->end()) { + ut_ad(p->first == page_no); + p->second.blob_alloc(index->online_log->tail.total); + } + } +} + +/******************************************************//** +Converts a log record to a table row. +@return converted row, or NULL if the conversion fails */ +static __attribute__((nonnull, warn_unused_result)) +const dtuple_t* +row_log_table_apply_convert_mrec( +/*=============================*/ + const mrec_t* mrec, /*!< in: merge record */ + dict_index_t* index, /*!< in: index of mrec */ + const ulint* offsets, /*!< in: offsets of mrec */ + const row_log_t* log, /*!< in: rebuild context */ + mem_heap_t* heap, /*!< in/out: memory heap */ + trx_id_t trx_id, /*!< in: DB_TRX_ID of mrec */ + dberr_t* error) /*!< out: DB_SUCCESS or + DB_MISSING_HISTORY or + reason of failure */ +{ + dtuple_t* row; + + *error = DB_SUCCESS; + + /* This is based on row_build(). 
*/ + if (log->add_cols) { + row = dtuple_copy(log->add_cols, heap); + /* dict_table_copy_types() would set the fields to NULL */ + for (ulint i = 0; i < dict_table_get_n_cols(log->table); i++) { + dict_col_copy_type( + dict_table_get_nth_col(log->table, i), + dfield_get_type(dtuple_get_nth_field(row, i))); + } + } else { + row = dtuple_create(heap, dict_table_get_n_cols(log->table)); + dict_table_copy_types(row, log->table); + } + + for (ulint i = 0; i < rec_offs_n_fields(offsets); i++) { + const dict_field_t* ind_field + = dict_index_get_nth_field(index, i); + + if (ind_field->prefix_len) { + /* Column prefixes can only occur in key + fields, which cannot be stored externally. For + a column prefix, there should also be the full + field in the clustered index tuple. The row + tuple comprises full fields, not prefixes. */ + ut_ad(!rec_offs_nth_extern(offsets, i)); + continue; + } + + const dict_col_t* col + = dict_field_get_col(ind_field); + ulint col_no + = log->col_map[dict_col_get_no(col)]; + + if (col_no == ULINT_UNDEFINED) { + /* dropped column */ + continue; + } + + dfield_t* dfield + = dtuple_get_nth_field(row, col_no); + ulint len; + const byte* data; + + if (rec_offs_nth_extern(offsets, i)) { + ut_ad(rec_offs_any_extern(offsets)); + rw_lock_x_lock(dict_index_get_lock(index)); + + if (const page_no_map* blobs = log->blobs) { + data = rec_get_nth_field( + mrec, offsets, i, &len); + ut_ad(len >= BTR_EXTERN_FIELD_REF_SIZE); + + ulint page_no = mach_read_from_4( + data + len - (BTR_EXTERN_FIELD_REF_SIZE + - BTR_EXTERN_PAGE_NO)); + page_no_map::const_iterator p = blobs->find( + page_no); + if (p != blobs->end() + && p->second.is_freed(log->head.total)) { + /* This BLOB has been freed. + We must not access the row. */ + *error = DB_MISSING_HISTORY; + dfield_set_data(dfield, data, len); + dfield_set_ext(dfield); + goto blob_done; + } + } + + data = btr_rec_copy_externally_stored_field( + mrec, offsets, + dict_table_zip_size(index->table), + i, &len, heap); + ut_a(data); + dfield_set_data(dfield, data, len); +blob_done: + rw_lock_x_unlock(dict_index_get_lock(index)); + } else { + data = rec_get_nth_field(mrec, offsets, i, &len); + dfield_set_data(dfield, data, len); + } + + /* See if any columns were changed to NULL or NOT NULL. */ + const dict_col_t* new_col + = dict_table_get_nth_col(log->table, col_no); + ut_ad(new_col->mtype == col->mtype); + + /* Assert that prtype matches except for nullability. */ + ut_ad(!((new_col->prtype ^ col->prtype) & ~DATA_NOT_NULL)); + ut_ad(!((new_col->prtype ^ dfield_get_type(dfield)->prtype) + & ~DATA_NOT_NULL)); + + if (new_col->prtype == col->prtype) { + continue; + } + + if ((new_col->prtype & DATA_NOT_NULL) + && dfield_is_null(dfield)) { + /* We got a NULL value for a NOT NULL column. */ + *error = DB_INVALID_NULL; + return(NULL); + } + + /* Adjust the DATA_NOT_NULL flag in the parsed row. */ + dfield_get_type(dfield)->prtype = new_col->prtype; + + ut_ad(dict_col_type_assert_equal(new_col, + dfield_get_type(dfield))); + } + + return(row); +} + +/******************************************************//** +Replays an insert operation on a table that was rebuilt. 
+@return DB_SUCCESS or error code */ +static __attribute__((nonnull, warn_unused_result)) +dberr_t +row_log_table_apply_insert_low( +/*===========================*/ + que_thr_t* thr, /*!< in: query graph */ + const dtuple_t* row, /*!< in: table row + in the old table definition */ + trx_id_t trx_id, /*!< in: trx_id of the row */ + mem_heap_t* offsets_heap, /*!< in/out: memory heap + that can be emptied */ + mem_heap_t* heap, /*!< in/out: memory heap */ + row_merge_dup_t* dup) /*!< in/out: for reporting + duplicate key errors */ +{ + dberr_t error; + dtuple_t* entry; + const row_log_t*log = dup->index->online_log; + dict_index_t* index = dict_table_get_first_index(log->table); + + ut_ad(dtuple_validate(row)); + ut_ad(trx_id); + +#ifdef ROW_LOG_APPLY_PRINT + if (row_log_apply_print) { + fprintf(stderr, "table apply insert " + IB_ID_FMT " " IB_ID_FMT "\n", + index->table->id, index->id); + dtuple_print(stderr, row); + } +#endif /* ROW_LOG_APPLY_PRINT */ + + static const ulint flags + = (BTR_CREATE_FLAG + | BTR_NO_LOCKING_FLAG + | BTR_NO_UNDO_LOG_FLAG + | BTR_KEEP_SYS_FLAG); + + entry = row_build_index_entry(row, NULL, index, heap); + + error = row_ins_clust_index_entry_low( + flags, BTR_MODIFY_TREE, index, index->n_uniq, entry, 0, thr); + + switch (error) { + case DB_SUCCESS: + break; + case DB_SUCCESS_LOCKED_REC: + /* The row had already been copied to the table. */ + return(DB_SUCCESS); + default: + return(error); + } + + do { + if (!(index = dict_table_get_next_index(index))) { + break; + } + + if (index->type & DICT_FTS) { + continue; + } + + entry = row_build_index_entry(row, NULL, index, heap); + error = row_ins_sec_index_entry_low( + flags, BTR_MODIFY_TREE, + index, offsets_heap, heap, entry, trx_id, thr); + } while (error == DB_SUCCESS); + + return(error); +} + +/******************************************************//** +Replays an insert operation on a table that was rebuilt. +@return DB_SUCCESS or error code */ +static __attribute__((nonnull, warn_unused_result)) +dberr_t +row_log_table_apply_insert( +/*=======================*/ + que_thr_t* thr, /*!< in: query graph */ + const mrec_t* mrec, /*!< in: record to insert */ + const ulint* offsets, /*!< in: offsets of mrec */ + mem_heap_t* offsets_heap, /*!< in/out: memory heap + that can be emptied */ + mem_heap_t* heap, /*!< in/out: memory heap */ + row_merge_dup_t* dup, /*!< in/out: for reporting + duplicate key errors */ + trx_id_t trx_id) /*!< in: DB_TRX_ID of mrec */ +{ + const row_log_t*log = dup->index->online_log; + dberr_t error; + const dtuple_t* row = row_log_table_apply_convert_mrec( + mrec, dup->index, offsets, log, heap, trx_id, &error); + + switch (error) { + case DB_MISSING_HISTORY: + ut_ad(log->blobs); + /* Because some BLOBs are missing, we know that the + transaction was rolled back later (a rollback of + an insert can free BLOBs). + We can simply skip the insert: the subsequent + ROW_T_DELETE will be ignored, or a ROW_T_UPDATE will + be interpreted as ROW_T_INSERT. */ + return(DB_SUCCESS); + case DB_SUCCESS: + ut_ad(row != NULL); + break; + default: + ut_ad(0); + case DB_INVALID_NULL: + ut_ad(row == NULL); + return(error); + } + + error = row_log_table_apply_insert_low( + thr, row, trx_id, offsets_heap, heap, dup); + if (error != DB_SUCCESS) { + /* Report the erroneous row using the new + version of the table. */ + innobase_row_to_mysql(dup->table, log->table, row); + } + return(error); +} + +/******************************************************//** +Deletes a record from a table that is being rebuilt. 
+@return DB_SUCCESS or error code */ +static __attribute__((nonnull(1, 2, 4, 5), warn_unused_result)) +dberr_t +row_log_table_apply_delete_low( +/*===========================*/ + btr_pcur_t* pcur, /*!< in/out: B-tree cursor, + will be trashed */ + const ulint* offsets, /*!< in: offsets on pcur */ + const row_ext_t* save_ext, /*!< in: saved external field + info, or NULL */ + mem_heap_t* heap, /*!< in/out: memory heap */ + mtr_t* mtr) /*!< in/out: mini-transaction, + will be committed */ +{ + dberr_t error; + row_ext_t* ext; + dtuple_t* row; + dict_index_t* index = btr_pcur_get_btr_cur(pcur)->index; + + ut_ad(dict_index_is_clust(index)); + +#ifdef ROW_LOG_APPLY_PRINT + if (row_log_apply_print) { + fprintf(stderr, "table apply delete " + IB_ID_FMT " " IB_ID_FMT "\n", + index->table->id, index->id); + rec_print_new(stderr, btr_pcur_get_rec(pcur), offsets); + } +#endif /* ROW_LOG_APPLY_PRINT */ + if (dict_table_get_next_index(index)) { + /* Build a row template for purging secondary index entries. */ + row = row_build( + ROW_COPY_DATA, index, btr_pcur_get_rec(pcur), + offsets, NULL, NULL, NULL, + save_ext ? NULL : &ext, heap); + if (!save_ext) { + save_ext = ext; + } + } else { + row = NULL; + } + + btr_cur_pessimistic_delete(&error, FALSE, btr_pcur_get_btr_cur(pcur), + BTR_CREATE_FLAG, RB_NONE, mtr); + mtr_commit(mtr); + + if (error != DB_SUCCESS) { + return(error); + } + + while ((index = dict_table_get_next_index(index)) != NULL) { + if (index->type & DICT_FTS) { + continue; + } + + const dtuple_t* entry = row_build_index_entry( + row, save_ext, index, heap); + mtr_start(mtr); + btr_pcur_open(index, entry, PAGE_CUR_LE, + BTR_MODIFY_TREE, pcur, mtr); +#ifdef UNIV_DEBUG + switch (btr_pcur_get_btr_cur(pcur)->flag) { + case BTR_CUR_DELETE_REF: + case BTR_CUR_DEL_MARK_IBUF: + case BTR_CUR_DELETE_IBUF: + case BTR_CUR_INSERT_TO_IBUF: + /* We did not request buffering. */ + break; + case BTR_CUR_HASH: + case BTR_CUR_HASH_FAIL: + case BTR_CUR_BINARY: + goto flag_ok; + } + ut_ad(0); +flag_ok: +#endif /* UNIV_DEBUG */ + + if (page_rec_is_infimum(btr_pcur_get_rec(pcur)) + || btr_pcur_get_low_match(pcur) < index->n_uniq) { + /* All secondary index entries should be + found, because new_table is being modified by + this thread only, and all indexes should be + updated in sync. */ + mtr_commit(mtr); + return(DB_INDEX_CORRUPT); + } + + btr_cur_pessimistic_delete(&error, FALSE, + btr_pcur_get_btr_cur(pcur), + BTR_CREATE_FLAG, RB_NONE, mtr); + mtr_commit(mtr); + } + + return(error); +} + +/******************************************************//** +Replays a delete operation on a table that was rebuilt. 
+@return DB_SUCCESS or error code */ +static __attribute__((nonnull(1, 3, 4, 5, 6, 7), warn_unused_result)) +dberr_t +row_log_table_apply_delete( +/*=======================*/ + que_thr_t* thr, /*!< in: query graph */ + ulint trx_id_col, /*!< in: position of + DB_TRX_ID in the new + clustered index */ + const mrec_t* mrec, /*!< in: merge record */ + const ulint* moffsets, /*!< in: offsets of mrec */ + mem_heap_t* offsets_heap, /*!< in/out: memory heap + that can be emptied */ + mem_heap_t* heap, /*!< in/out: memory heap */ + const row_log_t* log, /*!< in: online log */ + const row_ext_t* save_ext) /*!< in: saved external field + info, or NULL */ +{ + dict_table_t* new_table = log->table; + dict_index_t* index = dict_table_get_first_index(new_table); + dtuple_t* old_pk; + mtr_t mtr; + btr_pcur_t pcur; + ulint* offsets; + + ut_ad(rec_offs_n_fields(moffsets) + == dict_index_get_n_unique(index) + 2); + ut_ad(!rec_offs_any_extern(moffsets)); + + /* Convert the row to a search tuple. */ + old_pk = dtuple_create(heap, index->n_uniq); + dict_index_copy_types(old_pk, index, index->n_uniq); + + for (ulint i = 0; i < index->n_uniq; i++) { + ulint len; + const void* field; + field = rec_get_nth_field(mrec, moffsets, i, &len); + ut_ad(len != UNIV_SQL_NULL); + dfield_set_data(dtuple_get_nth_field(old_pk, i), + field, len); + } + + mtr_start(&mtr); + btr_pcur_open(index, old_pk, PAGE_CUR_LE, + BTR_MODIFY_TREE, &pcur, &mtr); +#ifdef UNIV_DEBUG + switch (btr_pcur_get_btr_cur(&pcur)->flag) { + case BTR_CUR_DELETE_REF: + case BTR_CUR_DEL_MARK_IBUF: + case BTR_CUR_DELETE_IBUF: + case BTR_CUR_INSERT_TO_IBUF: + /* We did not request buffering. */ + break; + case BTR_CUR_HASH: + case BTR_CUR_HASH_FAIL: + case BTR_CUR_BINARY: + goto flag_ok; + } + ut_ad(0); +flag_ok: +#endif /* UNIV_DEBUG */ + + if (page_rec_is_infimum(btr_pcur_get_rec(&pcur)) + || btr_pcur_get_low_match(&pcur) < index->n_uniq) { +all_done: + mtr_commit(&mtr); + /* The record was not found. All done. */ + /* This should only happen when an earlier + ROW_T_INSERT was skipped or + ROW_T_UPDATE was interpreted as ROW_T_DELETE + due to BLOBs having been freed by rollback. */ + return(DB_SUCCESS); + } + + offsets = rec_get_offsets(btr_pcur_get_rec(&pcur), index, NULL, + ULINT_UNDEFINED, &offsets_heap); +#if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG + ut_a(!rec_offs_any_null_extern(btr_pcur_get_rec(&pcur), offsets)); +#endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */ + + /* Only remove the record if DB_TRX_ID,DB_ROLL_PTR match. */ + + { + ulint len; + const byte* mrec_trx_id + = rec_get_nth_field(mrec, moffsets, trx_id_col, &len); + ut_ad(len == DATA_TRX_ID_LEN); + const byte* rec_trx_id + = rec_get_nth_field(btr_pcur_get_rec(&pcur), offsets, + trx_id_col, &len); + ut_ad(len == DATA_TRX_ID_LEN); + + ut_ad(rec_get_nth_field(mrec, moffsets, trx_id_col + 1, &len) + == mrec_trx_id + DATA_TRX_ID_LEN); + ut_ad(len == DATA_ROLL_PTR_LEN); + ut_ad(rec_get_nth_field(btr_pcur_get_rec(&pcur), offsets, + trx_id_col + 1, &len) + == rec_trx_id + DATA_TRX_ID_LEN); + ut_ad(len == DATA_ROLL_PTR_LEN); + + if (memcmp(mrec_trx_id, rec_trx_id, + DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)) { + /* The ROW_T_DELETE was logged for a different + PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR. + This is possible if a ROW_T_INSERT was skipped + or a ROW_T_UPDATE was interpreted as ROW_T_DELETE + because some BLOBs were missing due to + (1) rolling back the initial insert, or + (2) purging the BLOB for a later ROW_T_DELETE + (3) purging 'old values' for a later ROW_T_UPDATE + or ROW_T_DELETE. 
*/ + ut_ad(!log->same_pk); + goto all_done; + } + } + + return(row_log_table_apply_delete_low(&pcur, offsets, save_ext, + heap, &mtr)); +} + +/******************************************************//** +Replays an update operation on a table that was rebuilt. +@return DB_SUCCESS or error code */ +static __attribute__((nonnull, warn_unused_result)) +dberr_t +row_log_table_apply_update( +/*=======================*/ + que_thr_t* thr, /*!< in: query graph */ + ulint new_trx_id_col, /*!< in: position of + DB_TRX_ID in the new + clustered index */ + const mrec_t* mrec, /*!< in: new value */ + const ulint* offsets, /*!< in: offsets of mrec */ + mem_heap_t* offsets_heap, /*!< in/out: memory heap + that can be emptied */ + mem_heap_t* heap, /*!< in/out: memory heap */ + row_merge_dup_t* dup, /*!< in/out: for reporting + duplicate key errors */ + trx_id_t trx_id, /*!< in: DB_TRX_ID of mrec */ + const dtuple_t* old_pk) /*!< in: PRIMARY KEY and + DB_TRX_ID,DB_ROLL_PTR + of the old value, + or PRIMARY KEY if same_pk */ +{ + const row_log_t*log = dup->index->online_log; + const dtuple_t* row; + dict_index_t* index = dict_table_get_first_index(log->table); + mtr_t mtr; + btr_pcur_t pcur; + dberr_t error; + + ut_ad(dtuple_get_n_fields_cmp(old_pk) + == dict_index_get_n_unique(index)); + ut_ad(dtuple_get_n_fields(old_pk) + == dict_index_get_n_unique(index) + + (log->same_pk ? 0 : 2)); + + row = row_log_table_apply_convert_mrec( + mrec, dup->index, offsets, log, heap, trx_id, &error); + + switch (error) { + case DB_MISSING_HISTORY: + /* The record contained BLOBs that are now missing. */ + ut_ad(log->blobs); + /* Whether or not we are updating the PRIMARY KEY, we + know that there should be a subsequent + ROW_T_DELETE for rolling back a preceding ROW_T_INSERT, + overriding this ROW_T_UPDATE record. (*1) + + This allows us to interpret this ROW_T_UPDATE + as ROW_T_DELETE. + + When applying the subsequent ROW_T_DELETE, no matching + record will be found. */ + case DB_SUCCESS: + ut_ad(row != NULL); + break; + default: + ut_ad(0); + case DB_INVALID_NULL: + ut_ad(row == NULL); + return(error); + } + + mtr_start(&mtr); + btr_pcur_open(index, old_pk, PAGE_CUR_LE, + BTR_MODIFY_TREE, &pcur, &mtr); +#ifdef UNIV_DEBUG + switch (btr_pcur_get_btr_cur(&pcur)->flag) { + case BTR_CUR_DELETE_REF: + case BTR_CUR_DEL_MARK_IBUF: + case BTR_CUR_DELETE_IBUF: + case BTR_CUR_INSERT_TO_IBUF: + ut_ad(0);/* We did not request buffering. */ + case BTR_CUR_HASH: + case BTR_CUR_HASH_FAIL: + case BTR_CUR_BINARY: + break; + } +#endif /* UNIV_DEBUG */ + + if (page_rec_is_infimum(btr_pcur_get_rec(&pcur)) + || btr_pcur_get_low_match(&pcur) < index->n_uniq) { + /* The record was not found. This should only happen + when an earlier ROW_T_INSERT or ROW_T_UPDATE was + diverted because BLOBs were freed when the insert was + later rolled back. */ + + ut_ad(log->blobs); + + if (error == DB_SUCCESS) { + /* An earlier ROW_T_INSERT could have been + skipped because of a missing BLOB, like this: + + BEGIN; + INSERT INTO t SET blob_col='blob value'; + UPDATE t SET blob_col=''; + ROLLBACK; + + This would generate the following records: + ROW_T_INSERT (referring to 'blob value') + ROW_T_UPDATE + ROW_T_UPDATE (referring to 'blob value') + ROW_T_DELETE + [ROLLBACK removes the 'blob value'] + + The ROW_T_INSERT would have been skipped + because of a missing BLOB. Now we are + executing the first ROW_T_UPDATE. + The second ROW_T_UPDATE (for the ROLLBACK) + would be interpreted as ROW_T_DELETE, because + the BLOB would be missing. 
+ + We could probably assume that the transaction + has been rolled back and simply skip the + 'insert' part of this ROW_T_UPDATE record. + However, there might be some complex scenario + that could interfere with such a shortcut. + So, we will insert the row (and risk + introducing a bogus duplicate key error + for the ALTER TABLE), and a subsequent + ROW_T_UPDATE or ROW_T_DELETE will delete it. */ + mtr_commit(&mtr); + error = row_log_table_apply_insert_low( + thr, row, trx_id, offsets_heap, heap, dup); + } else { + /* Some BLOBs are missing, so we are interpreting + this ROW_T_UPDATE as ROW_T_DELETE (see *1). + Because the record was not found, we do nothing. */ + ut_ad(error == DB_MISSING_HISTORY); + error = DB_SUCCESS; +func_exit: + mtr_commit(&mtr); + } +func_exit_committed: + ut_ad(mtr.state == MTR_COMMITTED); + + if (error != DB_SUCCESS) { + /* Report the erroneous row using the new + version of the table. */ + innobase_row_to_mysql(dup->table, log->table, row); + } + + return(error); + } + + /* Prepare to update (or delete) the record. */ + ulint* cur_offsets = rec_get_offsets( + btr_pcur_get_rec(&pcur), + index, NULL, ULINT_UNDEFINED, &offsets_heap); + + if (!log->same_pk) { + /* Only update the record if DB_TRX_ID,DB_ROLL_PTR match what + was buffered. */ + ulint len; + const void* rec_trx_id + = rec_get_nth_field(btr_pcur_get_rec(&pcur), + cur_offsets, index->n_uniq, &len); + ut_ad(len == DATA_TRX_ID_LEN); + ut_ad(dtuple_get_nth_field(old_pk, index->n_uniq)->len + == DATA_TRX_ID_LEN); + ut_ad(dtuple_get_nth_field(old_pk, index->n_uniq + 1)->len + == DATA_ROLL_PTR_LEN); + ut_ad(DATA_TRX_ID_LEN + static_cast<const char*>( + dtuple_get_nth_field(old_pk, + index->n_uniq)->data) + == dtuple_get_nth_field(old_pk, + index->n_uniq + 1)->data); + if (memcmp(rec_trx_id, + dtuple_get_nth_field(old_pk, index->n_uniq)->data, + DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)) { + /* The ROW_T_UPDATE was logged for a different + DB_TRX_ID,DB_ROLL_PTR. This is possible if an + earlier ROW_T_INSERT or ROW_T_UPDATE was diverted + because some BLOBs were missing due to rolling + back the initial insert or due to purging + the old BLOB values of an update. */ + ut_ad(log->blobs); + if (error != DB_SUCCESS) { + ut_ad(error == DB_MISSING_HISTORY); + /* Some BLOBs are missing, so we are + interpreting this ROW_T_UPDATE as + ROW_T_DELETE (see *1). + Because this is a different row, + we will do nothing. */ + error = DB_SUCCESS; + } else { + /* Because the user record is missing due to + BLOBs that were missing when processing + an earlier log record, we should + interpret the ROW_T_UPDATE as ROW_T_INSERT. + However, there is a different user record + with the same PRIMARY KEY value already. */ + error = DB_DUPLICATE_KEY; + } + + goto func_exit; + } + } + + if (error != DB_SUCCESS) { + ut_ad(error == DB_MISSING_HISTORY); + ut_ad(log->blobs); + /* Some BLOBs are missing, so we are interpreting + this ROW_T_UPDATE as ROW_T_DELETE (see *1). */ + error = row_log_table_apply_delete_low( + &pcur, cur_offsets, NULL, heap, &mtr); + goto func_exit_committed; + } + + dtuple_t* entry = row_build_index_entry( + row, NULL, index, heap); + const upd_t* update = row_upd_build_difference_binary( + index, entry, btr_pcur_get_rec(&pcur), cur_offsets, + false, NULL, heap); + + if (!update->n_fields) { + /* Nothing to do. 
*/ + goto func_exit; + } + + const bool pk_updated + = upd_get_nth_field(update, 0)->field_no < new_trx_id_col; + + if (pk_updated || rec_offs_any_extern(cur_offsets)) { + /* If the record contains any externally stored + columns, perform the update by delete and insert, + because we will not write any undo log that would + allow purge to free any orphaned externally stored + columns. */ + + if (pk_updated && log->same_pk) { + /* The ROW_T_UPDATE log record should only be + written when the PRIMARY KEY fields of the + record did not change in the old table. We + can only get a change of PRIMARY KEY columns + in the rebuilt table if the PRIMARY KEY was + redefined (!same_pk). */ + ut_ad(0); + error = DB_CORRUPTION; + goto func_exit; + } + + error = row_log_table_apply_delete_low( + &pcur, cur_offsets, NULL, heap, &mtr); + ut_ad(mtr.state == MTR_COMMITTED); + + if (error == DB_SUCCESS) { + error = row_log_table_apply_insert_low( + thr, row, trx_id, offsets_heap, heap, dup); + } + + goto func_exit_committed; + } + + dtuple_t* old_row; + row_ext_t* old_ext; + + if (dict_table_get_next_index(index)) { + /* Construct the row corresponding to the old value of + the record. */ + old_row = row_build( + ROW_COPY_DATA, index, btr_pcur_get_rec(&pcur), + cur_offsets, NULL, NULL, NULL, &old_ext, heap); + ut_ad(old_row); +#ifdef ROW_LOG_APPLY_PRINT + if (row_log_apply_print) { + fprintf(stderr, "table apply update " + IB_ID_FMT " " IB_ID_FMT "\n", + index->table->id, index->id); + dtuple_print(stderr, old_row); + dtuple_print(stderr, row); + } +#endif /* ROW_LOG_APPLY_PRINT */ + } else { + old_row = NULL; + old_ext = NULL; + } + + big_rec_t* big_rec; + + error = btr_cur_pessimistic_update( + BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG + | BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG + | BTR_KEEP_POS_FLAG, + btr_pcur_get_btr_cur(&pcur), + &cur_offsets, &offsets_heap, heap, &big_rec, + update, 0, thr, 0, &mtr); + + if (big_rec) { + if (error == DB_SUCCESS) { + error = btr_store_big_rec_extern_fields( + index, btr_pcur_get_block(&pcur), + btr_pcur_get_rec(&pcur), cur_offsets, + big_rec, &mtr, BTR_STORE_UPDATE); + } + + dtuple_big_rec_free(big_rec); + } + + while ((index = dict_table_get_next_index(index)) != NULL) { + if (error != DB_SUCCESS) { + break; + } + + if (index->type & DICT_FTS) { + continue; + } + + if (!row_upd_changes_ord_field_binary( + index, update, thr, old_row, NULL)) { + continue; + } + + mtr_commit(&mtr); + + entry = row_build_index_entry(old_row, old_ext, index, heap); + if (!entry) { + ut_ad(0); + return(DB_CORRUPTION); + } + + mtr_start(&mtr); + + if (ROW_FOUND != row_search_index_entry( + index, entry, BTR_MODIFY_TREE, &pcur, &mtr)) { + ut_ad(0); + error = DB_CORRUPTION; + break; + } + + btr_cur_pessimistic_delete( + &error, FALSE, btr_pcur_get_btr_cur(&pcur), + BTR_CREATE_FLAG, RB_NONE, &mtr); + + if (error != DB_SUCCESS) { + break; + } + + mtr_commit(&mtr); + + entry = row_build_index_entry(row, NULL, index, heap); + error = row_ins_sec_index_entry_low( + BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG + | BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG, + BTR_MODIFY_TREE, index, offsets_heap, heap, + entry, trx_id, thr); + + mtr_start(&mtr); + } + + goto func_exit; +} + +/******************************************************//** +Applies an operation to a table that was rebuilt. 
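+
+For reference, an informal sketch of the entry format, as implied by
+the parsing below: a one-byte op code (ROW_T_INSERT, ROW_T_UPDATE or
+ROW_T_DELETE), followed by an op-specific header and one or more
+records in the temporary file format of rec_init_offsets_temp().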
+@return NULL on failure (mrec corruption) or when out of data;
+pointer to next record on success */
+static __attribute__((nonnull, warn_unused_result))
+const mrec_t*
+row_log_table_apply_op(
+/*===================*/
+	que_thr_t*		thr,		/*!< in: query graph */
+	ulint			trx_id_col,	/*!< in: position of
+						DB_TRX_ID in old index */
+	ulint			new_trx_id_col,	/*!< in: position of
+						DB_TRX_ID in new index */
+	row_merge_dup_t*	dup,		/*!< in/out: for reporting
+						duplicate key errors */
+	dberr_t*		error,		/*!< out: DB_SUCCESS
+						or error code */
+	mem_heap_t*		offsets_heap,	/*!< in/out: memory heap
+						that can be emptied */
+	mem_heap_t*		heap,		/*!< in/out: memory heap */
+	const mrec_t*		mrec,		/*!< in: merge record */
+	const mrec_t*		mrec_end,	/*!< in: end of buffer */
+	ulint*			offsets)	/*!< in/out: work area
+						for parsing mrec */
+{
+	row_log_t*	log	= dup->index->online_log;
+	dict_index_t*	new_index = dict_table_get_first_index(log->table);
+	ulint		extra_size;
+	const mrec_t*	next_mrec;
+	dtuple_t*	old_pk;
+	row_ext_t*	ext;
+	ulint		ext_size;
+
+	ut_ad(dict_index_is_clust(dup->index));
+	ut_ad(dup->index->table != log->table);
+	ut_ad(log->head.total <= log->tail.total);
+
+	*error = DB_SUCCESS;
+
+	/* 3 = 1 (op type) + 1 (extra_size) + at least 1 byte payload */
+	if (mrec + 3 >= mrec_end) {
+		return(NULL);
+	}
+
+	const mrec_t* const mrec_start = mrec;
+
+	switch (*mrec++) {
+	default:
+		ut_ad(0);
+		*error = DB_CORRUPTION;
+		return(NULL);
+	case ROW_T_INSERT:
+		extra_size = *mrec++;
+
+		if (extra_size >= 0x80) {
+			/* Read another byte of extra_size. */
+
+			extra_size = (extra_size & 0x7f) << 8;
+			extra_size |= *mrec++;
+		}
+
+		mrec += extra_size;
+
+		if (mrec > mrec_end) {
+			return(NULL);
+		}
+
+		rec_offs_set_n_fields(offsets, dup->index->n_fields);
+		rec_init_offsets_temp(mrec, dup->index, offsets);
+
+		next_mrec = mrec + rec_offs_data_size(offsets);
+
+		if (next_mrec > mrec_end) {
+			return(NULL);
+		} else {
+			log->head.total += next_mrec - mrec_start;
+
+			ulint		len;
+			const byte*	db_trx_id
+				= rec_get_nth_field(
+					mrec, offsets, trx_id_col, &len);
+			ut_ad(len == DATA_TRX_ID_LEN);
+			*error = row_log_table_apply_insert(
+				thr, mrec, offsets, offsets_heap,
+				heap, dup, trx_read_trx_id(db_trx_id));
+		}
+		break;
+
+	case ROW_T_DELETE:
+		/* 1 (extra_size) + 2 (ext_size) + at least 1 (payload) */
+		if (mrec + 4 >= mrec_end) {
+			return(NULL);
+		}
+
+		extra_size = *mrec++;
+		ext_size = mach_read_from_2(mrec);
+		mrec += 2;
+		ut_ad(mrec < mrec_end);
+
+		/* We assume extra_size < 0x100 for the PRIMARY KEY prefix.
+		For fixed-length PRIMARY KEY columns, it is 0.
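+
+		Thus, as parsed above and below, a ROW_T_DELETE entry
+		is laid out as follows (an informal sketch, implied by
+		the parsing code rather than by any format document):
+
+		1 byte   op code (ROW_T_DELETE)
+		1 byte   extra_size of the PRIMARY KEY prefix record
+		2 bytes  ext_size (length of the BLOB prefix info)
+		then extra_size bytes of record header, the
+		PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR fields, and
+		ext_size bytes of BLOB prefix information.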
+		*/
+		mrec += extra_size;
+
+		rec_offs_set_n_fields(offsets, new_index->n_uniq + 2);
+		rec_init_offsets_temp(mrec, new_index, offsets);
+		next_mrec = mrec + rec_offs_data_size(offsets) + ext_size;
+		if (next_mrec > mrec_end) {
+			return(NULL);
+		}
+
+		log->head.total += next_mrec - mrec_start;
+
+		/* If there are external fields, retrieve the logged
+		prefix info and reconstruct the row_ext_t. */
+		if (ext_size) {
+			/* We use memcpy to avoid unaligned
+			access on some non-x86 platforms. */
+			ext = static_cast<row_ext_t*>(
+				mem_heap_dup(heap,
+					     mrec + rec_offs_data_size(offsets),
+					     ext_size));
+
+			byte*	ext_start = reinterpret_cast<byte*>(ext);
+
+			ulint	ext_len = sizeof(*ext)
+				+ (ext->n_ext - 1) * sizeof ext->len;
+
+			ext->ext = reinterpret_cast<ulint*>(ext_start + ext_len);
+			ext_len += ext->n_ext * sizeof(*ext->ext);
+
+			ext->buf = static_cast<byte*>(ext_start + ext_len);
+		} else {
+			ext = NULL;
+		}
+
+		*error = row_log_table_apply_delete(
+			thr, new_trx_id_col,
+			mrec, offsets, offsets_heap, heap,
+			log, ext);
+		break;
+
+	case ROW_T_UPDATE:
+		/* Logically, the log entry consists of the
+		(PRIMARY KEY,DB_TRX_ID) of the old value (converted
+		to the new primary key definition) followed by
+		the new value in the old table definition. If the
+		definition of the columns belonging to PRIMARY KEY
+		is not changed, the log will only contain
+		DB_TRX_ID,new_row. */
+
+		if (dup->index->online_log->same_pk) {
+			ut_ad(new_index->n_uniq == dup->index->n_uniq);
+
+			extra_size = *mrec++;
+
+			if (extra_size >= 0x80) {
+				/* Read another byte of extra_size. */
+
+				extra_size = (extra_size & 0x7f) << 8;
+				extra_size |= *mrec++;
+			}
+
+			mrec += extra_size;
+
+			if (mrec > mrec_end) {
+				return(NULL);
+			}
+
+			rec_offs_set_n_fields(offsets, dup->index->n_fields);
+			rec_init_offsets_temp(mrec, dup->index, offsets);
+
+			next_mrec = mrec + rec_offs_data_size(offsets);
+
+			if (next_mrec > mrec_end) {
+				return(NULL);
+			}
+
+			old_pk = dtuple_create(heap, new_index->n_uniq);
+			dict_index_copy_types(
+				old_pk, new_index, old_pk->n_fields);
+
+			/* Copy the PRIMARY KEY fields from mrec to old_pk. */
+			for (ulint i = 0; i < new_index->n_uniq; i++) {
+				const void*	field;
+				ulint		len;
+				dfield_t*	dfield;
+
+				ut_ad(!rec_offs_nth_extern(offsets, i));
+
+				field = rec_get_nth_field(
+					mrec, offsets, i, &len);
+				ut_ad(len != UNIV_SQL_NULL);
+
+				dfield = dtuple_get_nth_field(old_pk, i);
+				dfield_set_data(dfield, field, len);
+			}
+		} else {
+			/* We assume extra_size < 0x100
+			for the PRIMARY KEY prefix. */
+			mrec += *mrec + 1;
+
+			if (mrec > mrec_end) {
+				return(NULL);
+			}
+
+			/* Get offsets for PRIMARY KEY,
+			DB_TRX_ID, DB_ROLL_PTR. */
+			rec_offs_set_n_fields(offsets, new_index->n_uniq + 2);
+			rec_init_offsets_temp(mrec, new_index, offsets);
+
+			next_mrec = mrec + rec_offs_data_size(offsets);
+			if (next_mrec + 2 > mrec_end) {
+				return(NULL);
+			}
+
+			/* Copy the PRIMARY KEY fields and
+			DB_TRX_ID, DB_ROLL_PTR from mrec to old_pk. */
+			old_pk = dtuple_create(heap, new_index->n_uniq + 2);
+			dict_index_copy_types(old_pk, new_index,
+					      old_pk->n_fields);
+
+			for (ulint i = 0;
+			     i < dict_index_get_n_unique(new_index) + 2;
+			     i++) {
+				const void*	field;
+				ulint		len;
+				dfield_t*	dfield;
+
+				ut_ad(!rec_offs_nth_extern(offsets, i));
+
+				field = rec_get_nth_field(
+					mrec, offsets, i, &len);
+				ut_ad(len != UNIV_SQL_NULL);
+
+				dfield = dtuple_get_nth_field(old_pk, i);
+				dfield_set_data(dfield, field, len);
+			}
+
+			mrec = next_mrec;
+
+			/* Fetch the new value of the row as it was
+			in the old table definition.
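+
+			The extra_size read below uses the same
+			variable-length encoding as elsewhere in this
+			log: a single byte for values below 0x80,
+			otherwise two bytes. For example (illustration
+			only), 0x05 decodes to 5, while the sequence
+			0x81 0x02 decodes to
+			((0x81 & 0x7f) << 8) | 0x02 = 258.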
+			*/
+			extra_size = *mrec++;
+
+			if (extra_size >= 0x80) {
+				/* Read another byte of extra_size. */
+
+				extra_size = (extra_size & 0x7f) << 8;
+				extra_size |= *mrec++;
+			}
+
+			mrec += extra_size;
+
+			if (mrec > mrec_end) {
+				return(NULL);
+			}
+
+			rec_offs_set_n_fields(offsets, dup->index->n_fields);
+			rec_init_offsets_temp(mrec, dup->index, offsets);
+
+			next_mrec = mrec + rec_offs_data_size(offsets);
+
+			if (next_mrec > mrec_end) {
+				return(NULL);
+			}
+		}
+
+		ut_ad(next_mrec <= mrec_end);
+		log->head.total += next_mrec - mrec_start;
+		dtuple_set_n_fields_cmp(old_pk, new_index->n_uniq);
+
+		{
+			ulint		len;
+			const byte*	db_trx_id
+				= rec_get_nth_field(
+					mrec, offsets, trx_id_col, &len);
+			ut_ad(len == DATA_TRX_ID_LEN);
+			*error = row_log_table_apply_update(
+				thr, new_trx_id_col,
+				mrec, offsets, offsets_heap,
+				heap, dup, trx_read_trx_id(db_trx_id), old_pk);
+		}
+
+		break;
+	}
+
+	ut_ad(log->head.total <= log->tail.total);
+	mem_heap_empty(offsets_heap);
+	mem_heap_empty(heap);
+	return(next_mrec);
+}
+
+/******************************************************//**
+Applies operations to a table that was rebuilt.
+@return DB_SUCCESS, or error code on failure */
+static __attribute__((nonnull, warn_unused_result))
+dberr_t
+row_log_table_apply_ops(
+/*====================*/
+	que_thr_t*	thr,	/*!< in: query graph */
+	row_merge_dup_t*dup)	/*!< in/out: for reporting duplicate key
+				errors */
+{
+	dberr_t		error;
+	const mrec_t*	mrec	= NULL;
+	const mrec_t*	next_mrec;
+	const mrec_t*	mrec_end = NULL; /* silence bogus warning */
+	const mrec_t*	next_mrec_end;
+	mem_heap_t*	heap;
+	mem_heap_t*	offsets_heap;
+	ulint*		offsets;
+	bool		has_index_lock;
+	dict_index_t*	index	= const_cast<dict_index_t*>(
+		dup->index);
+	dict_table_t*	new_table = index->online_log->table;
+	dict_index_t*	new_index = dict_table_get_first_index(
+		new_table);
+	const ulint	i	= 1 + REC_OFFS_HEADER_SIZE
+		+ ut_max(dict_index_get_n_fields(index),
+			 dict_index_get_n_unique(new_index) + 2);
+	const ulint	trx_id_col = dict_col_get_clust_pos(
+		dict_table_get_sys_col(index->table, DATA_TRX_ID), index);
+	const ulint	new_trx_id_col = dict_col_get_clust_pos(
+		dict_table_get_sys_col(new_table, DATA_TRX_ID), new_index);
+	trx_t*		trx	= thr_get_trx(thr);
+
+	ut_ad(dict_index_is_clust(index));
+	ut_ad(dict_index_is_online_ddl(index));
+	ut_ad(trx->mysql_thd);
+#ifdef UNIV_SYNC_DEBUG
+	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
+	ut_ad(!dict_index_is_online_ddl(new_index));
+	ut_ad(trx_id_col > 0);
+	ut_ad(trx_id_col != ULINT_UNDEFINED);
+	ut_ad(new_trx_id_col > 0);
+	ut_ad(new_trx_id_col != ULINT_UNDEFINED);
+
+	UNIV_MEM_INVALID(&mrec_end, sizeof mrec_end);
+
+	offsets = static_cast<ulint*>(ut_malloc(i * sizeof *offsets));
+	offsets[0] = i;
+	offsets[1] = dict_index_get_n_fields(index);
+
+	heap = mem_heap_create(UNIV_PAGE_SIZE);
+	offsets_heap = mem_heap_create(UNIV_PAGE_SIZE);
+	has_index_lock = true;
+
+next_block:
+	ut_ad(has_index_lock);
+#ifdef UNIV_SYNC_DEBUG
+	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
+	ut_ad(index->online_log->head.bytes == 0);
+
+	if (trx_is_interrupted(trx)) {
+		goto interrupted;
+	}
+
+	if (dict_index_is_corrupted(index)) {
+		error = DB_INDEX_CORRUPT;
+		goto func_exit;
+	}
+
+	ut_ad(dict_index_is_online_ddl(index));
+
+	error = index->online_log->error;
+
+	if (error != DB_SUCCESS) {
+		goto func_exit;
+	}
+
+	if (UNIV_UNLIKELY(index->online_log->head.blocks
+			  > index->online_log->tail.blocks)) {
+unexpected_eof:
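+		/* The apply position (head) must never overtake the
+		end of the buffered log (tail); if it does, the
+		temporary file has been truncated or corrupted. */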
+		fprintf(stderr, "InnoDB: unexpected end of temporary file"
+			" for table %s\n", index->table_name);
+corruption:
+		error = DB_CORRUPTION;
+		goto func_exit;
+	}
+
+	if (index->online_log->head.blocks
+	    == index->online_log->tail.blocks) {
+		if (index->online_log->head.blocks) {
+#ifdef HAVE_FTRUNCATE
+			/* Truncate the file in order to save space. */
+			if (ftruncate(index->online_log->fd, 0) == -1) {
+				perror("ftruncate");
+			}
+#endif /* HAVE_FTRUNCATE */
+			index->online_log->head.blocks
+				= index->online_log->tail.blocks = 0;
+		}
+
+		next_mrec = index->online_log->tail.block;
+		next_mrec_end = next_mrec + index->online_log->tail.bytes;
+
+		if (next_mrec_end == next_mrec) {
+			/* End of log reached. */
+all_done:
+			ut_ad(has_index_lock);
+			ut_ad(index->online_log->head.blocks == 0);
+			ut_ad(index->online_log->tail.blocks == 0);
+			index->online_log->head.bytes = 0;
+			index->online_log->tail.bytes = 0;
+			error = DB_SUCCESS;
+			goto func_exit;
+		}
+	} else {
+		os_offset_t	ofs;
+		ibool		success;
+
+		ofs = (os_offset_t) index->online_log->head.blocks
+			* srv_sort_buf_size;
+
+		ut_ad(has_index_lock);
+		has_index_lock = false;
+		rw_lock_x_unlock(dict_index_get_lock(index));
+
+		log_free_check();
+
+		ut_ad(dict_index_is_online_ddl(index));
+
+		if (!row_log_block_allocate(index->online_log->head)) {
+			error = DB_OUT_OF_MEMORY;
+			goto func_exit;
+		}
+
+		success = os_file_read_no_error_handling(
+			OS_FILE_FROM_FD(index->online_log->fd),
+			index->online_log->head.block, ofs,
+			srv_sort_buf_size);
+
+		if (!success) {
+			fprintf(stderr, "InnoDB: unable to read temporary file"
+				" for table %s\n", index->table_name);
+			goto corruption;
+		}
+
+#ifdef POSIX_FADV_DONTNEED
+		/* Each block is read exactly once. Free up the file cache. */
+		posix_fadvise(index->online_log->fd,
+			      ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED);
+#endif /* POSIX_FADV_DONTNEED */
+#if 0 //def FALLOC_FL_PUNCH_HOLE
+		/* Try to deallocate the space for the file on disk.
+		This should work on ext4 on Linux 2.6.39 and later,
+		and be ignored when the operation is unsupported. */
+		fallocate(index->online_log->fd,
+			  FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE,
+			  ofs, srv_sort_buf_size);
+#endif /* FALLOC_FL_PUNCH_HOLE */
+
+		next_mrec = index->online_log->head.block;
+		next_mrec_end = next_mrec + srv_sort_buf_size;
+	}
+
+	/* This read is not protected by index->online_log->mutex for
+	performance reasons. We will eventually notice any error that
+	was flagged by a DML thread. */
+	error = index->online_log->error;
+
+	if (error != DB_SUCCESS) {
+		goto func_exit;
+	}
+
+	if (mrec) {
+		/* A partial record was read from the previous block.
+		Copy the temporary buffer full, as we do not know the
+		length of the record. Parse subsequent records from
+		the bigger buffer index->online_log->head.block
+		or index->online_log->tail.block. */
+
+		ut_ad(mrec == index->online_log->head.buf);
+		ut_ad(mrec_end > mrec);
+		ut_ad(mrec_end < (&index->online_log->head.buf)[1]);
+
+		memcpy((mrec_t*) mrec_end, next_mrec,
+		       (&index->online_log->head.buf)[1] - mrec_end);
+		mrec = row_log_table_apply_op(
+			thr, trx_id_col, new_trx_id_col,
+			dup, &error, offsets_heap, heap,
+			index->online_log->head.buf,
+			(&index->online_log->head.buf)[1], offsets);
+		if (error != DB_SUCCESS) {
+			goto func_exit;
+		} else if (UNIV_UNLIKELY(mrec == NULL)) {
+			/* The record was not reassembled properly. */
+			goto corruption;
+		}
+		/* The record was previously found to be
+		truncated. Now that the parse buffer was extended,
+		it should proceed beyond the old end of the buffer.
*/ + ut_a(mrec > mrec_end); + + index->online_log->head.bytes = mrec - mrec_end; + next_mrec += index->online_log->head.bytes; + } + + ut_ad(next_mrec <= next_mrec_end); + /* The following loop must not be parsing the temporary + buffer, but head.block or tail.block. */ + + /* mrec!=NULL means that the next record starts from the + middle of the block */ + ut_ad((mrec == NULL) == (index->online_log->head.bytes == 0)); + +#ifdef UNIV_DEBUG + if (next_mrec_end == index->online_log->head.block + + srv_sort_buf_size) { + /* If tail.bytes == 0, next_mrec_end can also be at + the end of tail.block. */ + if (index->online_log->tail.bytes == 0) { + ut_ad(next_mrec == next_mrec_end); + ut_ad(index->online_log->tail.blocks == 0); + ut_ad(index->online_log->head.blocks == 0); + ut_ad(index->online_log->head.bytes == 0); + } else { + ut_ad(next_mrec == index->online_log->head.block + + index->online_log->head.bytes); + ut_ad(index->online_log->tail.blocks + > index->online_log->head.blocks); + } + } else if (next_mrec_end == index->online_log->tail.block + + index->online_log->tail.bytes) { + ut_ad(next_mrec == index->online_log->tail.block + + index->online_log->head.bytes); + ut_ad(index->online_log->tail.blocks == 0); + ut_ad(index->online_log->head.blocks == 0); + ut_ad(index->online_log->head.bytes + <= index->online_log->tail.bytes); + } else { + ut_error; + } +#endif /* UNIV_DEBUG */ + + mrec_end = next_mrec_end; + + while (!trx_is_interrupted(trx)) { + mrec = next_mrec; + ut_ad(mrec < mrec_end); + + if (!has_index_lock) { + /* We are applying operations from a different + block than the one that is being written to. + We do not hold index->lock in order to + allow other threads to concurrently buffer + modifications. */ + ut_ad(mrec >= index->online_log->head.block); + ut_ad(mrec_end == index->online_log->head.block + + srv_sort_buf_size); + ut_ad(index->online_log->head.bytes + < srv_sort_buf_size); + + /* Take the opportunity to do a redo log + checkpoint if needed. */ + log_free_check(); + } else { + /* We are applying operations from the last block. + Do not allow other threads to buffer anything, + so that we can finally catch up and synchronize. */ + ut_ad(index->online_log->head.blocks == 0); + ut_ad(index->online_log->tail.blocks == 0); + ut_ad(mrec_end == index->online_log->tail.block + + index->online_log->tail.bytes); + ut_ad(mrec >= index->online_log->tail.block); + } + + /* This read is not protected by index->online_log->mutex + for performance reasons. We will eventually notice any + error that was flagged by a DML thread. */ + error = index->online_log->error; + + if (error != DB_SUCCESS) { + goto func_exit; + } + + next_mrec = row_log_table_apply_op( + thr, trx_id_col, new_trx_id_col, + dup, &error, offsets_heap, heap, + mrec, mrec_end, offsets); + + if (error != DB_SUCCESS) { + goto func_exit; + } else if (next_mrec == next_mrec_end) { + /* The record happened to end on a block boundary. + Do we have more blocks left? */ + if (has_index_lock) { + /* The index will be locked while + applying the last block. 
*/ + goto all_done; + } + + mrec = NULL; +process_next_block: + rw_lock_x_lock(dict_index_get_lock(index)); + has_index_lock = true; + + index->online_log->head.bytes = 0; + index->online_log->head.blocks++; + goto next_block; + } else if (next_mrec != NULL) { + ut_ad(next_mrec < next_mrec_end); + index->online_log->head.bytes += next_mrec - mrec; + } else if (has_index_lock) { + /* When mrec is within tail.block, it should + be a complete record, because we are holding + index->lock and thus excluding the writer. */ + ut_ad(index->online_log->tail.blocks == 0); + ut_ad(mrec_end == index->online_log->tail.block + + index->online_log->tail.bytes); + ut_ad(0); + goto unexpected_eof; + } else { + memcpy(index->online_log->head.buf, mrec, + mrec_end - mrec); + mrec_end += index->online_log->head.buf - mrec; + mrec = index->online_log->head.buf; + goto process_next_block; + } + } + +interrupted: + error = DB_INTERRUPTED; +func_exit: + if (!has_index_lock) { + rw_lock_x_lock(dict_index_get_lock(index)); + } + + mem_heap_free(offsets_heap); + mem_heap_free(heap); + row_log_block_free(index->online_log->head); + ut_free(offsets); + return(error); +} + +/******************************************************//** +Apply the row_log_table log to a table upon completing rebuild. +@return DB_SUCCESS, or error code on failure */ +UNIV_INTERN +dberr_t +row_log_table_apply( +/*================*/ + que_thr_t* thr, /*!< in: query graph */ + dict_table_t* old_table, + /*!< in: old table */ + struct TABLE* table) /*!< in/out: MySQL table + (for reporting duplicates) */ +{ + dberr_t error; + dict_index_t* clust_index; + + thr_get_trx(thr)->error_key_num = 0; + +#ifdef UNIV_SYNC_DEBUG + ut_ad(!rw_lock_own(&dict_operation_lock, RW_LOCK_SHARED)); +#endif /* UNIV_SYNC_DEBUG */ + clust_index = dict_table_get_first_index(old_table); + + rw_lock_x_lock(dict_index_get_lock(clust_index)); + + if (!clust_index->online_log) { + ut_ad(dict_index_get_online_status(clust_index) + == ONLINE_INDEX_COMPLETE); + /* This function should not be called unless + rebuilding a table online. Build in some fault + tolerance. */ + ut_ad(0); + error = DB_ERROR; + } else { + row_merge_dup_t dup = { + clust_index, table, + clust_index->online_log->col_map, 0 + }; + + error = row_log_table_apply_ops(thr, &dup); + + ut_ad(error != DB_SUCCESS + || clust_index->online_log->head.total + == clust_index->online_log->tail.total); + } + + rw_lock_x_unlock(dict_index_get_lock(clust_index)); + return(error); +} + +/******************************************************//** +Allocate the row log for an index and flag the index +for online creation. 
+@retval true if success, false if not */ +UNIV_INTERN +bool +row_log_allocate( +/*=============*/ + dict_index_t* index, /*!< in/out: index */ + dict_table_t* table, /*!< in/out: new table being rebuilt, + or NULL when creating a secondary index */ + bool same_pk,/*!< in: whether the definition of the + PRIMARY KEY has remained the same */ + const dtuple_t* add_cols, + /*!< in: default values of + added columns, or NULL */ + const ulint* col_map)/*!< in: mapping of old column + numbers to new ones, or NULL if !table */ +{ + row_log_t* log; + DBUG_ENTER("row_log_allocate"); + + ut_ad(!dict_index_is_online_ddl(index)); + ut_ad(dict_index_is_clust(index) == !!table); + ut_ad(!table || index->table != table); + ut_ad(same_pk || table); + ut_ad(!table || col_map); + ut_ad(!add_cols || col_map); +#ifdef UNIV_SYNC_DEBUG + ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX)); +#endif /* UNIV_SYNC_DEBUG */ + log = (row_log_t*) ut_malloc(sizeof *log); + if (!log) { + DBUG_RETURN(false); + } + + log->fd = row_merge_file_create_low(); + if (log->fd < 0) { + ut_free(log); + DBUG_RETURN(false); + } + mutex_create(index_online_log_key, &log->mutex, + SYNC_INDEX_ONLINE_LOG); + log->blobs = NULL; + log->table = table; + log->same_pk = same_pk; + log->add_cols = add_cols; + log->col_map = col_map; + log->error = DB_SUCCESS; + log->max_trx = 0; + log->tail.blocks = log->tail.bytes = 0; + log->tail.total = 0; + log->tail.block = log->head.block = NULL; + log->head.blocks = log->head.bytes = 0; + log->head.total = 0; + dict_index_set_online_status(index, ONLINE_INDEX_CREATION); + index->online_log = log; + + /* While we might be holding an exclusive data dictionary lock + here, in row_log_abort_sec() we will not always be holding it. Use + atomic operations in both cases. */ + MONITOR_ATOMIC_INC(MONITOR_ONLINE_CREATE_INDEX); + + DBUG_RETURN(true); +} + +/******************************************************//** +Free the row log for an index that was being created online. */ +UNIV_INTERN +void +row_log_free( +/*=========*/ + row_log_t*& log) /*!< in,own: row log */ +{ + MONITOR_ATOMIC_DEC(MONITOR_ONLINE_CREATE_INDEX); + + delete log->blobs; + row_log_block_free(log->tail); + row_log_block_free(log->head); + row_merge_file_destroy_low(log->fd); + mutex_free(&log->mutex); + ut_free(log); + log = 0; +} + +/******************************************************//** +Get the latest transaction ID that has invoked row_log_online_op() +during online creation. +@return latest transaction ID, or 0 if nothing was logged */ +UNIV_INTERN +trx_id_t +row_log_get_max_trx( +/*================*/ + dict_index_t* index) /*!< in: index, must be locked */ +{ + ut_ad(dict_index_get_online_status(index) == ONLINE_INDEX_CREATION); +#ifdef UNIV_SYNC_DEBUG + ut_ad((rw_lock_own(dict_index_get_lock(index), RW_LOCK_SHARED) + && mutex_own(&index->online_log->mutex)) + || rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX)); +#endif /* UNIV_SYNC_DEBUG */ + return(index->online_log->max_trx); +} + +/******************************************************//** +Applies an operation to a secondary index that was being created. 
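+
+In brief (glossing over the NULL and duplicate-key subcases handled
+below): the entry is searched for first; a ROW_OP_DELETE is a no-op
+unless the exact record exists, and a ROW_OP_INSERT is a no-op if the
+exact record exists already. This makes replaying the log robust
+against operations that were already reflected by the initial
+clustered index scan.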
+*/
+static __attribute__((nonnull))
+void
+row_log_apply_op_low(
+/*=================*/
+	dict_index_t*	index,		/*!< in/out: index */
+	row_merge_dup_t*dup,		/*!< in/out: for reporting
+					duplicate key errors */
+	dberr_t*	error,		/*!< out: DB_SUCCESS or error code */
+	mem_heap_t*	offsets_heap,	/*!< in/out: memory heap for
+					allocating offsets; can be emptied */
+	bool		has_index_lock, /*!< in: true if holding index->lock
+					in exclusive mode */
+	enum row_op	op,		/*!< in: operation being applied */
+	trx_id_t	trx_id,		/*!< in: transaction identifier */
+	const dtuple_t*	entry)		/*!< in: row */
+{
+	mtr_t		mtr;
+	btr_cur_t	cursor;
+	ulint*		offsets = NULL;
+
+	ut_ad(!dict_index_is_clust(index));
+#ifdef UNIV_SYNC_DEBUG
+	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX)
+	      == has_index_lock);
+#endif /* UNIV_SYNC_DEBUG */
+	ut_ad(!dict_index_is_corrupted(index));
+	ut_ad(trx_id != 0 || op == ROW_OP_DELETE);
+
+	mtr_start(&mtr);
+
+	/* We perform the pessimistic variant of the operations if we
+	already hold index->lock exclusively. First, search the
+	record. The operation may already have been performed,
+	depending on when the row in the clustered index was
+	scanned. */
+	btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
+				    has_index_lock
+				    ? BTR_MODIFY_TREE
+				    : BTR_MODIFY_LEAF,
+				    &cursor, 0, __FILE__, __LINE__,
+				    &mtr);
+
+	ut_ad(dict_index_get_n_unique(index) > 0);
+	/* This test is somewhat similar to row_ins_must_modify_rec(),
+	but not identical for unique secondary indexes. */
+	if (cursor.low_match >= dict_index_get_n_unique(index)
+	    && !page_rec_is_infimum(btr_cur_get_rec(&cursor))) {
+		/* We have a matching record. */
+		bool	exists	= (cursor.low_match
+				   == dict_index_get_n_fields(index));
+#ifdef UNIV_DEBUG
+		rec_t*	rec	= btr_cur_get_rec(&cursor);
+		ut_ad(page_rec_is_user_rec(rec));
+		ut_ad(!rec_get_deleted_flag(rec, page_rec_is_comp(rec)));
+#endif /* UNIV_DEBUG */
+
+		ut_ad(exists || dict_index_is_unique(index));
+
+		switch (op) {
+		case ROW_OP_DELETE:
+			if (!exists) {
+				/* The existing record matches the
+				unique secondary index key, but the
+				PRIMARY KEY columns differ. So, this
+				exact record does not exist. For
+				example, we could detect a duplicate
+				key error in some old index before
+				logging a ROW_OP_INSERT for our
+				index. This ROW_OP_DELETE could have
+				been logged for rolling back
+				TRX_UNDO_INSERT_REC. */
+				goto func_exit;
+			}
+
+			if (btr_cur_optimistic_delete(
+				    &cursor, BTR_CREATE_FLAG, &mtr)) {
+				*error = DB_SUCCESS;
+				break;
+			}
+
+			if (!has_index_lock) {
+				/* This needs a pessimistic operation.
+				Lock the index tree exclusively. */
+				mtr_commit(&mtr);
+				mtr_start(&mtr);
+				btr_cur_search_to_nth_level(
+					index, 0, entry, PAGE_CUR_LE,
+					BTR_MODIFY_TREE, &cursor, 0,
+					__FILE__, __LINE__, &mtr);
+
+				/* No other thread than the current one
+				is allowed to modify the index tree.
+				Thus, the record should still exist. */
+				ut_ad(cursor.low_match
+				      >= dict_index_get_n_fields(index));
+				ut_ad(page_rec_is_user_rec(
+					      btr_cur_get_rec(&cursor)));
+			}
+
+			/* As there are no externally stored fields in
+			a secondary index record, the parameter
+			rb_ctx = RB_NONE will be ignored. */
+
+			btr_cur_pessimistic_delete(
+				error, FALSE, &cursor,
+				BTR_CREATE_FLAG, RB_NONE, &mtr);
+			break;
+		case ROW_OP_INSERT:
+			if (exists) {
+				/* The record already exists. There
+				is nothing to be inserted.
+
+				This could happen when processing
+				TRX_UNDO_DEL_MARK_REC in statement
+				rollback:
+
+				UPDATE of PRIMARY KEY can lead to
+				statement rollback if the updated
+				value of the PRIMARY KEY already
+				exists. In this case, the UPDATE would
+				be mapped to DELETE;INSERT, and we
+				only wrote undo log for the DELETE
+				part. The duplicate key error would be
+				triggered before logging the INSERT
+				part.
+
+				Theoretically, we could also get a
+				similar situation when a DELETE operation
+				is blocked by a FOREIGN KEY constraint. */
+				goto func_exit;
+			}
+
+			if (dtuple_contains_null(entry)) {
+				/* The UNIQUE KEY columns match, but
+				there is a NULL value in the key, and
+				NULL!=NULL. */
+				goto insert_the_rec;
+			}
+
+			goto duplicate;
+		}
+	} else {
+		switch (op) {
+			rec_t*		rec;
+			big_rec_t*	big_rec;
+		case ROW_OP_DELETE:
+			/* The record does not exist. For example, we
+			could detect a duplicate key error in some old
+			index before logging a ROW_OP_INSERT for our
+			index. This ROW_OP_DELETE could be logged for
+			rolling back TRX_UNDO_INSERT_REC. */
+			goto func_exit;
+		case ROW_OP_INSERT:
+			if (dict_index_is_unique(index)
+			    && (cursor.up_match
+				>= dict_index_get_n_unique(index)
+				|| cursor.low_match
+				>= dict_index_get_n_unique(index))
+			    && (!index->n_nullable
+				|| !dtuple_contains_null(entry))) {
+duplicate:
+				/* Duplicate key */
+				ut_ad(dict_index_is_unique(index));
+				row_merge_dup_report(dup, entry->fields);
+				*error = DB_DUPLICATE_KEY;
+				goto func_exit;
+			}
+insert_the_rec:
+			/* Insert the record. As we are inserting into
+			a secondary index, there cannot be externally
+			stored columns (!big_rec). */
+			*error = btr_cur_optimistic_insert(
+				BTR_NO_UNDO_LOG_FLAG
+				| BTR_NO_LOCKING_FLAG
+				| BTR_CREATE_FLAG,
+				&cursor, &offsets, &offsets_heap,
+				const_cast<dtuple_t*>(entry),
+				&rec, &big_rec, 0, NULL, &mtr);
+			ut_ad(!big_rec);
+			if (*error != DB_FAIL) {
+				break;
+			}
+
+			if (!has_index_lock) {
+				/* This needs a pessimistic operation.
+				Lock the index tree exclusively. */
+				mtr_commit(&mtr);
+				mtr_start(&mtr);
+				btr_cur_search_to_nth_level(
+					index, 0, entry, PAGE_CUR_LE,
+					BTR_MODIFY_TREE, &cursor, 0,
+					__FILE__, __LINE__, &mtr);
+			}
+
+			/* We already determined that the
+			record did not exist. No other thread
+			than the current one is allowed to
+			modify the index tree. Thus, the
+			record should still not exist. */
+
+			*error = btr_cur_pessimistic_insert(
+				BTR_NO_UNDO_LOG_FLAG
+				| BTR_NO_LOCKING_FLAG
+				| BTR_CREATE_FLAG,
+				&cursor, &offsets, &offsets_heap,
+				const_cast<dtuple_t*>(entry),
+				&rec, &big_rec,
+				0, NULL, &mtr);
+			ut_ad(!big_rec);
+			break;
+		}
+		mem_heap_empty(offsets_heap);
+	}
+
+	if (*error == DB_SUCCESS && trx_id) {
+		page_update_max_trx_id(btr_cur_get_block(&cursor),
+				       btr_cur_get_page_zip(&cursor),
+				       trx_id, &mtr);
+	}
+
+func_exit:
+	mtr_commit(&mtr);
+}
+
+/******************************************************//**
+Applies an operation to a secondary index that was being created.
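+
+For reference, an informal sketch of the entry format, as implied by
+the parsing below: a one-byte op code (ROW_OP_INSERT or
+ROW_OP_DELETE), a DB_TRX_ID (ROW_OP_INSERT only), a variable-length
+extra_size (one byte, or two bytes when the first byte is at least
+0x80), and the index entry in the format of rec_init_offsets_temp().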
+@return NULL on failure (mrec corruption) or when out of data; +pointer to next record on success */ +static __attribute__((nonnull, warn_unused_result)) +const mrec_t* +row_log_apply_op( +/*=============*/ + dict_index_t* index, /*!< in/out: index */ + row_merge_dup_t*dup, /*!< in/out: for reporting + duplicate key errors */ + dberr_t* error, /*!< out: DB_SUCCESS or error code */ + mem_heap_t* offsets_heap, /*!< in/out: memory heap for + allocating offsets; can be emptied */ + mem_heap_t* heap, /*!< in/out: memory heap for + allocating data tuples */ + bool has_index_lock, /*!< in: true if holding index->lock + in exclusive mode */ + const mrec_t* mrec, /*!< in: merge record */ + const mrec_t* mrec_end, /*!< in: end of buffer */ + ulint* offsets) /*!< in/out: work area for + rec_init_offsets_temp() */ + +{ + enum row_op op; + ulint extra_size; + ulint data_size; + ulint n_ext; + dtuple_t* entry; + trx_id_t trx_id; + + /* Online index creation is only used for secondary indexes. */ + ut_ad(!dict_index_is_clust(index)); +#ifdef UNIV_SYNC_DEBUG + ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX) + == has_index_lock); +#endif /* UNIV_SYNC_DEBUG */ + + if (dict_index_is_corrupted(index)) { + *error = DB_INDEX_CORRUPT; + return(NULL); + } + + *error = DB_SUCCESS; + + if (mrec + ROW_LOG_HEADER_SIZE >= mrec_end) { + return(NULL); + } + + switch (*mrec) { + case ROW_OP_INSERT: + if (ROW_LOG_HEADER_SIZE + DATA_TRX_ID_LEN + mrec >= mrec_end) { + return(NULL); + } + + op = static_cast<enum row_op>(*mrec++); + trx_id = trx_read_trx_id(mrec); + mrec += DATA_TRX_ID_LEN; + break; + case ROW_OP_DELETE: + op = static_cast<enum row_op>(*mrec++); + trx_id = 0; + break; + default: +corrupted: + ut_ad(0); + *error = DB_CORRUPTION; + return(NULL); + } + + extra_size = *mrec++; + + ut_ad(mrec < mrec_end); + + if (extra_size >= 0x80) { + /* Read another byte of extra_size. */ + + extra_size = (extra_size & 0x7f) << 8; + extra_size |= *mrec++; + } + + mrec += extra_size; + + if (mrec > mrec_end) { + return(NULL); + } + + rec_init_offsets_temp(mrec, index, offsets); + + if (rec_offs_any_extern(offsets)) { + /* There should never be any externally stored fields + in a secondary index, which is what online index + creation is used for. Therefore, the log file must be + corrupted. */ + goto corrupted; + } + + data_size = rec_offs_data_size(offsets); + + mrec += data_size; + + if (mrec > mrec_end) { + return(NULL); + } + + entry = row_rec_to_index_entry_low( + mrec - data_size, index, offsets, &n_ext, heap); + /* Online index creation is only implemented for secondary + indexes, which never contain off-page columns. */ + ut_ad(n_ext == 0); +#ifdef ROW_LOG_APPLY_PRINT + if (row_log_apply_print) { + fprintf(stderr, "apply " IB_ID_FMT " " TRX_ID_FMT " %u %u ", + index->id, trx_id, + unsigned (op), unsigned (has_index_lock)); + for (const byte* m = mrec - data_size; m < mrec; m++) { + fprintf(stderr, "%02x", *m); + } + putc('\n', stderr); + } +#endif /* ROW_LOG_APPLY_PRINT */ + row_log_apply_op_low(index, dup, error, offsets_heap, + has_index_lock, op, trx_id, entry); + return(mrec); +} + +/******************************************************//** +Applies operations to a secondary index that was being created. 
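+
+The log is consumed block by block: completed blocks are read back
+from the temporary file and applied without holding index->lock, so
+that concurrent DML may keep appending to the log, while the final,
+partially filled block (tail.block) is applied with index->lock held
+exclusively, shutting out the writers so that the apply can catch up
+for good.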
+@return DB_SUCCESS, or error code on failure */ +static __attribute__((nonnull)) +dberr_t +row_log_apply_ops( +/*==============*/ + trx_t* trx, /*!< in: transaction (for checking if + the operation was interrupted) */ + dict_index_t* index, /*!< in/out: index */ + row_merge_dup_t*dup) /*!< in/out: for reporting duplicate key + errors */ +{ + dberr_t error; + const mrec_t* mrec = NULL; + const mrec_t* next_mrec; + const mrec_t* mrec_end= NULL; /* silence bogus warning */ + const mrec_t* next_mrec_end; + mem_heap_t* offsets_heap; + mem_heap_t* heap; + ulint* offsets; + bool has_index_lock; + const ulint i = 1 + REC_OFFS_HEADER_SIZE + + dict_index_get_n_fields(index); + + ut_ad(dict_index_is_online_ddl(index)); + ut_ad(*index->name == TEMP_INDEX_PREFIX); +#ifdef UNIV_SYNC_DEBUG + ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX)); +#endif /* UNIV_SYNC_DEBUG */ + ut_ad(index->online_log); + UNIV_MEM_INVALID(&mrec_end, sizeof mrec_end); + + offsets = static_cast<ulint*>(ut_malloc(i * sizeof *offsets)); + offsets[0] = i; + offsets[1] = dict_index_get_n_fields(index); + + offsets_heap = mem_heap_create(UNIV_PAGE_SIZE); + heap = mem_heap_create(UNIV_PAGE_SIZE); + has_index_lock = true; + +next_block: + ut_ad(has_index_lock); +#ifdef UNIV_SYNC_DEBUG + ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX)); +#endif /* UNIV_SYNC_DEBUG */ + ut_ad(index->online_log->head.bytes == 0); + + if (trx_is_interrupted(trx)) { + goto interrupted; + } + + error = index->online_log->error; + if (error != DB_SUCCESS) { + goto func_exit; + } + + if (dict_index_is_corrupted(index)) { + error = DB_INDEX_CORRUPT; + goto func_exit; + } + + if (UNIV_UNLIKELY(index->online_log->head.blocks + > index->online_log->tail.blocks)) { +unexpected_eof: + fprintf(stderr, "InnoDB: unexpected end of temporary file" + " for index %s\n", index->name + 1); +corruption: + error = DB_CORRUPTION; + goto func_exit; + } + + if (index->online_log->head.blocks + == index->online_log->tail.blocks) { + if (index->online_log->head.blocks) { +#ifdef HAVE_FTRUNCATE + /* Truncate the file in order to save space. */ + if (ftruncate(index->online_log->fd, 0) == -1) { + perror("ftruncate"); + } +#endif /* HAVE_FTRUNCATE */ + index->online_log->head.blocks + = index->online_log->tail.blocks = 0; + } + + next_mrec = index->online_log->tail.block; + next_mrec_end = next_mrec + index->online_log->tail.bytes; + + if (next_mrec_end == next_mrec) { + /* End of log reached. */ +all_done: + ut_ad(has_index_lock); + ut_ad(index->online_log->head.blocks == 0); + ut_ad(index->online_log->tail.blocks == 0); + error = DB_SUCCESS; + goto func_exit; + } + } else { + os_offset_t ofs; + ibool success; + + ofs = (os_offset_t) index->online_log->head.blocks + * srv_sort_buf_size; + + ut_ad(has_index_lock); + has_index_lock = false; + rw_lock_x_unlock(dict_index_get_lock(index)); + + log_free_check(); + + if (!row_log_block_allocate(index->online_log->head)) { + error = DB_OUT_OF_MEMORY; + goto func_exit; + } + + success = os_file_read_no_error_handling( + OS_FILE_FROM_FD(index->online_log->fd), + index->online_log->head.block, ofs, + srv_sort_buf_size); + + if (!success) { + fprintf(stderr, "InnoDB: unable to read temporary file" + " for index %s\n", index->name + 1); + goto corruption; + } + +#ifdef POSIX_FADV_DONTNEED + /* Each block is read exactly once. Free up the file cache. 
+		*/
+		posix_fadvise(index->online_log->fd,
+			      ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED);
+#endif /* POSIX_FADV_DONTNEED */
+#if 0 //def FALLOC_FL_PUNCH_HOLE
+		/* Try to deallocate the space for the file on disk.
+		This should work on ext4 on Linux 2.6.39 and later,
+		and be ignored when the operation is unsupported. */
+		fallocate(index->online_log->fd,
+			  FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE,
+			  ofs, srv_sort_buf_size);
+#endif /* FALLOC_FL_PUNCH_HOLE */
+
+		next_mrec = index->online_log->head.block;
+		next_mrec_end = next_mrec + srv_sort_buf_size;
+	}
+
+	if (mrec) {
+		/* A partial record was read from the previous block.
+		Copy the temporary buffer full, as we do not know the
+		length of the record. Parse subsequent records from
+		the bigger buffer index->online_log->head.block
+		or index->online_log->tail.block. */
+
+		ut_ad(mrec == index->online_log->head.buf);
+		ut_ad(mrec_end > mrec);
+		ut_ad(mrec_end < (&index->online_log->head.buf)[1]);
+
+		memcpy((mrec_t*) mrec_end, next_mrec,
+		       (&index->online_log->head.buf)[1] - mrec_end);
+		mrec = row_log_apply_op(
+			index, dup, &error, offsets_heap, heap,
+			has_index_lock, index->online_log->head.buf,
+			(&index->online_log->head.buf)[1], offsets);
+		if (error != DB_SUCCESS) {
+			goto func_exit;
+		} else if (UNIV_UNLIKELY(mrec == NULL)) {
+			/* The record was not reassembled properly. */
+			goto corruption;
+		}
+		/* The record was previously found to be
+		truncated. Now that the parse buffer was extended,
+		it should proceed beyond the old end of the buffer. */
+		ut_a(mrec > mrec_end);
+
+		index->online_log->head.bytes = mrec - mrec_end;
+		next_mrec += index->online_log->head.bytes;
+	}
+
+	ut_ad(next_mrec <= next_mrec_end);
+	/* The following loop must not be parsing the temporary
+	buffer, but head.block or tail.block. */
+
+	/* mrec!=NULL means that the next record starts from the
+	middle of the block */
+	ut_ad((mrec == NULL) == (index->online_log->head.bytes == 0));
+
+#ifdef UNIV_DEBUG
+	if (next_mrec_end == index->online_log->head.block
+	    + srv_sort_buf_size) {
+		/* If tail.bytes == 0, next_mrec_end can also be at
+		the end of tail.block. */
+		if (index->online_log->tail.bytes == 0) {
+			ut_ad(next_mrec == next_mrec_end);
+			ut_ad(index->online_log->tail.blocks == 0);
+			ut_ad(index->online_log->head.blocks == 0);
+			ut_ad(index->online_log->head.bytes == 0);
+		} else {
+			ut_ad(next_mrec == index->online_log->head.block
+			      + index->online_log->head.bytes);
+			ut_ad(index->online_log->tail.blocks
+			      > index->online_log->head.blocks);
+		}
+	} else if (next_mrec_end == index->online_log->tail.block
+		   + index->online_log->tail.bytes) {
+		ut_ad(next_mrec == index->online_log->tail.block
+		      + index->online_log->head.bytes);
+		ut_ad(index->online_log->tail.blocks == 0);
+		ut_ad(index->online_log->head.blocks == 0);
+		ut_ad(index->online_log->head.bytes
+		      <= index->online_log->tail.bytes);
+	} else {
+		ut_error;
+	}
+#endif /* UNIV_DEBUG */
+
+	mrec_end = next_mrec_end;
+
+	while (!trx_is_interrupted(trx)) {
+		mrec = next_mrec;
+		ut_ad(mrec < mrec_end);
+
+		if (!has_index_lock) {
+			/* We are applying operations from a different
+			block than the one that is being written to.
+			We do not hold index->lock in order to
+			allow other threads to concurrently buffer
+			modifications. */
+			ut_ad(mrec >= index->online_log->head.block);
+			ut_ad(mrec_end == index->online_log->head.block
+			      + srv_sort_buf_size);
+			ut_ad(index->online_log->head.bytes
+			      < srv_sort_buf_size);
+
+			/* Take the opportunity to do a redo log
+			checkpoint if needed.
*/ + log_free_check(); + } else { + /* We are applying operations from the last block. + Do not allow other threads to buffer anything, + so that we can finally catch up and synchronize. */ + ut_ad(index->online_log->head.blocks == 0); + ut_ad(index->online_log->tail.blocks == 0); + ut_ad(mrec_end == index->online_log->tail.block + + index->online_log->tail.bytes); + ut_ad(mrec >= index->online_log->tail.block); + } + + next_mrec = row_log_apply_op( + index, dup, &error, offsets_heap, heap, + has_index_lock, mrec, mrec_end, offsets); + + if (error != DB_SUCCESS) { + goto func_exit; + } else if (next_mrec == next_mrec_end) { + /* The record happened to end on a block boundary. + Do we have more blocks left? */ + if (has_index_lock) { + /* The index will be locked while + applying the last block. */ + goto all_done; + } + + mrec = NULL; +process_next_block: + rw_lock_x_lock(dict_index_get_lock(index)); + has_index_lock = true; + + index->online_log->head.bytes = 0; + index->online_log->head.blocks++; + goto next_block; + } else if (next_mrec != NULL) { + ut_ad(next_mrec < next_mrec_end); + index->online_log->head.bytes += next_mrec - mrec; + } else if (has_index_lock) { + /* When mrec is within tail.block, it should + be a complete record, because we are holding + index->lock and thus excluding the writer. */ + ut_ad(index->online_log->tail.blocks == 0); + ut_ad(mrec_end == index->online_log->tail.block + + index->online_log->tail.bytes); + ut_ad(0); + goto unexpected_eof; + } else { + memcpy(index->online_log->head.buf, mrec, + mrec_end - mrec); + mrec_end += index->online_log->head.buf - mrec; + mrec = index->online_log->head.buf; + goto process_next_block; + } + } + +interrupted: + error = DB_INTERRUPTED; +func_exit: + if (!has_index_lock) { + rw_lock_x_lock(dict_index_get_lock(index)); + } + + switch (error) { + case DB_SUCCESS: + break; + case DB_INDEX_CORRUPT: + if (((os_offset_t) index->online_log->tail.blocks + 1) + * srv_sort_buf_size >= srv_online_max_size) { + /* The log file grew too big. */ + error = DB_ONLINE_LOG_TOO_BIG; + } + /* fall through */ + default: + /* We set the flag directly instead of invoking + dict_set_corrupted_index_cache_only(index) here, + because the index is not "public" yet. */ + index->type |= DICT_CORRUPT; + } + + mem_heap_free(heap); + mem_heap_free(offsets_heap); + row_log_block_free(index->online_log->head); + ut_free(offsets); + return(error); +} + +/******************************************************//** +Apply the row log to the index upon completing index creation. +@return DB_SUCCESS, or error code on failure */ +UNIV_INTERN +dberr_t +row_log_apply( +/*==========*/ + trx_t* trx, /*!< in: transaction (for checking if + the operation was interrupted) */ + dict_index_t* index, /*!< in/out: secondary index */ + struct TABLE* table) /*!< in/out: MySQL table + (for reporting duplicates) */ +{ + dberr_t error; + row_log_t* log; + row_merge_dup_t dup = { index, table, NULL, 0 }; + DBUG_ENTER("row_log_apply"); + + ut_ad(dict_index_is_online_ddl(index)); + ut_ad(!dict_index_is_clust(index)); + + log_free_check(); + + rw_lock_x_lock(dict_index_get_lock(index)); + + if (!dict_table_is_corrupted(index->table)) { + error = row_log_apply_ops(trx, index, &dup); + } else { + error = DB_SUCCESS; + } + + if (error != DB_SUCCESS) { + ut_a(!dict_table_is_discarded(index->table)); + /* We set the flag directly instead of invoking + dict_set_corrupted_index_cache_only(index) here, + because the index is not "public" yet. 
+		*/
+		index->type |= DICT_CORRUPT;
+		index->table->drop_aborted = TRUE;
+
+		dict_index_set_online_status(index, ONLINE_INDEX_ABORTED);
+	} else {
+		ut_ad(dup.n_dup == 0);
+		dict_index_set_online_status(index, ONLINE_INDEX_COMPLETE);
+	}
+
+	log = index->online_log;
+	index->online_log = NULL;
+	/* We could remove the TEMP_INDEX_PREFIX and update the data
+	dictionary to say that this index is complete, if we had
+	access to the .frm file here. If the server crashes before
+	all requested indexes have been created, this completed index
+	will be dropped. */
+	rw_lock_x_unlock(dict_index_get_lock(index));
+
+	row_log_free(log);
+
+	DBUG_RETURN(error);
+}