diff options
Diffstat (limited to 'storage/innobase/page/page0cur.cc')
-rw-r--r-- | storage/innobase/page/page0cur.cc | 667 |
1 files changed, 306 insertions, 361 deletions
diff --git a/storage/innobase/page/page0cur.cc b/storage/innobase/page/page0cur.cc index ab3f6e11bb8..5fa6df85558 100644 --- a/storage/innobase/page/page0cur.cc +++ b/storage/innobase/page/page0cur.cc @@ -784,188 +784,6 @@ page_cur_open_on_rnd_user_rec( ut_rnd_interval(n_recs) + 1); } -/** Write a redo log record of inserting a record into an index page. -@param[in] insert_rec inserted record -@param[in] rec_size rec_get_size(insert_rec) -@param[in] cursor_rec predecessor of insert_rec -@param[in,out] index index tree -@param[in,out] mtr mini-transaction */ -void -page_cur_insert_rec_write_log( - const rec_t* insert_rec, - ulint rec_size, - const rec_t* cursor_rec, - dict_index_t* index, - mtr_t* mtr) -{ - ulint cur_rec_size; - ulint extra_size; - ulint cur_extra_size; - const byte* ins_ptr; - const byte* log_end; - ulint i; - - if (index->table->is_temporary()) { - mtr->set_modified(); - ut_ad(mtr->get_log_mode() == MTR_LOG_NO_REDO); - return; - } - - ut_a(rec_size < srv_page_size); - ut_ad(mtr->is_named_space(index->table->space)); - ut_ad(page_align(insert_rec) == page_align(cursor_rec)); - ut_ad(!page_rec_is_comp(insert_rec) - == !dict_table_is_comp(index->table)); - - const bool is_leaf = page_rec_is_leaf(cursor_rec); - - { - mem_heap_t* heap = NULL; - offset_t cur_offs_[REC_OFFS_NORMAL_SIZE]; - offset_t ins_offs_[REC_OFFS_NORMAL_SIZE]; - - offset_t* cur_offs; - offset_t* ins_offs; - - rec_offs_init(cur_offs_); - rec_offs_init(ins_offs_); - - cur_offs = rec_get_offsets(cursor_rec, index, cur_offs_, - is_leaf, ULINT_UNDEFINED, &heap); - ins_offs = rec_get_offsets(insert_rec, index, ins_offs_, - is_leaf, ULINT_UNDEFINED, &heap); - - extra_size = rec_offs_extra_size(ins_offs); - cur_extra_size = rec_offs_extra_size(cur_offs); - ut_ad(rec_size == rec_offs_size(ins_offs)); - cur_rec_size = rec_offs_size(cur_offs); - - if (UNIV_LIKELY_NULL(heap)) { - mem_heap_free(heap); - } - } - - ins_ptr = insert_rec - extra_size; - - i = 0; - - if (cur_extra_size == extra_size) { - ulint min_rec_size = ut_min(cur_rec_size, rec_size); - - const byte* cur_ptr = cursor_rec - cur_extra_size; - - /* Find out the first byte in insert_rec which differs from - cursor_rec; skip the bytes in the record info */ - - do { - if (*ins_ptr == *cur_ptr) { - i++; - ins_ptr++; - cur_ptr++; - } else if ((i < extra_size) - && (i >= extra_size - - page_rec_get_base_extra_size - (insert_rec))) { - i = extra_size; - ins_ptr = insert_rec; - cur_ptr = cursor_rec; - } else { - break; - } - } while (i < min_rec_size); - } - - byte* log_ptr; - - if (page_rec_is_comp(insert_rec)) { - log_ptr = mlog_open_and_write_index( - mtr, insert_rec, index, MLOG_COMP_REC_INSERT, - 2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN); - if (UNIV_UNLIKELY(!log_ptr)) { - /* Logging in mtr is switched off - during crash recovery: in that case - mlog_open returns NULL */ - return; - } - } else { - log_ptr = mlog_open(mtr, 11 - + 2 + 5 + 1 + 5 + 5 - + MLOG_BUF_MARGIN); - if (UNIV_UNLIKELY(!log_ptr)) { - /* Logging in mtr is switched off - during crash recovery: in that case - mlog_open returns NULL */ - return; - } - - log_ptr = mlog_write_initial_log_record_fast( - insert_rec, MLOG_REC_INSERT, log_ptr, mtr); - } - - log_end = &log_ptr[2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN]; - /* Write the cursor rec offset as a 2-byte ulint */ - mach_write_to_2(log_ptr, page_offset(cursor_rec)); - log_ptr += 2; - - if (page_rec_is_comp(insert_rec)) { - if (UNIV_UNLIKELY - (rec_get_info_and_status_bits(insert_rec, TRUE) - != rec_get_info_and_status_bits(cursor_rec, TRUE))) { - - goto need_extra_info; - } - } else { - if (UNIV_UNLIKELY - (rec_get_info_and_status_bits(insert_rec, FALSE) - != rec_get_info_and_status_bits(cursor_rec, FALSE))) { - - goto need_extra_info; - } - } - - if (extra_size != cur_extra_size || rec_size != cur_rec_size) { -need_extra_info: - /* Write the record end segment length - and the extra info storage flag */ - log_ptr += mach_write_compressed(log_ptr, - 2 * (rec_size - i) + 1); - - /* Write the info bits */ - mach_write_to_1(log_ptr, - rec_get_info_and_status_bits( - insert_rec, - page_rec_is_comp(insert_rec))); - log_ptr++; - - /* Write the record origin offset */ - log_ptr += mach_write_compressed(log_ptr, extra_size); - - /* Write the mismatch index */ - log_ptr += mach_write_compressed(log_ptr, i); - - ut_a(i < srv_page_size); - ut_a(extra_size < srv_page_size); - } else { - /* Write the record end segment length - and the extra info storage flag */ - log_ptr += mach_write_compressed(log_ptr, 2 * (rec_size - i)); - } - - /* Write to the log the inserted index record end segment which - differs from the cursor record */ - - rec_size -= i; - - if (log_ptr + rec_size <= log_end) { - memcpy(log_ptr, ins_ptr, rec_size); - mlog_close(mtr, log_ptr + rec_size); - } else { - mlog_close(mtr, log_ptr); - ut_a(rec_size < srv_page_size); - mlog_catenate_string(mtr, ins_ptr, rec_size); - } -} - static void rec_set_heap_no(rec_t *rec, ulint heap_no, bool compact) { rec_set_bit_field_2(rec, heap_no, @@ -973,20 +791,10 @@ static void rec_set_heap_no(rec_t *rec, ulint heap_no, bool compact) REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT); } -static void rec_set_heap_no(const buf_block_t& block, rec_t *rec, - ulint heap_no, bool compact, mtr_t *mtr) -{ - rec-= compact ? REC_NEW_HEAP_NO : REC_OLD_HEAP_NO; - - // MDEV-12353 FIXME: try single-byte write if possible - mtr->write<2,mtr_t::OPT>(block, rec, - (mach_read_from_2(rec) & ~REC_HEAP_NO_MASK) | - (heap_no << REC_HEAP_NO_SHIFT)); -} - /***********************************************************//** Parses a log record of a record insert on a page. @return end of log record or NULL */ +ATTRIBUTE_COLD /* only used when crash-upgrading */ const byte* page_cur_parse_insert_rec( /*======================*/ @@ -1140,19 +948,26 @@ page_cur_parse_insert_rec( rec_set_info_and_status_bits(buf + origin_offset, info_and_status_bits); } else { - rec_set_info_bits_old(buf + origin_offset, - info_and_status_bits); + rec_set_bit_field_1(buf + origin_offset, info_and_status_bits, + REC_OLD_INFO_BITS, + REC_INFO_BITS_MASK, REC_INFO_BITS_SHIFT); } page_cur_position(cursor_rec, block, &cursor); offsets = rec_get_offsets(buf + origin_offset, index, offsets, is_leaf, ULINT_UNDEFINED, &heap); - if (UNIV_UNLIKELY(!page_cur_rec_insert(&cursor, - buf + origin_offset, - index, offsets, mtr))) { - /* The redo log record should only have been written - after the write was successful. */ + /* The redo log record should only have been written + after the write was successful. */ + if (block->page.zip.data) { + if (!page_cur_insert_rec_zip(&cursor, index, + buf + origin_offset, + offsets, mtr)) { + ut_error; + } + } else if (!page_cur_insert_rec_low(&cursor, index, + buf + origin_offset, + offsets, mtr)) { ut_error; } @@ -1169,47 +984,57 @@ page_cur_parse_insert_rec( } /** Reset PAGE_DIRECTION and PAGE_N_DIRECTION. -@param[in,out] ptr the PAGE_DIRECTION_B field -@param[in,out] page index tree page frame -@param[in] page_zip compressed page descriptor, or NULL */ -static inline -void -page_direction_reset(byte* ptr, page_t* page, page_zip_des_t* page_zip) +@tparam compressed whether the page is in ROW_FORMAT=COMPRESSED +@param[in,out] block index page +@param[in,out] ptr the PAGE_DIRECTION_B field +@param[in,out] mtr mini-transaction */ +template<bool compressed=false> +inline void page_direction_reset(buf_block_t *block, byte *ptr, mtr_t *mtr) { - ut_ad(ptr == PAGE_HEADER + PAGE_DIRECTION_B + page); - page_ptr_set_direction(ptr, PAGE_NO_DIRECTION); - if (page_zip) { - page_zip_write_header(page_zip, ptr, 1, NULL); - } - ptr = PAGE_HEADER + PAGE_N_DIRECTION + page; - *reinterpret_cast<uint16_t*>(ptr) = 0; - if (page_zip) { - page_zip_write_header(page_zip, ptr, 2, NULL); - } + ut_ad(!block->page.zip.data || page_is_comp(block->frame)); + ut_ad(!compressed || block->page.zip.data); + ut_ad(ptr == PAGE_HEADER + PAGE_DIRECTION_B + block->frame); + static_assert(PAGE_DIRECTION_B + 1 == PAGE_N_DIRECTION, "adjacent fields"); + + if (compressed) + { + *ptr= PAGE_NO_DIRECTION; /* no instant ALTER bits */ + memset_aligned<2>(ptr + 1, 0, 2); + page_zip_write_header(&block->page.zip, ptr, 3, mtr); + } + else + { + mtr->write<1,mtr_t::OPT>(*block, ptr, (*ptr & ~((1U << 3) - 1)) + | PAGE_NO_DIRECTION); + mtr->write<2,mtr_t::OPT>(*block, ptr + 1, 0U); + } } /** Increment PAGE_N_DIRECTION. -@param[in,out] ptr the PAGE_DIRECTION_B field -@param[in,out] page index tree page frame -@param[in] page_zip compressed page descriptor, or NULL -@param[in] dir PAGE_RIGHT or PAGE_LEFT */ -static inline -void -page_direction_increment( - byte* ptr, - page_t* page, - page_zip_des_t* page_zip, - uint dir) +@tparam compressed whether the page is in ROW_FORMAT=COMPRESSED +@param[in,out] block index page +@param[in,out] ptr the PAGE_DIRECTION_B field +@param[in] dir PAGE_RIGHT or PAGE_LEFT +@param[in,out] mtr mini-transaction */ +template<bool compressed=false> +inline void page_direction_increment(buf_block_t *block, byte *ptr, uint dir, + mtr_t *mtr) { - ut_ad(ptr == PAGE_HEADER + PAGE_DIRECTION_B + page); - ut_ad(dir == PAGE_RIGHT || dir == PAGE_LEFT); - page_ptr_set_direction(ptr, dir); - if (page_zip) { - page_zip_write_header(page_zip, ptr, 1, NULL); - } - page_header_set_field( - page, page_zip, PAGE_N_DIRECTION, - 1U + page_header_get_field(page, PAGE_N_DIRECTION)); + ut_ad(!block->page.zip.data || page_is_comp(block->frame)); + ut_ad(!compressed || block->page.zip.data); + ut_ad(ptr == PAGE_HEADER + PAGE_DIRECTION_B + block->frame); + ut_ad(dir == PAGE_RIGHT || dir == PAGE_LEFT); + if (compressed) + { + *ptr= static_cast<byte>(dir); + mach_write_to_2(ptr + 1, 1 + mach_read_from_2(ptr + 1)); + page_zip_write_header(&block->page.zip, ptr, 3, mtr); + } + else + { + mtr->write<1,mtr_t::OPT>(*block, ptr, (*ptr & ~((1U << 3) - 1)) | dir); + mtr->write<2>(*block, ptr + 1, 1U + mach_read_from_2(ptr + 1)); + } } /** @@ -1228,7 +1053,6 @@ static void page_dir_slot_set_n_owned(buf_block_t *block, page_rec_set_n_owned<compressed>(block, rec, n, page_rec_is_comp(rec), mtr); } - /** Split a directory slot which owns too many records. @tparam compressed whether to update the ROW_FORMAT=COMPRESSED page as well @@ -1269,7 +1093,7 @@ static void page_dir_split_slot(buf_block_t *block, ulint s, mtr_t* mtr) const ulint half_owned= n_owned / 2; - if (compressed && UNIV_LIKELY_NULL(block->page.zip.data)) + if (compressed) { /* Log changes to the compressed page header and the dense page directory. */ @@ -1392,24 +1216,47 @@ func_exit: } /** Allocate space for inserting an index record. -@param[in,out] page index page -@param[in,out] page_zip ROW_FORMAT=COMPRESSED page, or NULL +@tparam compressed whether to update the ROW_FORMAT=COMPRESSED page as well +@param[in,out] block index page @param[in] need number of bytes needed @param[out] heap_no record heap number +@param[in,out] mtr mini-transaction @return pointer to the start of the allocated buffer @retval NULL if allocation fails */ -static byte* page_mem_alloc_heap(page_t* page, page_zip_des_t* page_zip, - ulint need, ulint* heap_no) +template<bool compressed=false> +static byte* page_mem_alloc_heap(buf_block_t *block, ulint need, + ulint *heap_no, mtr_t *mtr) { - if (need > page_get_max_insert_size(page, 1)) { - return NULL; - } + ut_ad(!compressed || block->page.zip.data); + + byte *heap_top= my_assume_aligned<2>(PAGE_HEAP_TOP + PAGE_HEADER + + block->frame); + + const uint16_t top= mach_read_from_2(heap_top); + + if (need > page_get_max_insert_size(block->frame, 1)) + return NULL; + + byte *n_heap= my_assume_aligned<2>(PAGE_N_HEAP + PAGE_HEADER + block->frame); + + const uint16_t h= mach_read_from_2(n_heap); + *heap_no= h & 0x7fff; + ut_ad(*heap_no < srv_page_size / REC_N_NEW_EXTRA_BYTES); + compile_time_assert(UNIV_PAGE_SIZE_MAX / REC_N_NEW_EXTRA_BYTES < 0x3fff); - byte* top = page_header_get_ptr(page, PAGE_HEAP_TOP); - page_header_set_ptr(page, page_zip, PAGE_HEAP_TOP, top + need); - *heap_no = page_dir_get_n_heap(page); - page_dir_set_n_heap(page, page_zip, 1 + *heap_no); - return top; + mach_write_to_2(heap_top, top + need); + mach_write_to_2(n_heap, h + 1); + + if (compressed) + { + ut_ad(h & 0x8000); + page_zip_write_header(&block->page.zip, heap_top, 4, mtr); + } + else + mtr->memcpy(*block, PAGE_HEAP_TOP + PAGE_HEADER, 4); + + compile_time_assert(PAGE_N_HEAP == PAGE_HEAP_TOP + 2); + return &block->frame[top]; } /***********************************************************//** @@ -1451,11 +1298,9 @@ page_cur_insert_rec_low( ut_ad(!page_rec_is_supremum(current_rec)); - const mtr_log_t log_mode = mtr->set_log_mode(MTR_LOG_NONE); - /* We should not write log for ROW_FORMAT=COMPRESSED pages here. */ - ut_ad(log_mode == MTR_LOG_NONE - || log_mode == MTR_LOG_NO_REDO + ut_ad(mtr->get_log_mode() == MTR_LOG_NONE + || mtr->get_log_mode() == MTR_LOG_NO_REDO || !(index->table->flags & DICT_TF_MASK_ZIP_SSIZE)); /* 1. Get the size of the physical record in the page */ @@ -1502,29 +1347,37 @@ page_cur_insert_rec_low( insert_buf = free_rec - rec_offs_extra_size(foffsets); + byte* page_free = my_assume_aligned<2>(PAGE_FREE + PAGE_HEADER + + block->frame); + byte* page_garbage = my_assume_aligned<2>(PAGE_GARBAGE + + PAGE_HEADER + + block->frame); + ut_ad(mach_read_from_2(page_garbage) >= rec_size); + mach_write_to_2(page_garbage, mach_read_from_2(page_garbage) + - rec_size); if (page_is_comp(block->frame)) { heap_no = rec_get_heap_no_new(free_rec); - page_mem_alloc_free(block->frame, NULL, - rec_get_next_ptr(free_rec, TRUE), - rec_size); + const rec_t* next = rec_get_next_ptr(free_rec, true); + mach_write_to_2(page_free, + next ? page_offset(next) : 0); } else { heap_no = rec_get_heap_no_old(free_rec); - page_mem_alloc_free(block->frame, NULL, - rec_get_next_ptr(free_rec, FALSE), - rec_size); + memcpy(page_free, free_rec - REC_NEXT, 2); } + compile_time_assert(PAGE_GARBAGE == PAGE_FREE + 2); + mtr->memcpy(*block, PAGE_FREE + PAGE_HEADER, 4); + if (UNIV_LIKELY_NULL(heap)) { mem_heap_free(heap); } } else { use_heap: free_rec = NULL; - insert_buf = page_mem_alloc_heap(block->frame, NULL, - rec_size, &heap_no); + insert_buf = page_mem_alloc_heap(block, rec_size, &heap_no, + mtr); if (UNIV_UNLIKELY(insert_buf == NULL)) { - mtr->set_log_mode(log_mode); return(NULL); } } @@ -1539,9 +1392,10 @@ use_heap: { /* next record after current before the insertion */ - rec_t* next_rec = page_rec_get_next(current_rec); -#ifdef UNIV_DEBUG if (page_is_comp(block->frame)) { + const rec_t* next_rec = page_rec_get_next_low( + current_rec, true); +#ifdef UNIV_DEBUG switch (rec_get_status(current_rec)) { case REC_STATUS_ORDINARY: case REC_STATUS_NODE_PTR: @@ -1561,24 +1415,42 @@ use_heap: ut_ad(!"wrong status on insert_rec"); } ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM); - } #endif - page_rec_set_next(insert_rec, next_rec); - page_rec_set_next(current_rec, insert_rec); + mach_write_to_2(insert_rec - REC_NEXT, + static_cast<uint16_t> + (next_rec - insert_rec)); + mtr->write<2>(*block, current_rec - REC_NEXT, + static_cast<uint16_t> + (insert_rec - current_rec)); + } else { + memcpy(insert_rec - REC_NEXT, current_rec - REC_NEXT, + 2); + mtr->write<2>(*block, current_rec - REC_NEXT, + page_offset(insert_rec)); + } } - page_header_set_field(block->frame, NULL, PAGE_N_RECS, - 1U + page_get_n_recs(block->frame)); + mtr->write<2>(*block, PAGE_N_RECS + PAGE_HEADER + block->frame, + 1U + page_get_n_recs(block->frame)); /* 5. Set the n_owned field in the inserted record to zero, and set the heap_no field */ - page_rec_set_n_owned<false>(block, insert_rec, 0, - page_is_comp(block->frame), mtr); - rec_set_heap_no(*block, insert_rec, heap_no, - page_is_comp(block->frame), mtr); + if (page_is_comp(block->frame)) { + rec_set_bit_field_1(insert_rec, 0, REC_NEW_N_OWNED, + REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); + rec_set_bit_field_2(insert_rec, heap_no, REC_NEW_HEAP_NO, + REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT); + } else { + rec_set_bit_field_1(insert_rec, 0, REC_OLD_N_OWNED, + REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); + rec_set_bit_field_2(insert_rec, heap_no, REC_OLD_HEAP_NO, + REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT); + } UNIV_MEM_ASSERT_RW(rec_get_start(insert_rec, offsets), rec_offs_size(offsets)); + mtr->memcpy(*block, page_offset(insert_buf), rec_offs_size(offsets)); + /* 6. Update the last insertion info in page header */ last_insert = page_header_get_ptr(block->frame, PAGE_LAST_INSERT); @@ -1586,25 +1458,24 @@ use_heap: || rec_get_node_ptr_flag(last_insert) == rec_get_node_ptr_flag(insert_rec)); - if (!dict_index_is_spatial(index)) { + if (!index->is_spatial()) { byte* ptr = PAGE_HEADER + PAGE_DIRECTION_B + block->frame; if (UNIV_UNLIKELY(last_insert == NULL)) { no_direction: - page_direction_reset(ptr, block->frame, NULL); + page_direction_reset(block, ptr, mtr); } else if (last_insert == current_rec && page_ptr_get_direction(ptr) != PAGE_LEFT) { - page_direction_increment(ptr, block->frame, NULL, - PAGE_RIGHT); + page_direction_increment(block, ptr, PAGE_RIGHT, mtr); } else if (page_ptr_get_direction(ptr) != PAGE_RIGHT && page_rec_get_next(insert_rec) == last_insert) { - page_direction_increment(ptr, block->frame, NULL, - PAGE_LEFT); + page_direction_increment(block, ptr, PAGE_LEFT, mtr); } else { goto no_direction; } } - page_header_set_ptr(block->frame, NULL, PAGE_LAST_INSERT, insert_rec); + mtr->write<2>(*block, PAGE_LAST_INSERT + PAGE_HEADER + block->frame, + page_offset(insert_rec)); /* 7. It remains to update the owner record. */ { @@ -1612,14 +1483,12 @@ no_direction: ulint n_owned; if (page_is_comp(block->frame)) { n_owned = rec_get_n_owned_new(owner_rec); - rec_set_bit_field_1(owner_rec, n_owned + 1, - REC_NEW_N_OWNED, REC_N_OWNED_MASK, - REC_N_OWNED_SHIFT); + page_rec_set_n_owned<false>(block, owner_rec, + n_owned + 1, true, mtr); } else { n_owned = rec_get_n_owned_old(owner_rec); - rec_set_bit_field_1(owner_rec, n_owned + 1, - REC_OLD_N_OWNED, REC_N_OWNED_MASK, - REC_N_OWNED_SHIFT); + page_rec_set_n_owned<false>(block, owner_rec, + n_owned + 1, false, mtr); } /* 8. Now we have incremented the n_owned field of the owner @@ -1633,14 +1502,88 @@ no_direction: } } - /* 9. Write log record of the insert */ - mtr->set_log_mode(log_mode); - page_cur_insert_rec_write_log(insert_rec, rec_size, - current_rec, index, mtr); - return(insert_rec); } +/** Add a slot to the dense page directory. +@param[in,out] block ROW_FORMAT=COMPRESSED page +@param[in] index the index that the page belongs to +@param[in,out] mtr mini-transaction */ +static inline void page_zip_dir_add_slot(buf_block_t *block, + const dict_index_t *index, mtr_t *mtr) +{ + page_zip_des_t *page_zip= &block->page.zip; + + ut_ad(page_is_comp(page_zip->data)); + UNIV_MEM_ASSERT_RW(page_zip->data, page_zip_get_size(page_zip)); + + /* Read the old n_dense (n_heap has already been incremented). */ + ulint n_dense= page_dir_get_n_heap(page_zip->data) - (PAGE_HEAP_NO_USER_LOW + + 1U); + + byte *dir= page_zip->data + page_zip_get_size(page_zip) - + PAGE_ZIP_DIR_SLOT_SIZE * n_dense; + byte *stored= dir; + + if (!page_is_leaf(page_zip->data)) + { + ut_ad(!page_zip->n_blobs); + stored-= n_dense * REC_NODE_PTR_SIZE; + } + else if (index->is_clust()) + { + /* Move the BLOB pointer array backwards to make space for the + columns DB_TRX_ID,DB_ROLL_PTR and the dense directory slot. */ + + stored-= n_dense * (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN); + byte *externs= stored - page_zip->n_blobs * BTR_EXTERN_FIELD_REF_SIZE; + byte *dst= externs - PAGE_ZIP_CLUST_LEAF_SLOT_SIZE; + ut_ad(!memcmp(dst, field_ref_zero, PAGE_ZIP_CLUST_LEAF_SLOT_SIZE)); + if (const ulint len = ulint(stored - externs)) + { + memmove(dst, externs, len); + /* TODO: write MEMMOVE record */ + if (byte* log_ptr = mlog_open(mtr, 11 + 2 + 2)) + { + log_ptr= mlog_write_initial_log_record_low(MLOG_ZIP_WRITE_STRING, + block->page.id.space(), + block->page.id.page_no(), + log_ptr, mtr); + mach_write_to_2(log_ptr, dst - page_zip->data); + mach_write_to_2(log_ptr + 2, len); + mlog_close(mtr, log_ptr + 4); + mlog_catenate_string(mtr, dst, len); + } + } + } + else + { + stored-= page_zip->n_blobs * BTR_EXTERN_FIELD_REF_SIZE; + ut_ad(!memcmp(stored - PAGE_ZIP_DIR_SLOT_SIZE, field_ref_zero, + PAGE_ZIP_DIR_SLOT_SIZE)); + } + + /* Move the uncompressed area backwards to make space + for one directory slot. */ + if (const ulint len = ulint(dir - stored)) + { + byte* dst = stored - PAGE_ZIP_DIR_SLOT_SIZE; + memmove(dst, stored, len); + /* TODO: write MEMMOVE record */ + if (byte* log_ptr = mlog_open(mtr, 11 + 2 + 2)) + { + log_ptr= mlog_write_initial_log_record_low(MLOG_ZIP_WRITE_STRING, + block->page.id.space(), + block->page.id.page_no(), + log_ptr, mtr); + mach_write_to_2(log_ptr, dst - page_zip->data); + mach_write_to_2(log_ptr + 2, len); + mlog_close(mtr, log_ptr + 4); + mlog_catenate_string(mtr, dst, len); + } + } +} + /***********************************************************//** Inserts a record next to page cursor on a compressed and uncompressed page. Returns pointer to inserted record if succeed, i.e., @@ -1665,8 +1608,6 @@ page_cur_insert_rec_zip( byte* insert_buf; ulint rec_size; page_t* page; /*!< the relevant page */ - rec_t* last_insert; /*!< cursor position at previous - insert */ rec_t* free_rec; /*!< a free record that was reused, or NULL */ rec_t* insert_rec; /*!< inserted record */ @@ -1676,7 +1617,6 @@ page_cur_insert_rec_zip( page_zip = page_cur_get_page_zip(cursor); ut_ad(page_zip); - ut_ad(rec_offs_validate(rec, index, offsets)); page = page_cur_get_page(cursor); @@ -1727,6 +1667,7 @@ page_cur_insert_rec_zip( rec_t* cursor_rec = page_cur_get_rec(cursor); #endif /* UNIV_DEBUG */ +#if 1 /* MDEV-12353 FIXME: skip this for the physical log format! */ /* If we are not writing compressed page images, we must reorganize the page before attempting the insert. */ @@ -1735,6 +1676,7 @@ page_cur_insert_rec_zip( The page reorganization or creation that we would attempt outside crash recovery would have been covered by a previous redo log record. */ +#endif } else if (page_is_empty(page)) { ut_ad(page_cur_is_before_first(cursor)); @@ -1813,12 +1755,14 @@ page_cur_insert_rec_zip( be logged after a successful operation. */ ut_ad(!recv_recovery_is_on()); ut_ad(!index->is_dummy); +#if 1 /* MDEV-12353 FIXME: skip this for the physical log format! */ } else if (recv_recovery_is_on()) { /* This should be followed by MLOG_ZIP_PAGE_COMPRESS_NO_DATA, which should succeed. */ rec_offs_make_valid(insert_rec, index, page_is_leaf(page), offsets); +#endif } else { ulint pos = page_rec_get_n_recs_before(insert_rec); ut_ad(pos > 0); @@ -1841,7 +1785,7 @@ page_cur_insert_rec_zip( rec_offs_make_valid( insert_rec, index, page_is_leaf(page), offsets); - return(insert_rec); + return insert_rec; } /* Theoretically, we could try one last resort @@ -1908,9 +1852,16 @@ too_small: } heap_no = rec_get_heap_no_new(free_rec); - page_mem_alloc_free(page, page_zip, - rec_get_next_ptr(free_rec, TRUE), - rec_size); + const rec_t* next = rec_get_next_ptr_const(free_rec, true); + mach_write_to_2(PAGE_FREE + PAGE_HEADER + page, + next ? page_offset(next) : 0); + byte* garbage = PAGE_GARBAGE + PAGE_HEADER + page; + ut_ad(mach_read_from_2(garbage) >= rec_size); + mach_write_to_2(garbage, mach_read_from_2(garbage) - rec_size); + compile_time_assert(PAGE_GARBAGE == PAGE_FREE + 2); + page_zip_write_header(page_zip, + PAGE_HEADER + PAGE_FREE + page, 4, mtr); + /* TODO: group with PAGE_LAST_INSERT */ if (!page_is_leaf(page)) { /* Zero out the node pointer of free_rec, @@ -1961,14 +1912,14 @@ too_small: } else { use_heap: free_rec = NULL; - insert_buf = page_mem_alloc_heap(page, page_zip, - rec_size, &heap_no); + insert_buf = page_mem_alloc_heap<true>(cursor->block, rec_size, + &heap_no, mtr); if (UNIV_UNLIKELY(insert_buf == NULL)) { return(NULL); } - page_zip_dir_add_slot(page_zip, dict_index_is_clust(index)); + page_zip_dir_add_slot(cursor->block, index, mtr); } /* 3. Create the record */ @@ -1978,21 +1929,19 @@ use_heap: /* 4. Insert the record in the linked list of records */ ut_ad(cursor->rec != insert_rec); - { - /* next record after current before the insertion */ - const rec_t* next_rec = page_rec_get_next_low( - cursor->rec, TRUE); - ut_ad(rec_get_status(cursor->rec) - <= REC_STATUS_INFIMUM); - ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM); - ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM); - - page_rec_set_next(insert_rec, next_rec); - page_rec_set_next(cursor->rec, insert_rec); - } + /* next record after current before the insertion */ + const rec_t* next_rec = page_rec_get_next_low(cursor->rec, TRUE); + ut_ad(rec_get_status(cursor->rec) <= REC_STATUS_INFIMUM); + ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM); + ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM); - page_header_set_field(page, page_zip, PAGE_N_RECS, - 1U + page_get_n_recs(page)); + mach_write_to_2(insert_rec - REC_NEXT, static_cast<uint16_t> + (next_rec - insert_rec)); + mach_write_to_2(cursor->rec - REC_NEXT, static_cast<uint16_t> + (insert_rec - cursor->rec)); + byte* n_recs = PAGE_N_RECS + PAGE_HEADER + page; + mach_write_to_2(n_recs, mach_read_from_2(n_recs) + 1); + page_zip_write_header(page_zip, n_recs, 2, mtr); /* 5. Set the n_owned field in the inserted record to zero, and set the heap_no field */ @@ -2004,35 +1953,37 @@ use_heap: UNIV_MEM_ASSERT_RW(rec_get_start(insert_rec, offsets), rec_offs_size(offsets)); - page_zip_dir_insert(cursor, free_rec, insert_rec); + page_zip_dir_insert(cursor, free_rec, insert_rec, mtr); /* 6. Update the last insertion info in page header */ - - last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT); - ut_ad(!last_insert - || rec_get_node_ptr_flag(last_insert) + byte* last_insert = PAGE_LAST_INSERT + PAGE_HEADER + page; + const uint16_t last_insert_rec = mach_read_from_2(last_insert); + ut_ad(!last_insert_rec + || rec_get_node_ptr_flag(page + last_insert_rec) == rec_get_node_ptr_flag(insert_rec)); + /* TODO: combine with PAGE_DIRECTION changes */ + mach_write_to_2(last_insert, page_offset(insert_rec)); + page_zip_write_header(page_zip, last_insert, 2, mtr); - if (!dict_index_is_spatial(index)) { + if (!index->is_spatial()) { byte* ptr = PAGE_HEADER + PAGE_DIRECTION_B + page; - if (UNIV_UNLIKELY(last_insert == NULL)) { + if (UNIV_UNLIKELY(!last_insert_rec)) { no_direction: - page_direction_reset(ptr, page, page_zip); - } else if (last_insert == cursor->rec + page_direction_reset<true>(cursor->block, ptr, mtr); + } else if (page + last_insert_rec == cursor->rec && page_ptr_get_direction(ptr) != PAGE_LEFT) { - page_direction_increment(ptr, page, page_zip, - PAGE_RIGHT); + page_direction_increment<true>(cursor->block, ptr, + PAGE_RIGHT, mtr); } else if (page_ptr_get_direction(ptr) != PAGE_RIGHT - && page_rec_get_next(insert_rec) == last_insert) { - page_direction_increment(ptr, page, page_zip, - PAGE_LEFT); + && page_rec_get_next(insert_rec) + == page + last_insert_rec) { + page_direction_increment<true>(cursor->block, ptr, + PAGE_LEFT, mtr); } else { goto no_direction; } } - page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, insert_rec); - /* 7. It remains to update the owner record. */ { rec_t* owner_rec = page_rec_find_owner_rec(insert_rec); @@ -2047,22 +1998,14 @@ no_direction: we have to split the corresponding directory slot in two. */ if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) { - const mtr_log_t log_mode = mtr->set_log_mode( - MTR_LOG_NONE); page_dir_split_slot<true>( page_cur_get_block(cursor), page_dir_find_owner_slot(owner_rec), mtr); - mtr->set_log_mode(log_mode); } } - page_zip_write_rec(page_zip, insert_rec, index, offsets, 1); - - /* 9. Write log record of the insert */ - page_cur_insert_rec_write_log(insert_rec, rec_size, - cursor->rec, index, mtr); - - return(insert_rec); + page_zip_write_rec(page_zip, insert_rec, index, offsets, 1, mtr); + return insert_rec; } /**********************************************************//** @@ -2079,8 +2022,6 @@ page_parse_copy_rec_list_to_created_page( mtr_t* mtr) /*!< in: mtr or NULL */ { ulint log_data_len; - page_t* page; - page_zip_des_t* page_zip; ut_ad(index->is_dummy); @@ -2119,14 +2060,16 @@ page_parse_copy_rec_list_to_created_page( ut_a(ptr == rec_end); - page = buf_block_get_frame(block); - page_zip = buf_block_get_page_zip(block); - - page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL); + memset_aligned<2>(PAGE_HEADER + PAGE_LAST_INSERT + block->frame, 0, 2); + if (block->page.zip.data) { + memset_aligned<2>(PAGE_HEADER + PAGE_LAST_INSERT + + block->page.zip.data, 0, 2); + } - if (!dict_index_is_spatial(index)) { - page_direction_reset(PAGE_HEADER + PAGE_DIRECTION_B + page, - page, page_zip); + if (!index->is_spatial()) { + page_direction_reset<true>(block, + PAGE_HEADER + PAGE_DIRECTION_B + + block->frame, mtr); } return(rec_end); @@ -2246,8 +2189,6 @@ page_copy_rec_list_end_to_created_page( heap_top += rec_size; rec_offs_make_valid(insert_rec, index, is_leaf, offsets); - page_cur_insert_rec_write_log(insert_rec, rec_size, prev_rec, - index, mtr); prev_rec = insert_rec; rec = page_rec_get_next(rec); } while (!page_rec_is_supremum(rec)); @@ -2293,14 +2234,18 @@ page_copy_rec_list_end_to_created_page( mach_write_to_2(PAGE_HEADER + PAGE_N_DIR_SLOTS + new_page, 2 + slot_index); - page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP, heap_top); - page_dir_set_n_heap(new_page, NULL, PAGE_HEAP_NO_USER_LOW + n_recs); - page_header_set_field(new_page, NULL, PAGE_N_RECS, n_recs); - - *reinterpret_cast<uint16_t*>(PAGE_HEADER + PAGE_LAST_INSERT + new_page) - = 0; - page_direction_reset(PAGE_HEADER + PAGE_DIRECTION_B + new_page, - new_page, NULL); + mach_write_to_2(PAGE_HEADER + PAGE_HEAP_TOP + new_page, + page_offset(heap_top)); + mach_write_to_2(PAGE_HEADER + PAGE_N_HEAP + new_page, + PAGE_HEAP_NO_USER_LOW + n_recs); + mach_write_to_2(PAGE_HEADER + PAGE_N_RECS + new_page, n_recs); + + memset_aligned<2>(PAGE_HEADER + PAGE_LAST_INSERT + new_page, 0, 2); + mach_write_to_1(PAGE_HEADER + PAGE_DIRECTION_B + new_page, + (mach_read_from_1(PAGE_HEADER + PAGE_DIRECTION_B + + new_page) & ~((1U << 3) - 1)) + | PAGE_NO_DIRECTION); + memset_aligned<2>(PAGE_HEADER + PAGE_N_DIRECTION + new_page, 0, 2); } /***********************************************************//** |