summaryrefslogtreecommitdiff
path: root/storage/innobase/page/page0cur.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/innobase/page/page0cur.cc')
-rw-r--r--storage/innobase/page/page0cur.cc667
1 files changed, 306 insertions, 361 deletions
diff --git a/storage/innobase/page/page0cur.cc b/storage/innobase/page/page0cur.cc
index ab3f6e11bb8..5fa6df85558 100644
--- a/storage/innobase/page/page0cur.cc
+++ b/storage/innobase/page/page0cur.cc
@@ -784,188 +784,6 @@ page_cur_open_on_rnd_user_rec(
ut_rnd_interval(n_recs) + 1);
}
-/** Write a redo log record of inserting a record into an index page.
-@param[in] insert_rec inserted record
-@param[in] rec_size rec_get_size(insert_rec)
-@param[in] cursor_rec predecessor of insert_rec
-@param[in,out] index index tree
-@param[in,out] mtr mini-transaction */
-void
-page_cur_insert_rec_write_log(
- const rec_t* insert_rec,
- ulint rec_size,
- const rec_t* cursor_rec,
- dict_index_t* index,
- mtr_t* mtr)
-{
- ulint cur_rec_size;
- ulint extra_size;
- ulint cur_extra_size;
- const byte* ins_ptr;
- const byte* log_end;
- ulint i;
-
- if (index->table->is_temporary()) {
- mtr->set_modified();
- ut_ad(mtr->get_log_mode() == MTR_LOG_NO_REDO);
- return;
- }
-
- ut_a(rec_size < srv_page_size);
- ut_ad(mtr->is_named_space(index->table->space));
- ut_ad(page_align(insert_rec) == page_align(cursor_rec));
- ut_ad(!page_rec_is_comp(insert_rec)
- == !dict_table_is_comp(index->table));
-
- const bool is_leaf = page_rec_is_leaf(cursor_rec);
-
- {
- mem_heap_t* heap = NULL;
- offset_t cur_offs_[REC_OFFS_NORMAL_SIZE];
- offset_t ins_offs_[REC_OFFS_NORMAL_SIZE];
-
- offset_t* cur_offs;
- offset_t* ins_offs;
-
- rec_offs_init(cur_offs_);
- rec_offs_init(ins_offs_);
-
- cur_offs = rec_get_offsets(cursor_rec, index, cur_offs_,
- is_leaf, ULINT_UNDEFINED, &heap);
- ins_offs = rec_get_offsets(insert_rec, index, ins_offs_,
- is_leaf, ULINT_UNDEFINED, &heap);
-
- extra_size = rec_offs_extra_size(ins_offs);
- cur_extra_size = rec_offs_extra_size(cur_offs);
- ut_ad(rec_size == rec_offs_size(ins_offs));
- cur_rec_size = rec_offs_size(cur_offs);
-
- if (UNIV_LIKELY_NULL(heap)) {
- mem_heap_free(heap);
- }
- }
-
- ins_ptr = insert_rec - extra_size;
-
- i = 0;
-
- if (cur_extra_size == extra_size) {
- ulint min_rec_size = ut_min(cur_rec_size, rec_size);
-
- const byte* cur_ptr = cursor_rec - cur_extra_size;
-
- /* Find out the first byte in insert_rec which differs from
- cursor_rec; skip the bytes in the record info */
-
- do {
- if (*ins_ptr == *cur_ptr) {
- i++;
- ins_ptr++;
- cur_ptr++;
- } else if ((i < extra_size)
- && (i >= extra_size
- - page_rec_get_base_extra_size
- (insert_rec))) {
- i = extra_size;
- ins_ptr = insert_rec;
- cur_ptr = cursor_rec;
- } else {
- break;
- }
- } while (i < min_rec_size);
- }
-
- byte* log_ptr;
-
- if (page_rec_is_comp(insert_rec)) {
- log_ptr = mlog_open_and_write_index(
- mtr, insert_rec, index, MLOG_COMP_REC_INSERT,
- 2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN);
- if (UNIV_UNLIKELY(!log_ptr)) {
- /* Logging in mtr is switched off
- during crash recovery: in that case
- mlog_open returns NULL */
- return;
- }
- } else {
- log_ptr = mlog_open(mtr, 11
- + 2 + 5 + 1 + 5 + 5
- + MLOG_BUF_MARGIN);
- if (UNIV_UNLIKELY(!log_ptr)) {
- /* Logging in mtr is switched off
- during crash recovery: in that case
- mlog_open returns NULL */
- return;
- }
-
- log_ptr = mlog_write_initial_log_record_fast(
- insert_rec, MLOG_REC_INSERT, log_ptr, mtr);
- }
-
- log_end = &log_ptr[2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN];
- /* Write the cursor rec offset as a 2-byte ulint */
- mach_write_to_2(log_ptr, page_offset(cursor_rec));
- log_ptr += 2;
-
- if (page_rec_is_comp(insert_rec)) {
- if (UNIV_UNLIKELY
- (rec_get_info_and_status_bits(insert_rec, TRUE)
- != rec_get_info_and_status_bits(cursor_rec, TRUE))) {
-
- goto need_extra_info;
- }
- } else {
- if (UNIV_UNLIKELY
- (rec_get_info_and_status_bits(insert_rec, FALSE)
- != rec_get_info_and_status_bits(cursor_rec, FALSE))) {
-
- goto need_extra_info;
- }
- }
-
- if (extra_size != cur_extra_size || rec_size != cur_rec_size) {
-need_extra_info:
- /* Write the record end segment length
- and the extra info storage flag */
- log_ptr += mach_write_compressed(log_ptr,
- 2 * (rec_size - i) + 1);
-
- /* Write the info bits */
- mach_write_to_1(log_ptr,
- rec_get_info_and_status_bits(
- insert_rec,
- page_rec_is_comp(insert_rec)));
- log_ptr++;
-
- /* Write the record origin offset */
- log_ptr += mach_write_compressed(log_ptr, extra_size);
-
- /* Write the mismatch index */
- log_ptr += mach_write_compressed(log_ptr, i);
-
- ut_a(i < srv_page_size);
- ut_a(extra_size < srv_page_size);
- } else {
- /* Write the record end segment length
- and the extra info storage flag */
- log_ptr += mach_write_compressed(log_ptr, 2 * (rec_size - i));
- }
-
- /* Write to the log the inserted index record end segment which
- differs from the cursor record */
-
- rec_size -= i;
-
- if (log_ptr + rec_size <= log_end) {
- memcpy(log_ptr, ins_ptr, rec_size);
- mlog_close(mtr, log_ptr + rec_size);
- } else {
- mlog_close(mtr, log_ptr);
- ut_a(rec_size < srv_page_size);
- mlog_catenate_string(mtr, ins_ptr, rec_size);
- }
-}
-
static void rec_set_heap_no(rec_t *rec, ulint heap_no, bool compact)
{
rec_set_bit_field_2(rec, heap_no,
@@ -973,20 +791,10 @@ static void rec_set_heap_no(rec_t *rec, ulint heap_no, bool compact)
REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
}
-static void rec_set_heap_no(const buf_block_t& block, rec_t *rec,
- ulint heap_no, bool compact, mtr_t *mtr)
-{
- rec-= compact ? REC_NEW_HEAP_NO : REC_OLD_HEAP_NO;
-
- // MDEV-12353 FIXME: try single-byte write if possible
- mtr->write<2,mtr_t::OPT>(block, rec,
- (mach_read_from_2(rec) & ~REC_HEAP_NO_MASK) |
- (heap_no << REC_HEAP_NO_SHIFT));
-}
-
/***********************************************************//**
Parses a log record of a record insert on a page.
@return end of log record or NULL */
+ATTRIBUTE_COLD /* only used when crash-upgrading */
const byte*
page_cur_parse_insert_rec(
/*======================*/
@@ -1140,19 +948,26 @@ page_cur_parse_insert_rec(
rec_set_info_and_status_bits(buf + origin_offset,
info_and_status_bits);
} else {
- rec_set_info_bits_old(buf + origin_offset,
- info_and_status_bits);
+ rec_set_bit_field_1(buf + origin_offset, info_and_status_bits,
+ REC_OLD_INFO_BITS,
+ REC_INFO_BITS_MASK, REC_INFO_BITS_SHIFT);
}
page_cur_position(cursor_rec, block, &cursor);
offsets = rec_get_offsets(buf + origin_offset, index, offsets,
is_leaf, ULINT_UNDEFINED, &heap);
- if (UNIV_UNLIKELY(!page_cur_rec_insert(&cursor,
- buf + origin_offset,
- index, offsets, mtr))) {
- /* The redo log record should only have been written
- after the write was successful. */
+ /* The redo log record should only have been written
+ after the write was successful. */
+ if (block->page.zip.data) {
+ if (!page_cur_insert_rec_zip(&cursor, index,
+ buf + origin_offset,
+ offsets, mtr)) {
+ ut_error;
+ }
+ } else if (!page_cur_insert_rec_low(&cursor, index,
+ buf + origin_offset,
+ offsets, mtr)) {
ut_error;
}
@@ -1169,47 +984,57 @@ page_cur_parse_insert_rec(
}
/** Reset PAGE_DIRECTION and PAGE_N_DIRECTION.
-@param[in,out] ptr the PAGE_DIRECTION_B field
-@param[in,out] page index tree page frame
-@param[in] page_zip compressed page descriptor, or NULL */
-static inline
-void
-page_direction_reset(byte* ptr, page_t* page, page_zip_des_t* page_zip)
+@tparam compressed whether the page is in ROW_FORMAT=COMPRESSED
+@param[in,out] block index page
+@param[in,out] ptr the PAGE_DIRECTION_B field
+@param[in,out] mtr mini-transaction */
+template<bool compressed=false>
+inline void page_direction_reset(buf_block_t *block, byte *ptr, mtr_t *mtr)
{
- ut_ad(ptr == PAGE_HEADER + PAGE_DIRECTION_B + page);
- page_ptr_set_direction(ptr, PAGE_NO_DIRECTION);
- if (page_zip) {
- page_zip_write_header(page_zip, ptr, 1, NULL);
- }
- ptr = PAGE_HEADER + PAGE_N_DIRECTION + page;
- *reinterpret_cast<uint16_t*>(ptr) = 0;
- if (page_zip) {
- page_zip_write_header(page_zip, ptr, 2, NULL);
- }
+ ut_ad(!block->page.zip.data || page_is_comp(block->frame));
+ ut_ad(!compressed || block->page.zip.data);
+ ut_ad(ptr == PAGE_HEADER + PAGE_DIRECTION_B + block->frame);
+ static_assert(PAGE_DIRECTION_B + 1 == PAGE_N_DIRECTION, "adjacent fields");
+
+ if (compressed)
+ {
+ *ptr= PAGE_NO_DIRECTION; /* no instant ALTER bits */
+ memset_aligned<2>(ptr + 1, 0, 2);
+ page_zip_write_header(&block->page.zip, ptr, 3, mtr);
+ }
+ else
+ {
+ mtr->write<1,mtr_t::OPT>(*block, ptr, (*ptr & ~((1U << 3) - 1))
+ | PAGE_NO_DIRECTION);
+ mtr->write<2,mtr_t::OPT>(*block, ptr + 1, 0U);
+ }
}
/** Increment PAGE_N_DIRECTION.
-@param[in,out] ptr the PAGE_DIRECTION_B field
-@param[in,out] page index tree page frame
-@param[in] page_zip compressed page descriptor, or NULL
-@param[in] dir PAGE_RIGHT or PAGE_LEFT */
-static inline
-void
-page_direction_increment(
- byte* ptr,
- page_t* page,
- page_zip_des_t* page_zip,
- uint dir)
+@tparam compressed whether the page is in ROW_FORMAT=COMPRESSED
+@param[in,out] block index page
+@param[in,out] ptr the PAGE_DIRECTION_B field
+@param[in] dir PAGE_RIGHT or PAGE_LEFT
+@param[in,out] mtr mini-transaction */
+template<bool compressed=false>
+inline void page_direction_increment(buf_block_t *block, byte *ptr, uint dir,
+ mtr_t *mtr)
{
- ut_ad(ptr == PAGE_HEADER + PAGE_DIRECTION_B + page);
- ut_ad(dir == PAGE_RIGHT || dir == PAGE_LEFT);
- page_ptr_set_direction(ptr, dir);
- if (page_zip) {
- page_zip_write_header(page_zip, ptr, 1, NULL);
- }
- page_header_set_field(
- page, page_zip, PAGE_N_DIRECTION,
- 1U + page_header_get_field(page, PAGE_N_DIRECTION));
+ ut_ad(!block->page.zip.data || page_is_comp(block->frame));
+ ut_ad(!compressed || block->page.zip.data);
+ ut_ad(ptr == PAGE_HEADER + PAGE_DIRECTION_B + block->frame);
+ ut_ad(dir == PAGE_RIGHT || dir == PAGE_LEFT);
+ if (compressed)
+ {
+ *ptr= static_cast<byte>(dir);
+ mach_write_to_2(ptr + 1, 1 + mach_read_from_2(ptr + 1));
+ page_zip_write_header(&block->page.zip, ptr, 3, mtr);
+ }
+ else
+ {
+ mtr->write<1,mtr_t::OPT>(*block, ptr, (*ptr & ~((1U << 3) - 1)) | dir);
+ mtr->write<2>(*block, ptr + 1, 1U + mach_read_from_2(ptr + 1));
+ }
}
/**
@@ -1228,7 +1053,6 @@ static void page_dir_slot_set_n_owned(buf_block_t *block,
page_rec_set_n_owned<compressed>(block, rec, n, page_rec_is_comp(rec), mtr);
}
-
/**
Split a directory slot which owns too many records.
@tparam compressed whether to update the ROW_FORMAT=COMPRESSED page as well
@@ -1269,7 +1093,7 @@ static void page_dir_split_slot(buf_block_t *block, ulint s, mtr_t* mtr)
const ulint half_owned= n_owned / 2;
- if (compressed && UNIV_LIKELY_NULL(block->page.zip.data))
+ if (compressed)
{
/* Log changes to the compressed page header and the dense page
directory. */
@@ -1392,24 +1216,47 @@ func_exit:
}
/** Allocate space for inserting an index record.
-@param[in,out] page index page
-@param[in,out] page_zip ROW_FORMAT=COMPRESSED page, or NULL
+@tparam compressed whether to update the ROW_FORMAT=COMPRESSED page as well
+@param[in,out] block index page
@param[in] need number of bytes needed
@param[out] heap_no record heap number
+@param[in,out] mtr mini-transaction
@return pointer to the start of the allocated buffer
@retval NULL if allocation fails */
-static byte* page_mem_alloc_heap(page_t* page, page_zip_des_t* page_zip,
- ulint need, ulint* heap_no)
+template<bool compressed=false>
+static byte* page_mem_alloc_heap(buf_block_t *block, ulint need,
+ ulint *heap_no, mtr_t *mtr)
{
- if (need > page_get_max_insert_size(page, 1)) {
- return NULL;
- }
+ ut_ad(!compressed || block->page.zip.data);
+
+ byte *heap_top= my_assume_aligned<2>(PAGE_HEAP_TOP + PAGE_HEADER +
+ block->frame);
+
+ const uint16_t top= mach_read_from_2(heap_top);
+
+ if (need > page_get_max_insert_size(block->frame, 1))
+ return NULL;
+
+ byte *n_heap= my_assume_aligned<2>(PAGE_N_HEAP + PAGE_HEADER + block->frame);
+
+ const uint16_t h= mach_read_from_2(n_heap);
+ *heap_no= h & 0x7fff;
+ ut_ad(*heap_no < srv_page_size / REC_N_NEW_EXTRA_BYTES);
+ compile_time_assert(UNIV_PAGE_SIZE_MAX / REC_N_NEW_EXTRA_BYTES < 0x3fff);
- byte* top = page_header_get_ptr(page, PAGE_HEAP_TOP);
- page_header_set_ptr(page, page_zip, PAGE_HEAP_TOP, top + need);
- *heap_no = page_dir_get_n_heap(page);
- page_dir_set_n_heap(page, page_zip, 1 + *heap_no);
- return top;
+ mach_write_to_2(heap_top, top + need);
+ mach_write_to_2(n_heap, h + 1);
+
+ if (compressed)
+ {
+ ut_ad(h & 0x8000);
+ page_zip_write_header(&block->page.zip, heap_top, 4, mtr);
+ }
+ else
+ mtr->memcpy(*block, PAGE_HEAP_TOP + PAGE_HEADER, 4);
+
+ compile_time_assert(PAGE_N_HEAP == PAGE_HEAP_TOP + 2);
+ return &block->frame[top];
}
/***********************************************************//**
@@ -1451,11 +1298,9 @@ page_cur_insert_rec_low(
ut_ad(!page_rec_is_supremum(current_rec));
- const mtr_log_t log_mode = mtr->set_log_mode(MTR_LOG_NONE);
-
/* We should not write log for ROW_FORMAT=COMPRESSED pages here. */
- ut_ad(log_mode == MTR_LOG_NONE
- || log_mode == MTR_LOG_NO_REDO
+ ut_ad(mtr->get_log_mode() == MTR_LOG_NONE
+ || mtr->get_log_mode() == MTR_LOG_NO_REDO
|| !(index->table->flags & DICT_TF_MASK_ZIP_SSIZE));
/* 1. Get the size of the physical record in the page */
@@ -1502,29 +1347,37 @@ page_cur_insert_rec_low(
insert_buf = free_rec - rec_offs_extra_size(foffsets);
+ byte* page_free = my_assume_aligned<2>(PAGE_FREE + PAGE_HEADER
+ + block->frame);
+ byte* page_garbage = my_assume_aligned<2>(PAGE_GARBAGE
+ + PAGE_HEADER
+ + block->frame);
+ ut_ad(mach_read_from_2(page_garbage) >= rec_size);
+ mach_write_to_2(page_garbage, mach_read_from_2(page_garbage)
+ - rec_size);
if (page_is_comp(block->frame)) {
heap_no = rec_get_heap_no_new(free_rec);
- page_mem_alloc_free(block->frame, NULL,
- rec_get_next_ptr(free_rec, TRUE),
- rec_size);
+ const rec_t* next = rec_get_next_ptr(free_rec, true);
+ mach_write_to_2(page_free,
+ next ? page_offset(next) : 0);
} else {
heap_no = rec_get_heap_no_old(free_rec);
- page_mem_alloc_free(block->frame, NULL,
- rec_get_next_ptr(free_rec, FALSE),
- rec_size);
+ memcpy(page_free, free_rec - REC_NEXT, 2);
}
+ compile_time_assert(PAGE_GARBAGE == PAGE_FREE + 2);
+ mtr->memcpy(*block, PAGE_FREE + PAGE_HEADER, 4);
+
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
} else {
use_heap:
free_rec = NULL;
- insert_buf = page_mem_alloc_heap(block->frame, NULL,
- rec_size, &heap_no);
+ insert_buf = page_mem_alloc_heap(block, rec_size, &heap_no,
+ mtr);
if (UNIV_UNLIKELY(insert_buf == NULL)) {
- mtr->set_log_mode(log_mode);
return(NULL);
}
}
@@ -1539,9 +1392,10 @@ use_heap:
{
/* next record after current before the insertion */
- rec_t* next_rec = page_rec_get_next(current_rec);
-#ifdef UNIV_DEBUG
if (page_is_comp(block->frame)) {
+ const rec_t* next_rec = page_rec_get_next_low(
+ current_rec, true);
+#ifdef UNIV_DEBUG
switch (rec_get_status(current_rec)) {
case REC_STATUS_ORDINARY:
case REC_STATUS_NODE_PTR:
@@ -1561,24 +1415,42 @@ use_heap:
ut_ad(!"wrong status on insert_rec");
}
ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
- }
#endif
- page_rec_set_next(insert_rec, next_rec);
- page_rec_set_next(current_rec, insert_rec);
+ mach_write_to_2(insert_rec - REC_NEXT,
+ static_cast<uint16_t>
+ (next_rec - insert_rec));
+ mtr->write<2>(*block, current_rec - REC_NEXT,
+ static_cast<uint16_t>
+ (insert_rec - current_rec));
+ } else {
+ memcpy(insert_rec - REC_NEXT, current_rec - REC_NEXT,
+ 2);
+ mtr->write<2>(*block, current_rec - REC_NEXT,
+ page_offset(insert_rec));
+ }
}
- page_header_set_field(block->frame, NULL, PAGE_N_RECS,
- 1U + page_get_n_recs(block->frame));
+ mtr->write<2>(*block, PAGE_N_RECS + PAGE_HEADER + block->frame,
+ 1U + page_get_n_recs(block->frame));
/* 5. Set the n_owned field in the inserted record to zero,
and set the heap_no field */
- page_rec_set_n_owned<false>(block, insert_rec, 0,
- page_is_comp(block->frame), mtr);
- rec_set_heap_no(*block, insert_rec, heap_no,
- page_is_comp(block->frame), mtr);
+ if (page_is_comp(block->frame)) {
+ rec_set_bit_field_1(insert_rec, 0, REC_NEW_N_OWNED,
+ REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
+ rec_set_bit_field_2(insert_rec, heap_no, REC_NEW_HEAP_NO,
+ REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
+ } else {
+ rec_set_bit_field_1(insert_rec, 0, REC_OLD_N_OWNED,
+ REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
+ rec_set_bit_field_2(insert_rec, heap_no, REC_OLD_HEAP_NO,
+ REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
+ }
UNIV_MEM_ASSERT_RW(rec_get_start(insert_rec, offsets),
rec_offs_size(offsets));
+ mtr->memcpy(*block, page_offset(insert_buf), rec_offs_size(offsets));
+
/* 6. Update the last insertion info in page header */
last_insert = page_header_get_ptr(block->frame, PAGE_LAST_INSERT);
@@ -1586,25 +1458,24 @@ use_heap:
|| rec_get_node_ptr_flag(last_insert)
== rec_get_node_ptr_flag(insert_rec));
- if (!dict_index_is_spatial(index)) {
+ if (!index->is_spatial()) {
byte* ptr = PAGE_HEADER + PAGE_DIRECTION_B + block->frame;
if (UNIV_UNLIKELY(last_insert == NULL)) {
no_direction:
- page_direction_reset(ptr, block->frame, NULL);
+ page_direction_reset(block, ptr, mtr);
} else if (last_insert == current_rec
&& page_ptr_get_direction(ptr) != PAGE_LEFT) {
- page_direction_increment(ptr, block->frame, NULL,
- PAGE_RIGHT);
+ page_direction_increment(block, ptr, PAGE_RIGHT, mtr);
} else if (page_ptr_get_direction(ptr) != PAGE_RIGHT
&& page_rec_get_next(insert_rec) == last_insert) {
- page_direction_increment(ptr, block->frame, NULL,
- PAGE_LEFT);
+ page_direction_increment(block, ptr, PAGE_LEFT, mtr);
} else {
goto no_direction;
}
}
- page_header_set_ptr(block->frame, NULL, PAGE_LAST_INSERT, insert_rec);
+ mtr->write<2>(*block, PAGE_LAST_INSERT + PAGE_HEADER + block->frame,
+ page_offset(insert_rec));
/* 7. It remains to update the owner record. */
{
@@ -1612,14 +1483,12 @@ no_direction:
ulint n_owned;
if (page_is_comp(block->frame)) {
n_owned = rec_get_n_owned_new(owner_rec);
- rec_set_bit_field_1(owner_rec, n_owned + 1,
- REC_NEW_N_OWNED, REC_N_OWNED_MASK,
- REC_N_OWNED_SHIFT);
+ page_rec_set_n_owned<false>(block, owner_rec,
+ n_owned + 1, true, mtr);
} else {
n_owned = rec_get_n_owned_old(owner_rec);
- rec_set_bit_field_1(owner_rec, n_owned + 1,
- REC_OLD_N_OWNED, REC_N_OWNED_MASK,
- REC_N_OWNED_SHIFT);
+ page_rec_set_n_owned<false>(block, owner_rec,
+ n_owned + 1, false, mtr);
}
/* 8. Now we have incremented the n_owned field of the owner
@@ -1633,14 +1502,88 @@ no_direction:
}
}
- /* 9. Write log record of the insert */
- mtr->set_log_mode(log_mode);
- page_cur_insert_rec_write_log(insert_rec, rec_size,
- current_rec, index, mtr);
-
return(insert_rec);
}
+/** Add a slot to the dense page directory.
+@param[in,out] block ROW_FORMAT=COMPRESSED page
+@param[in] index the index that the page belongs to
+@param[in,out] mtr mini-transaction */
+static inline void page_zip_dir_add_slot(buf_block_t *block,
+ const dict_index_t *index, mtr_t *mtr)
+{
+ page_zip_des_t *page_zip= &block->page.zip;
+
+ ut_ad(page_is_comp(page_zip->data));
+ UNIV_MEM_ASSERT_RW(page_zip->data, page_zip_get_size(page_zip));
+
+ /* Read the old n_dense (n_heap has already been incremented). */
+ ulint n_dense= page_dir_get_n_heap(page_zip->data) - (PAGE_HEAP_NO_USER_LOW +
+ 1U);
+
+ byte *dir= page_zip->data + page_zip_get_size(page_zip) -
+ PAGE_ZIP_DIR_SLOT_SIZE * n_dense;
+ byte *stored= dir;
+
+ if (!page_is_leaf(page_zip->data))
+ {
+ ut_ad(!page_zip->n_blobs);
+ stored-= n_dense * REC_NODE_PTR_SIZE;
+ }
+ else if (index->is_clust())
+ {
+ /* Move the BLOB pointer array backwards to make space for the
+ columns DB_TRX_ID,DB_ROLL_PTR and the dense directory slot. */
+
+ stored-= n_dense * (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
+ byte *externs= stored - page_zip->n_blobs * BTR_EXTERN_FIELD_REF_SIZE;
+ byte *dst= externs - PAGE_ZIP_CLUST_LEAF_SLOT_SIZE;
+ ut_ad(!memcmp(dst, field_ref_zero, PAGE_ZIP_CLUST_LEAF_SLOT_SIZE));
+ if (const ulint len = ulint(stored - externs))
+ {
+ memmove(dst, externs, len);
+ /* TODO: write MEMMOVE record */
+ if (byte* log_ptr = mlog_open(mtr, 11 + 2 + 2))
+ {
+ log_ptr= mlog_write_initial_log_record_low(MLOG_ZIP_WRITE_STRING,
+ block->page.id.space(),
+ block->page.id.page_no(),
+ log_ptr, mtr);
+ mach_write_to_2(log_ptr, dst - page_zip->data);
+ mach_write_to_2(log_ptr + 2, len);
+ mlog_close(mtr, log_ptr + 4);
+ mlog_catenate_string(mtr, dst, len);
+ }
+ }
+ }
+ else
+ {
+ stored-= page_zip->n_blobs * BTR_EXTERN_FIELD_REF_SIZE;
+ ut_ad(!memcmp(stored - PAGE_ZIP_DIR_SLOT_SIZE, field_ref_zero,
+ PAGE_ZIP_DIR_SLOT_SIZE));
+ }
+
+ /* Move the uncompressed area backwards to make space
+ for one directory slot. */
+ if (const ulint len = ulint(dir - stored))
+ {
+ byte* dst = stored - PAGE_ZIP_DIR_SLOT_SIZE;
+ memmove(dst, stored, len);
+ /* TODO: write MEMMOVE record */
+ if (byte* log_ptr = mlog_open(mtr, 11 + 2 + 2))
+ {
+ log_ptr= mlog_write_initial_log_record_low(MLOG_ZIP_WRITE_STRING,
+ block->page.id.space(),
+ block->page.id.page_no(),
+ log_ptr, mtr);
+ mach_write_to_2(log_ptr, dst - page_zip->data);
+ mach_write_to_2(log_ptr + 2, len);
+ mlog_close(mtr, log_ptr + 4);
+ mlog_catenate_string(mtr, dst, len);
+ }
+ }
+}
+
/***********************************************************//**
Inserts a record next to page cursor on a compressed and uncompressed
page. Returns pointer to inserted record if succeed, i.e.,
@@ -1665,8 +1608,6 @@ page_cur_insert_rec_zip(
byte* insert_buf;
ulint rec_size;
page_t* page; /*!< the relevant page */
- rec_t* last_insert; /*!< cursor position at previous
- insert */
rec_t* free_rec; /*!< a free record that was reused,
or NULL */
rec_t* insert_rec; /*!< inserted record */
@@ -1676,7 +1617,6 @@ page_cur_insert_rec_zip(
page_zip = page_cur_get_page_zip(cursor);
ut_ad(page_zip);
-
ut_ad(rec_offs_validate(rec, index, offsets));
page = page_cur_get_page(cursor);
@@ -1727,6 +1667,7 @@ page_cur_insert_rec_zip(
rec_t* cursor_rec = page_cur_get_rec(cursor);
#endif /* UNIV_DEBUG */
+#if 1 /* MDEV-12353 FIXME: skip this for the physical log format! */
/* If we are not writing compressed page images, we
must reorganize the page before attempting the
insert. */
@@ -1735,6 +1676,7 @@ page_cur_insert_rec_zip(
The page reorganization or creation that we
would attempt outside crash recovery would
have been covered by a previous redo log record. */
+#endif
} else if (page_is_empty(page)) {
ut_ad(page_cur_is_before_first(cursor));
@@ -1813,12 +1755,14 @@ page_cur_insert_rec_zip(
be logged after a successful operation. */
ut_ad(!recv_recovery_is_on());
ut_ad(!index->is_dummy);
+#if 1 /* MDEV-12353 FIXME: skip this for the physical log format! */
} else if (recv_recovery_is_on()) {
/* This should be followed by
MLOG_ZIP_PAGE_COMPRESS_NO_DATA,
which should succeed. */
rec_offs_make_valid(insert_rec, index,
page_is_leaf(page), offsets);
+#endif
} else {
ulint pos = page_rec_get_n_recs_before(insert_rec);
ut_ad(pos > 0);
@@ -1841,7 +1785,7 @@ page_cur_insert_rec_zip(
rec_offs_make_valid(
insert_rec, index,
page_is_leaf(page), offsets);
- return(insert_rec);
+ return insert_rec;
}
/* Theoretically, we could try one last resort
@@ -1908,9 +1852,16 @@ too_small:
}
heap_no = rec_get_heap_no_new(free_rec);
- page_mem_alloc_free(page, page_zip,
- rec_get_next_ptr(free_rec, TRUE),
- rec_size);
+ const rec_t* next = rec_get_next_ptr_const(free_rec, true);
+ mach_write_to_2(PAGE_FREE + PAGE_HEADER + page,
+ next ? page_offset(next) : 0);
+ byte* garbage = PAGE_GARBAGE + PAGE_HEADER + page;
+ ut_ad(mach_read_from_2(garbage) >= rec_size);
+ mach_write_to_2(garbage, mach_read_from_2(garbage) - rec_size);
+ compile_time_assert(PAGE_GARBAGE == PAGE_FREE + 2);
+ page_zip_write_header(page_zip,
+ PAGE_HEADER + PAGE_FREE + page, 4, mtr);
+ /* TODO: group with PAGE_LAST_INSERT */
if (!page_is_leaf(page)) {
/* Zero out the node pointer of free_rec,
@@ -1961,14 +1912,14 @@ too_small:
} else {
use_heap:
free_rec = NULL;
- insert_buf = page_mem_alloc_heap(page, page_zip,
- rec_size, &heap_no);
+ insert_buf = page_mem_alloc_heap<true>(cursor->block, rec_size,
+ &heap_no, mtr);
if (UNIV_UNLIKELY(insert_buf == NULL)) {
return(NULL);
}
- page_zip_dir_add_slot(page_zip, dict_index_is_clust(index));
+ page_zip_dir_add_slot(cursor->block, index, mtr);
}
/* 3. Create the record */
@@ -1978,21 +1929,19 @@ use_heap:
/* 4. Insert the record in the linked list of records */
ut_ad(cursor->rec != insert_rec);
- {
- /* next record after current before the insertion */
- const rec_t* next_rec = page_rec_get_next_low(
- cursor->rec, TRUE);
- ut_ad(rec_get_status(cursor->rec)
- <= REC_STATUS_INFIMUM);
- ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM);
- ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
-
- page_rec_set_next(insert_rec, next_rec);
- page_rec_set_next(cursor->rec, insert_rec);
- }
+ /* next record after current before the insertion */
+ const rec_t* next_rec = page_rec_get_next_low(cursor->rec, TRUE);
+ ut_ad(rec_get_status(cursor->rec) <= REC_STATUS_INFIMUM);
+ ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM);
+ ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
- page_header_set_field(page, page_zip, PAGE_N_RECS,
- 1U + page_get_n_recs(page));
+ mach_write_to_2(insert_rec - REC_NEXT, static_cast<uint16_t>
+ (next_rec - insert_rec));
+ mach_write_to_2(cursor->rec - REC_NEXT, static_cast<uint16_t>
+ (insert_rec - cursor->rec));
+ byte* n_recs = PAGE_N_RECS + PAGE_HEADER + page;
+ mach_write_to_2(n_recs, mach_read_from_2(n_recs) + 1);
+ page_zip_write_header(page_zip, n_recs, 2, mtr);
/* 5. Set the n_owned field in the inserted record to zero,
and set the heap_no field */
@@ -2004,35 +1953,37 @@ use_heap:
UNIV_MEM_ASSERT_RW(rec_get_start(insert_rec, offsets),
rec_offs_size(offsets));
- page_zip_dir_insert(cursor, free_rec, insert_rec);
+ page_zip_dir_insert(cursor, free_rec, insert_rec, mtr);
/* 6. Update the last insertion info in page header */
-
- last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
- ut_ad(!last_insert
- || rec_get_node_ptr_flag(last_insert)
+ byte* last_insert = PAGE_LAST_INSERT + PAGE_HEADER + page;
+ const uint16_t last_insert_rec = mach_read_from_2(last_insert);
+ ut_ad(!last_insert_rec
+ || rec_get_node_ptr_flag(page + last_insert_rec)
== rec_get_node_ptr_flag(insert_rec));
+ /* TODO: combine with PAGE_DIRECTION changes */
+ mach_write_to_2(last_insert, page_offset(insert_rec));
+ page_zip_write_header(page_zip, last_insert, 2, mtr);
- if (!dict_index_is_spatial(index)) {
+ if (!index->is_spatial()) {
byte* ptr = PAGE_HEADER + PAGE_DIRECTION_B + page;
- if (UNIV_UNLIKELY(last_insert == NULL)) {
+ if (UNIV_UNLIKELY(!last_insert_rec)) {
no_direction:
- page_direction_reset(ptr, page, page_zip);
- } else if (last_insert == cursor->rec
+ page_direction_reset<true>(cursor->block, ptr, mtr);
+ } else if (page + last_insert_rec == cursor->rec
&& page_ptr_get_direction(ptr) != PAGE_LEFT) {
- page_direction_increment(ptr, page, page_zip,
- PAGE_RIGHT);
+ page_direction_increment<true>(cursor->block, ptr,
+ PAGE_RIGHT, mtr);
} else if (page_ptr_get_direction(ptr) != PAGE_RIGHT
- && page_rec_get_next(insert_rec) == last_insert) {
- page_direction_increment(ptr, page, page_zip,
- PAGE_LEFT);
+ && page_rec_get_next(insert_rec)
+ == page + last_insert_rec) {
+ page_direction_increment<true>(cursor->block, ptr,
+ PAGE_LEFT, mtr);
} else {
goto no_direction;
}
}
- page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, insert_rec);
-
/* 7. It remains to update the owner record. */
{
rec_t* owner_rec = page_rec_find_owner_rec(insert_rec);
@@ -2047,22 +1998,14 @@ no_direction:
we have to split the corresponding directory slot in two. */
if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) {
- const mtr_log_t log_mode = mtr->set_log_mode(
- MTR_LOG_NONE);
page_dir_split_slot<true>(
page_cur_get_block(cursor),
page_dir_find_owner_slot(owner_rec), mtr);
- mtr->set_log_mode(log_mode);
}
}
- page_zip_write_rec(page_zip, insert_rec, index, offsets, 1);
-
- /* 9. Write log record of the insert */
- page_cur_insert_rec_write_log(insert_rec, rec_size,
- cursor->rec, index, mtr);
-
- return(insert_rec);
+ page_zip_write_rec(page_zip, insert_rec, index, offsets, 1, mtr);
+ return insert_rec;
}
/**********************************************************//**
@@ -2079,8 +2022,6 @@ page_parse_copy_rec_list_to_created_page(
mtr_t* mtr) /*!< in: mtr or NULL */
{
ulint log_data_len;
- page_t* page;
- page_zip_des_t* page_zip;
ut_ad(index->is_dummy);
@@ -2119,14 +2060,16 @@ page_parse_copy_rec_list_to_created_page(
ut_a(ptr == rec_end);
- page = buf_block_get_frame(block);
- page_zip = buf_block_get_page_zip(block);
-
- page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL);
+ memset_aligned<2>(PAGE_HEADER + PAGE_LAST_INSERT + block->frame, 0, 2);
+ if (block->page.zip.data) {
+ memset_aligned<2>(PAGE_HEADER + PAGE_LAST_INSERT
+ + block->page.zip.data, 0, 2);
+ }
- if (!dict_index_is_spatial(index)) {
- page_direction_reset(PAGE_HEADER + PAGE_DIRECTION_B + page,
- page, page_zip);
+ if (!index->is_spatial()) {
+ page_direction_reset<true>(block,
+ PAGE_HEADER + PAGE_DIRECTION_B
+ + block->frame, mtr);
}
return(rec_end);
@@ -2246,8 +2189,6 @@ page_copy_rec_list_end_to_created_page(
heap_top += rec_size;
rec_offs_make_valid(insert_rec, index, is_leaf, offsets);
- page_cur_insert_rec_write_log(insert_rec, rec_size, prev_rec,
- index, mtr);
prev_rec = insert_rec;
rec = page_rec_get_next(rec);
} while (!page_rec_is_supremum(rec));
@@ -2293,14 +2234,18 @@ page_copy_rec_list_end_to_created_page(
mach_write_to_2(PAGE_HEADER + PAGE_N_DIR_SLOTS + new_page,
2 + slot_index);
- page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP, heap_top);
- page_dir_set_n_heap(new_page, NULL, PAGE_HEAP_NO_USER_LOW + n_recs);
- page_header_set_field(new_page, NULL, PAGE_N_RECS, n_recs);
-
- *reinterpret_cast<uint16_t*>(PAGE_HEADER + PAGE_LAST_INSERT + new_page)
- = 0;
- page_direction_reset(PAGE_HEADER + PAGE_DIRECTION_B + new_page,
- new_page, NULL);
+ mach_write_to_2(PAGE_HEADER + PAGE_HEAP_TOP + new_page,
+ page_offset(heap_top));
+ mach_write_to_2(PAGE_HEADER + PAGE_N_HEAP + new_page,
+ PAGE_HEAP_NO_USER_LOW + n_recs);
+ mach_write_to_2(PAGE_HEADER + PAGE_N_RECS + new_page, n_recs);
+
+ memset_aligned<2>(PAGE_HEADER + PAGE_LAST_INSERT + new_page, 0, 2);
+ mach_write_to_1(PAGE_HEADER + PAGE_DIRECTION_B + new_page,
+ (mach_read_from_1(PAGE_HEADER + PAGE_DIRECTION_B
+ + new_page) & ~((1U << 3) - 1))
+ | PAGE_NO_DIRECTION);
+ memset_aligned<2>(PAGE_HEADER + PAGE_N_DIRECTION + new_page, 0, 2);
}
/***********************************************************//**