Diffstat (limited to 'storage/innobase/row/row0log.cc')
-rw-r--r--  storage/innobase/row/row0log.cc  529
1 file changed, 355 insertions(+), 174 deletions(-)
diff --git a/storage/innobase/row/row0log.cc b/storage/innobase/row/row0log.cc
index 01270300924..170358147b1 100644
--- a/storage/innobase/row/row0log.cc
+++ b/storage/innobase/row/row0log.cc
@@ -1,6 +1,6 @@
/*****************************************************************************
-Copyright (c) 2011, 2012, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2011, 2013, Oracle and/or its affiliates. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -38,7 +38,7 @@ Created 2011-05-26 Marko Makela
#include "que0que.h"
#include "handler0alter.h"
-#include <set>
+#include <map>
/** Table row modification operations during online table rebuild.
Delete-marked records are not copied to the rebuilt table. */
@@ -72,18 +72,86 @@ static bool row_log_apply_print;
/** Size of the modification log entry header, in bytes */
#define ROW_LOG_HEADER_SIZE 2/*op, extra_size*/
-/** Log block for modifications during online index creation */
+/** Log block for modifications during online ALTER TABLE */
struct row_log_buf_t {
byte* block; /*!< file block buffer */
mrec_buf_t buf; /*!< buffer for accessing a record
that spans two blocks */
ulint blocks; /*!< current position in blocks */
ulint bytes; /*!< current position within buf */
+ ulonglong total; /*!< logical position, in bytes from
+ the start of the row_log_table log;
+ 0 for row_log_online_op() and
+ row_log_apply(). */
};
-/** Set of transactions that rolled back inserts of BLOBs during
-online table rebuild */
-typedef std::set<trx_id_t> trx_id_set;
+/** Tracks BLOB allocation during online ALTER TABLE */
+class row_log_table_blob_t {
+public:
+ /** Constructor (declaring a BLOB freed)
+ @param offset_arg row_log_t::tail::total */
+#ifdef UNIV_DEBUG
+ row_log_table_blob_t(ulonglong offset_arg) :
+ old_offset (0), free_offset (offset_arg),
+ offset (BLOB_FREED) {}
+#else /* UNIV_DEBUG */
+ row_log_table_blob_t() :
+ offset (BLOB_FREED) {}
+#endif /* UNIV_DEBUG */
+
+ /** Declare a BLOB freed again.
+ @param offset_arg row_log_t::tail::total */
+#ifdef UNIV_DEBUG
+ void blob_free(ulonglong offset_arg)
+#else /* UNIV_DEBUG */
+ void blob_free()
+#endif /* UNIV_DEBUG */
+ {
+ ut_ad(offset < offset_arg);
+ ut_ad(offset != BLOB_FREED);
+ ut_d(old_offset = offset);
+ ut_d(free_offset = offset_arg);
+ offset = BLOB_FREED;
+ }
+ /** Declare a freed BLOB reused.
+ @param offset_arg row_log_t::tail::total */
+ void blob_alloc(ulonglong offset_arg) {
+ ut_ad(free_offset <= offset_arg);
+ ut_d(old_offset = offset);
+ offset = offset_arg;
+ }
+ /** Determine if a BLOB was freed at a given log position
+ @param offset_arg row_log_t::head::total after the log record
+ @return true if freed */
+ bool is_freed(ulonglong offset_arg) const {
+ /* This is supposed to be the offset at the end of the
+ current log record. */
+ ut_ad(offset_arg > 0);
+ /* We should never get anywhere close to the magic value. */
+ ut_ad(offset_arg < BLOB_FREED);
+ return(offset_arg < offset);
+ }
+private:
+ /** Magic value for a freed BLOB */
+ static const ulonglong BLOB_FREED = ~0ULL;
+#ifdef UNIV_DEBUG
+ /** Old offset, in case a page was freed, reused, freed, ... */
+ ulonglong old_offset;
+ /** Offset of last blob_free() */
+ ulonglong free_offset;
+#endif /* UNIV_DEBUG */
+ /** Byte offset to the log file */
+ ulonglong offset;
+};
+
+/** @brief Map of off-page column page numbers to 0 or log byte offsets.
+
+If there is no mapping for a page number, it is safe to access.
+If a page number maps to 0, it is an off-page column that has been freed.
+If a page number maps to a nonzero number, the number is a byte offset
+into the index->online_log, indicating that the page is safe to access
+when applying log records starting from that offset. */
+typedef std::map<ulint, row_log_table_blob_t> page_no_map;
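The class and map above reduce to a small piece of standalone logic: a page freed during the rebuild must not be dereferenced while applying log records that were written before the page was reused. A minimal sketch using only the standard library; the names (blob_tracker, log_pos_t) are illustrative and the debug-only bookkeeping of the real class is omitted:

#include <map>
#include <stdint.h>

typedef unsigned long long log_pos_t;

class blob_tracker {
public:
	/* Mark a BLOB start page as freed. */
	void blob_free(uint32_t page_no) {
		pages[page_no] = FREED;
	}
	/* A previously freed page is being reused; remember where in the
	log the reuse happened. Pages that were never freed are not tracked. */
	void blob_alloc(uint32_t page_no, log_pos_t log_end) {
		std::map<uint32_t, log_pos_t>::iterator p = pages.find(page_no);
		if (p != pages.end()) {
			p->second = log_end;
		}
	}
	/* While applying the record ending at apply_pos: is the page unsafe? */
	bool is_freed(uint32_t page_no, log_pos_t apply_pos) const {
		std::map<uint32_t, log_pos_t>::const_iterator p
			= pages.find(page_no);
		/* No entry: never freed during the rebuild; safe to read. */
		return(p != pages.end() && apply_pos < p->second);
	}
private:
	static const log_pos_t FREED = ~0ULL;	/* freed, not yet reused */
	std::map<uint32_t, log_pos_t> pages;
};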
/** @brief Buffer for logging modifications during online index creation
@@ -99,11 +167,12 @@ directly. When also head.bytes == tail.bytes, both counts will be
reset to 0 and the file will be truncated. */
struct row_log_t {
int fd; /*!< file descriptor */
- ib_mutex_t mutex; /*!< mutex protecting trx_log, error,
+ ib_mutex_t mutex; /*!< mutex protecting error,
max_trx and tail */
- trx_id_set* trx_rb; /*!< set of transactions that rolled back
- inserts of BLOBs during online table rebuild;
- protected by mutex */
+ page_no_map* blobs; /*!< map of page numbers of off-page columns
+ that have been freed during table-rebuilding
+ ALTER TABLE (row_log_table_*); protected by
+ index->lock X-latch only */
dict_table_t* table; /*!< table that is being rebuilt,
or NULL when this is a secondary
index that is being created online */
@@ -347,6 +416,7 @@ write_failed:
ut_ad(b == log->tail.block + log->tail.bytes);
}
+ log->tail.total += size;
UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);
mutex_exit(&log->mutex);
}
@@ -371,6 +441,7 @@ row_log_table_delete(
dict_index_t* index, /*!< in/out: clustered index, S-latched
or X-latched */
const ulint* offsets,/*!< in: rec_get_offsets(rec,index) */
+ bool purge, /*!< in: true=purging BLOBs */
trx_id_t trx_id) /*!< in: DB_TRX_ID of the record before
it was deleted */
{
@@ -460,6 +531,7 @@ row_log_table_delete(
ut_ad(dfield_get_type(dfield)->prtype
== (DATA_NOT_NULL | DATA_TRX_ID));
ut_ad(dfield_get_len(dfield) == DATA_TRX_ID_LEN);
+ dfield_dup(dfield, heap);
trx_write_trx_id(static_cast<byte*>(dfield->data), trx_id);
}
@@ -473,27 +545,25 @@ row_log_table_delete(
mrec_size = 4 + old_pk_size;
- /* If the row is marked as rollback, we will need to
- log the enough prefix of the BLOB unless both the
- old and new table are in COMPACT or REDUNDANT format */
- if ((dict_table_get_format(index->table) >= UNIV_FORMAT_B
- || dict_table_get_format(new_table) >= UNIV_FORMAT_B)
- && row_log_table_is_rollback(index, trx_id)) {
- if (rec_offs_any_extern(offsets)) {
- /* Build a cache of those off-page column
- prefixes that are referenced by secondary
- indexes. It can be that none of the off-page
- columns are needed. */
- row_build(ROW_COPY_DATA, index, rec,
- offsets, NULL, NULL, NULL, &ext, heap);
- if (ext) {
- /* Log the row_ext_t, ext->ext and ext->buf */
- ext_size = ext->n_ext * ext->max_len
- + sizeof(*ext)
- + ext->n_ext * sizeof(ulint)
- + (ext->n_ext - 1) * sizeof ext->len;
- mrec_size += ext_size;
- }
+ /* Log enough prefix of the BLOB unless both the
+ old and new table are in COMPACT or REDUNDANT format,
+ which store the prefix in the clustered index record. */
+ if (purge && rec_offs_any_extern(offsets)
+ && (dict_table_get_format(index->table) >= UNIV_FORMAT_B
+ || dict_table_get_format(new_table) >= UNIV_FORMAT_B)) {
+
+ /* Build a cache of those off-page column prefixes
+ that are referenced by secondary indexes. It can be
+ that none of the off-page columns are needed. */
+ row_build(ROW_COPY_DATA, index, rec,
+ offsets, NULL, NULL, NULL, &ext, heap);
+ if (ext) {
+ /* Log the row_ext_t, ext->ext and ext->buf */
+ ext_size = ext->n_ext * ext->max_len
+ + sizeof(*ext)
+ + ext->n_ext * sizeof(ulint)
+ + (ext->n_ext - 1) * sizeof ext->len;
+ mrec_size += ext_size;
}
}
@@ -548,7 +618,7 @@ row_log_table_delete(
/******************************************************//**
Logs an insert or update to a table that is being rebuilt. */
-static __attribute__((nonnull(1,2,3)))
+static
void
row_log_table_low_redundant(
/*========================*/
@@ -557,7 +627,6 @@ row_log_table_low_redundant(
page X-latched */
dict_index_t* index, /*!< in/out: clustered index, S-latched
or X-latched */
- const ulint* offsets,/*!< in: rec_get_offsets(rec,index) */
bool insert, /*!< in: true if insert,
false if update */
const dtuple_t* old_pk, /*!< in: old PRIMARY KEY value
@@ -578,6 +647,9 @@ row_log_table_low_redundant(
ut_ad(!page_is_comp(page_align(rec)));
ut_ad(dict_index_get_n_fields(index) == rec_get_n_fields_old(rec));
+ ut_ad(dict_tf_is_valid(index->table->flags));
+ ut_ad(!dict_table_is_comp(index->table)); /* redundant row format */
+ ut_ad(dict_index_is_clust(new_index));
heap = mem_heap_create(DTUPLE_EST_ALLOC(index->n_fields));
tuple = dtuple_create(heap, index->n_fields);
@@ -712,7 +784,7 @@ row_log_table_low(
if (!rec_offs_comp(offsets)) {
row_log_table_low_redundant(
- rec, index, offsets, insert, old_pk, new_index);
+ rec, index, insert, old_pk, new_index);
return;
}
@@ -723,8 +795,8 @@ row_log_table_low(
extra_size = rec_offs_extra_size(offsets) - omit_size;
- mrec_size = rec_offs_size(offsets) - omit_size
- + ROW_LOG_HEADER_SIZE + (extra_size >= 0x80);
+ mrec_size = ROW_LOG_HEADER_SIZE
+ + (extra_size >= 0x80) + rec_offs_size(offsets) - omit_size;
if (insert || index->online_log->same_pk) {
ut_ad(!old_pk);
@@ -793,6 +865,93 @@ row_log_table_update(
row_log_table_low(rec, index, offsets, false, old_pk);
}
+/** Gets the old table column of a PRIMARY KEY column.
+@param table old table (before ALTER TABLE)
+@param col_map mapping of old column numbers to new ones
+@param col_no column position in the new table
+@return old table column, or NULL if this is an added column */
+static
+const dict_col_t*
+row_log_table_get_pk_old_col(
+/*=========================*/
+ const dict_table_t* table,
+ const ulint* col_map,
+ ulint col_no)
+{
+ for (ulint i = 0; i < table->n_cols; i++) {
+ if (col_no == col_map[i]) {
+ return(dict_table_get_nth_col(table, i));
+ }
+ }
+
+ return(NULL);
+}
+
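A standalone sketch of the reverse lookup this helper performs (the real function walks dict_table_t and returns a dict_col_t pointer or NULL; the names here are hypothetical):

#include <cstddef>
#include <vector>

static const size_t NOT_IN_OLD_TABLE = static_cast<size_t>(-1);

/* col_map[i] is the position of old column i in the rebuilt table.
Return the old column number that maps to new_col_no, or
NOT_IN_OLD_TABLE if the column was added by the ALTER. */
size_t
old_col_for(const std::vector<size_t>& col_map, size_t new_col_no)
{
	for (size_t i = 0; i < col_map.size(); i++) {
		if (col_map[i] == new_col_no) {
			return(i);
		}
	}

	return(NOT_IN_OLD_TABLE);
}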
+/** Maps an old table column of a PRIMARY KEY column.
+@param col old table column (before ALTER TABLE)
+@param ifield clustered index field in the new table (after ALTER TABLE)
+@param dfield clustered index tuple field in the new table
+@param heap memory heap for allocating dfield contents
+@param rec clustered index leaf page record in the old table
+@param offsets rec_get_offsets(rec)
+@param i rec field corresponding to col
+@param zip_size compressed page size of the old table, or 0 for uncompressed
+@param max_len maximum length of dfield
+@retval DB_INVALID_NULL if a NULL value is encountered
+@retval DB_TOO_BIG_INDEX_COL if the maximum prefix length is exceeded */
+static
+dberr_t
+row_log_table_get_pk_col(
+/*=====================*/
+ const dict_col_t* col,
+ const dict_field_t* ifield,
+ dfield_t* dfield,
+ mem_heap_t* heap,
+ const rec_t* rec,
+ const ulint* offsets,
+ ulint i,
+ ulint zip_size,
+ ulint max_len)
+{
+ const byte* field;
+ ulint len;
+
+ ut_ad(ut_is_2pow(zip_size));
+
+ field = rec_get_nth_field(rec, offsets, i, &len);
+
+ if (len == UNIV_SQL_NULL) {
+ return(DB_INVALID_NULL);
+ }
+
+ if (rec_offs_nth_extern(offsets, i)) {
+ ulint field_len = ifield->prefix_len;
+ byte* blob_field;
+
+ if (!field_len) {
+ field_len = ifield->fixed_len;
+ if (!field_len) {
+ field_len = max_len + 1;
+ }
+ }
+
+ blob_field = static_cast<byte*>(
+ mem_heap_alloc(heap, field_len));
+
+ len = btr_copy_externally_stored_field_prefix(
+ blob_field, field_len, zip_size, field, len);
+ if (len >= max_len + 1) {
+ return(DB_TOO_BIG_INDEX_COL);
+ }
+
+ dfield_set_data(dfield, blob_field, len);
+ } else {
+ dfield_set_data(dfield, mem_heap_dup(heap, field, len), len);
+ }
+
+ return(DB_SUCCESS);
+}
+
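The off-page branch above relies on a buffer that is one byte larger than the longest column prefix the index may store: if btr_copy_externally_stored_field_prefix() fills all max_len + 1 bytes, the value is too long and DB_TOO_BIG_INDEX_COL is returned rather than a silently truncated prefix. A standalone sketch of that pattern; the names and result codes are illustrative:

#include <algorithm>
#include <cstddef>

enum prefix_result { PREFIX_OK, PREFIX_TOO_LONG };

/* dst must have room for max_len + 1 bytes. */
prefix_result
copy_checked_prefix(const unsigned char* src, size_t src_len,
		    unsigned char* dst, size_t max_len)
{
	size_t	len = std::min(src_len, max_len + 1);

	std::copy(src, src + len, dst);

	/* Filling the extra byte proves the source exceeds max_len. */
	return(len >= max_len + 1 ? PREFIX_TOO_LONG : PREFIX_OK);
}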
/******************************************************//**
Constructs the old PRIMARY KEY and DB_TRX_ID,DB_ROLL_PTR
of a table that is being rebuilt.
@@ -865,95 +1024,69 @@ row_log_table_get_pk(
dict_index_copy_types(tuple, new_index, tuple->n_fields);
dtuple_set_n_fields_cmp(tuple, new_n_uniq);
+ const ulint max_len = DICT_MAX_FIELD_LEN_BY_FORMAT(new_table);
+ const ulint zip_size = dict_table_zip_size(index->table);
+
for (ulint new_i = 0; new_i < new_n_uniq; new_i++) {
- dict_field_t* ifield;
- dfield_t* dfield;
- const dict_col_t* new_col;
- const dict_col_t* col;
- ulint col_no;
- ulint i;
- ulint len;
- const byte* field;
+ dict_field_t* ifield;
+ dfield_t* dfield;
+ ulint prtype;
+ ulint mbminmaxlen;
ifield = dict_index_get_nth_field(new_index, new_i);
dfield = dtuple_get_nth_field(tuple, new_i);
- new_col = dict_field_get_col(ifield);
- col_no = new_col->ind;
-
- for (ulint old_i = 0; old_i < index->table->n_cols;
- old_i++) {
- if (col_no == log->col_map[old_i]) {
- col_no = old_i;
- goto copy_col;
- }
- }
-
- /* No matching column was found in the old
- table, so this must be an added column.
- Copy the default value. */
- ut_ad(log->add_cols);
- dfield_copy(dfield,
- dtuple_get_nth_field(
- log->add_cols, col_no));
- continue;
-
-copy_col:
- col = dict_table_get_nth_col(index->table, col_no);
- i = dict_col_get_clust_pos(col, index);
+ const ulint col_no
+ = dict_field_get_col(ifield)->ind;
- if (i == ULINT_UNDEFINED) {
- ut_ad(0);
- log->error = DB_CORRUPTION;
- tuple = NULL;
- goto func_exit;
- }
+ if (const dict_col_t* col
+ = row_log_table_get_pk_old_col(
+ index->table, log->col_map, col_no)) {
+ ulint i = dict_col_get_clust_pos(col, index);
- field = rec_get_nth_field(rec, offsets, i, &len);
-
- if (len == UNIV_SQL_NULL) {
- log->error = DB_INVALID_NULL;
- tuple = NULL;
- goto func_exit;
- }
-
- if (rec_offs_nth_extern(offsets, i)) {
- ulint field_len = ifield->prefix_len;
- byte* blob_field;
- const ulint max_len =
- DICT_MAX_FIELD_LEN_BY_FORMAT(
- new_table);
-
- if (!field_len) {
- field_len = ifield->fixed_len;
- if (!field_len) {
- field_len = max_len + 1;
- }
+ if (i == ULINT_UNDEFINED) {
+ ut_ad(0);
+ log->error = DB_CORRUPTION;
+ goto err_exit;
}
- blob_field = static_cast<byte*>(
- mem_heap_alloc(*heap, field_len));
+ log->error = row_log_table_get_pk_col(
+ col, ifield, dfield, *heap,
+ rec, offsets, i, zip_size, max_len);
- len = btr_copy_externally_stored_field_prefix(
- blob_field, field_len,
- dict_table_zip_size(index->table),
- field, len);
- if (len == max_len + 1) {
- log->error = DB_TOO_BIG_INDEX_COL;
+ if (log->error != DB_SUCCESS) {
+err_exit:
tuple = NULL;
goto func_exit;
}
- dfield_set_data(dfield, blob_field, len);
+ mbminmaxlen = col->mbminmaxlen;
+ prtype = col->prtype;
} else {
- if (ifield->prefix_len
- && ifield->prefix_len < len) {
- len = ifield->prefix_len;
- }
+ /* No matching column was found in the old
+ table, so this must be an added column.
+ Copy the default value. */
+ ut_ad(log->add_cols);
+
+ dfield_copy(dfield, dtuple_get_nth_field(
+ log->add_cols, col_no));
+ mbminmaxlen = dfield->type.mbminmaxlen;
+ prtype = dfield->type.prtype;
+ }
+
+ ut_ad(!dfield_is_ext(dfield));
+ ut_ad(!dfield_is_null(dfield));
- dfield_set_data(
- dfield,
- mem_heap_dup(*heap, field, len), len);
+ if (ifield->prefix_len) {
+ ulint len = dtype_get_at_most_n_mbchars(
+ prtype, mbminmaxlen,
+ ifield->prefix_len,
+ dfield_get_len(dfield),
+ static_cast<const char*>(
+ dfield_get_data(dfield)));
+
+ ut_ad(len <= dfield_get_len(dfield));
+ dfield_set_len(dfield, len);
}
}
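The prefix handling at the end of the loop truncates on a character boundary rather than at a raw byte count (dtype_get_at_most_n_mbchars), which matters for multi-byte character sets. A rough standalone analogue for UTF-8 data, not the InnoDB routine:

#include <cstddef>

/* Return the byte length of at most n_chars leading UTF-8 characters
in buf[0..len). */
size_t
utf8_prefix_bytes(const unsigned char* buf, size_t len, size_t n_chars)
{
	size_t	i = 0;
	size_t	chars = 0;

	while (i < len && chars < n_chars) {
		i++;	/* lead byte */
		/* skip continuation bytes (10xxxxxx) */
		while (i < len && (buf[i] & 0xC0) == 0x80) {
			i++;
		}
		chars++;
	}

	return(i);
}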
@@ -988,66 +1121,80 @@ row_log_table_insert(
}
/******************************************************//**
-Notes that a transaction is being rolled back. */
+Notes that a BLOB is being freed during online ALTER TABLE. */
UNIV_INTERN
void
-row_log_table_rollback(
-/*===================*/
- dict_index_t* index, /*!< in/out: clustered index */
- trx_id_t trx_id) /*!< in: transaction being rolled back */
+row_log_table_blob_free(
+/*====================*/
+ dict_index_t* index, /*!< in/out: clustered index, X-latched */
+ ulint page_no)/*!< in: starting page number of the BLOB */
{
ut_ad(dict_index_is_clust(index));
-#ifdef UNIV_DEBUG
- ibool corrupt = FALSE;
- ut_ad(trx_rw_is_active(trx_id, &corrupt));
- ut_ad(!corrupt);
-#endif /* UNIV_DEBUG */
+ ut_ad(dict_index_is_online_ddl(index));
+#ifdef UNIV_SYNC_DEBUG
+ ut_ad(rw_lock_own(&index->lock, RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
+ ut_ad(page_no != FIL_NULL);
- /* Protect transitions of index->online_status and access to
- index->online_log. */
- rw_lock_s_lock(&index->lock);
+ if (index->online_log->error != DB_SUCCESS) {
+ return;
+ }
- if (dict_index_is_online_ddl(index)) {
- ut_ad(index->online_log);
- ut_ad(index->online_log->table);
- mutex_enter(&index->online_log->mutex);
- trx_id_set* trxs = index->online_log->trx_rb;
+ page_no_map* blobs = index->online_log->blobs;
- if (!trxs) {
- index->online_log->trx_rb = trxs = new trx_id_set();
- }
+ if (!blobs) {
+ index->online_log->blobs = blobs = new page_no_map();
+ }
- trxs->insert(trx_id);
+#ifdef UNIV_DEBUG
+ const ulonglong log_pos = index->online_log->tail.total;
+#else
+# define log_pos /* empty */
+#endif /* UNIV_DEBUG */
- mutex_exit(&index->online_log->mutex);
- }
+ const page_no_map::value_type v(page_no,
+ row_log_table_blob_t(log_pos));
- rw_lock_s_unlock(&index->lock);
+ std::pair<page_no_map::iterator,bool> p = blobs->insert(v);
+
+ if (!p.second) {
+ /* Update the existing mapping. */
+ ut_ad(p.first->first == page_no);
+ p.first->second.blob_free(log_pos);
+ }
+#undef log_pos
}
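row_log_table_blob_free() leans on std::map::insert() reporting, via the returned pair, whether the key already existed, so a page that is freed, reused and freed again keeps a single entry that is updated in place. A minimal sketch of that insert-or-update idiom:

#include <map>
#include <utility>

void
note_freed(std::map<unsigned long, bool>& freed_pages, unsigned long page_no)
{
	std::pair<std::map<unsigned long, bool>::iterator, bool> p
		= freed_pages.insert(std::make_pair(page_no, true));

	if (!p.second) {
		/* Key already present: update the existing mapping
		instead of constructing a second one. */
		p.first->second = true;
	}
}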
/******************************************************//**
-Check if a transaction rollback has been initiated.
-@return true if inserts of this transaction were rolled back */
+Notes that a BLOB is being allocated during online ALTER TABLE. */
UNIV_INTERN
-bool
-row_log_table_is_rollback(
-/*======================*/
- const dict_index_t* index, /*!< in: clustered index */
- trx_id_t trx_id) /*!< in: transaction id */
+void
+row_log_table_blob_alloc(
+/*=====================*/
+ dict_index_t* index, /*!< in/out: clustered index, X-latched */
+ ulint page_no)/*!< in: starting page number of the BLOB */
{
ut_ad(dict_index_is_clust(index));
ut_ad(dict_index_is_online_ddl(index));
- ut_ad(index->online_log);
-
- if (const trx_id_set* trxs = index->online_log->trx_rb) {
- mutex_enter(&index->online_log->mutex);
- bool is_rollback = trxs->find(trx_id) != trxs->end();
- mutex_exit(&index->online_log->mutex);
+#ifdef UNIV_SYNC_DEBUG
+ ut_ad(rw_lock_own(&index->lock, RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
+ ut_ad(page_no != FIL_NULL);
- return(is_rollback);
+ if (index->online_log->error != DB_SUCCESS) {
+ return;
}
- return(false);
+ /* Only track allocations if the same page has been freed
+ earlier. Double allocation without a free is not allowed. */
+ if (page_no_map* blobs = index->online_log->blobs) {
+ page_no_map::iterator p = blobs->find(page_no);
+
+ if (p != blobs->end()) {
+ ut_ad(p->first == page_no);
+ p->second.blob_alloc(index->online_log->tail.total);
+ }
+ }
}
/******************************************************//**
@@ -1069,17 +1216,6 @@ row_log_table_apply_convert_mrec(
{
dtuple_t* row;
-#ifdef UNIV_SYNC_DEBUG
- /* This prevents BLOBs from being freed, in case an insert
- transaction rollback starts after row_log_table_is_rollback(). */
- ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
-#endif /* UNIV_SYNC_DEBUG */
-
- if (row_log_table_is_rollback(index, trx_id)) {
- row = NULL;
- goto func_exit;
- }
-
/* This is based on row_build(). */
if (log->add_cols) {
row = dtuple_copy(log->add_cols, heap);
@@ -1121,15 +1257,43 @@ row_log_table_apply_convert_mrec(
dfield_t* dfield
= dtuple_get_nth_field(row, col_no);
ulint len;
- const void* data;
+ const byte* data = NULL;
if (rec_offs_nth_extern(offsets, i)) {
ut_ad(rec_offs_any_extern(offsets));
- data = btr_rec_copy_externally_stored_field(
- mrec, offsets,
- dict_table_zip_size(index->table),
- i, &len, heap);
- ut_a(data);
+ rw_lock_x_lock(dict_index_get_lock(index));
+
+ if (const page_no_map* blobs = log->blobs) {
+ data = rec_get_nth_field(
+ mrec, offsets, i, &len);
+ ut_ad(len >= BTR_EXTERN_FIELD_REF_SIZE);
+
+ ulint page_no = mach_read_from_4(
+ data + len - (BTR_EXTERN_FIELD_REF_SIZE
+ - BTR_EXTERN_PAGE_NO));
+ page_no_map::const_iterator p = blobs->find(
+ page_no);
+ if (p != blobs->end()
+ && p->second.is_freed(log->head.total)) {
+ /* This BLOB has been freed.
+ We must not access the row. */
+ row = NULL;
+ }
+ }
+
+ if (row) {
+ data = btr_rec_copy_externally_stored_field(
+ mrec, offsets,
+ dict_table_zip_size(index->table),
+ i, &len, heap);
+ ut_a(data);
+ }
+
+ rw_lock_x_unlock(dict_index_get_lock(index));
+
+ if (!row) {
+ goto func_exit;
+ }
} else {
data = rec_get_nth_field(mrec, offsets, i, &len);
}
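The freed-BLOB check added above decodes the BLOB's starting page number directly from the stored external field reference before deciding whether the page may still be read. A sketch of that decode, assuming the usual 20-byte reference layout (space id, page number, offset, 8-byte length, all big-endian); the offsets are illustrative:

#include <cstddef>
#include <stdint.h>

static const size_t FIELD_REF_SIZE = 20;

/* Read a big-endian 32-bit value, like mach_read_from_4(). */
uint32_t
read_be32(const unsigned char* b)
{
	return((uint32_t(b[0]) << 24) | (uint32_t(b[1]) << 16)
	       | (uint32_t(b[2]) << 8) | uint32_t(b[3]));
}

/* field/len: the column as stored in the record; the reference is its tail. */
uint32_t
blob_start_page(const unsigned char* field, size_t len)
{
	const unsigned char*	ref = field + len - FIELD_REF_SIZE;

	return(read_be32(ref + 4));	/* page no follows the 4-byte space id */
}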
@@ -1685,7 +1849,7 @@ delete_insert:
| BTR_KEEP_POS_FLAG,
btr_pcur_get_btr_cur(&pcur),
&cur_offsets, &offsets_heap, heap, &big_rec,
- update, 0, NULL, 0, &mtr);
+ update, 0, thr, 0, &mtr);
if (big_rec) {
if (error == DB_SUCCESS) {
@@ -1783,7 +1947,7 @@ row_log_table_apply_op(
ulint* offsets) /*!< in/out: work area
for parsing mrec */
{
- const row_log_t* log = dup->index->online_log;
+ row_log_t* log = dup->index->online_log;
dict_index_t* new_index = dict_table_get_first_index(log->table);
ulint extra_size;
const mrec_t* next_mrec;
@@ -1793,6 +1957,7 @@ row_log_table_apply_op(
ut_ad(dict_index_is_clust(dup->index));
ut_ad(dup->index->table != log->table);
+ ut_ad(log->head.total <= log->tail.total);
*error = DB_SUCCESS;
@@ -1801,6 +1966,8 @@ row_log_table_apply_op(
return(NULL);
}
+ const mrec_t* const mrec_start = mrec;
+
switch (*mrec++) {
default:
ut_ad(0);
@@ -1830,6 +1997,8 @@ row_log_table_apply_op(
if (next_mrec > mrec_end) {
return(NULL);
} else {
+ log->head.total += next_mrec - mrec_start;
+
ulint len;
const byte* db_trx_id
= rec_get_nth_field(
@@ -1863,6 +2032,8 @@ row_log_table_apply_op(
return(NULL);
}
+ log->head.total += next_mrec - mrec_start;
+
/* If there are external fields, retrieve those logged
prefix info and reconstruct the row_ext_t */
if (ext_size) {
@@ -2019,6 +2190,7 @@ row_log_table_apply_op(
}
ut_ad(next_mrec <= mrec_end);
+ log->head.total += next_mrec - mrec_start;
dtuple_set_n_fields_cmp(old_pk, new_index->n_uniq);
{
@@ -2036,6 +2208,7 @@ row_log_table_apply_op(
break;
}
+ ut_ad(log->head.total <= log->tail.total);
mem_heap_empty(offsets_heap);
mem_heap_empty(heap);
return(next_mrec);
@@ -2423,6 +2596,10 @@ row_log_table_apply(
};
error = row_log_table_apply_ops(thr, &dup);
+
+ ut_ad(error != DB_SUCCESS
+ || clust_index->online_log->head.total
+ == clust_index->online_log->tail.total);
}
rw_lock_x_unlock(dict_index_get_lock(clust_index));
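head.total and tail.total are logical byte positions in the log: every appended record advances tail.total, every applied record advances head.total, and the assertion above checks that a complete apply drains the log. A minimal sketch of the bookkeeping, with hypothetical names:

#include <cassert>

struct log_counters {
	unsigned long long	head_total;	/* bytes applied so far */
	unsigned long long	tail_total;	/* bytes written so far */

	log_counters() : head_total(0), tail_total(0) {}

	void wrote(unsigned long long n) { tail_total += n; }

	void applied(unsigned long long n) {
		head_total += n;
		/* apply must never overtake the writer */
		assert(head_total <= tail_total);
	}

	bool drained() const { return(head_total == tail_total); }
};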
@@ -2451,6 +2628,7 @@ row_log_allocate(
byte* buf;
row_log_t* log;
ulint size;
+ DBUG_ENTER("row_log_allocate");
ut_ad(!dict_index_is_online_ddl(index));
ut_ad(dict_index_is_clust(index) == !!table);
@@ -2464,7 +2642,7 @@ row_log_allocate(
size = 2 * srv_sort_buf_size + sizeof *log;
buf = (byte*) os_mem_alloc_large(&size);
if (!buf) {
- return(false);
+ DBUG_RETURN(false);
}
log = (row_log_t*) &buf[2 * srv_sort_buf_size];
@@ -2472,11 +2650,11 @@ row_log_allocate(
log->fd = row_merge_file_create_low();
if (log->fd < 0) {
os_mem_free_large(buf, size);
- return(false);
+ DBUG_RETURN(false);
}
mutex_create(index_online_log_key, &log->mutex,
SYNC_INDEX_ONLINE_LOG);
- log->trx_rb = NULL;
+ log->blobs = NULL;
log->table = table;
log->same_pk = same_pk;
log->add_cols = add_cols;
@@ -2486,7 +2664,9 @@ row_log_allocate(
log->head.block = buf;
log->tail.block = buf + srv_sort_buf_size;
log->tail.blocks = log->tail.bytes = 0;
+ log->tail.total = 0;
log->head.blocks = log->head.bytes = 0;
+ log->head.total = 0;
dict_index_set_online_status(index, ONLINE_INDEX_CREATION);
index->online_log = log;
@@ -2495,7 +2675,7 @@ row_log_allocate(
atomic operations in both cases. */
MONITOR_ATOMIC_INC(MONITOR_ONLINE_CREATE_INDEX);
- return(true);
+ DBUG_RETURN(true);
}
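The reason every early return in this function became DBUG_RETURN: once a function calls DBUG_ENTER, each exit path must go through DBUG_RETURN (or DBUG_VOID_RETURN) so the dbug call stack stays balanced in --debug traces. A sketch of the pattern, assuming MySQL's my_dbug.h is on the include path:

#include <my_dbug.h>

bool
allocate_something(bool fail_early)
{
	DBUG_ENTER("allocate_something");

	if (fail_early) {
		DBUG_RETURN(false);	/* not a plain "return(false);" */
	}

	DBUG_RETURN(true);
}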
/******************************************************//**
@@ -2508,7 +2688,7 @@ row_log_free(
{
MONITOR_ATOMIC_DEC(MONITOR_ONLINE_CREATE_INDEX);
- delete log->trx_rb;
+ delete log->blobs;
row_merge_file_destroy_low(log->fd);
mutex_free(&log->mutex);
os_mem_free_large(log->head.block, log->size);
@@ -3183,6 +3363,7 @@ row_log_apply(
dberr_t error;
row_log_t* log;
row_merge_dup_t dup = { index, table, NULL, 0 };
+ DBUG_ENTER("row_log_apply");
ut_ad(dict_index_is_online_ddl(index));
ut_ad(!dict_index_is_clust(index));
@@ -3225,5 +3406,5 @@ row_log_apply(
row_log_free(log);
- return(error);
+ DBUG_RETURN(error);
}