diff options
Diffstat (limited to 'storage/innobase/btr/btr0bulk.cc')
-rw-r--r-- | storage/innobase/btr/btr0bulk.cc | 218 |
1 files changed, 154 insertions, 64 deletions
diff --git a/storage/innobase/btr/btr0bulk.cc b/storage/innobase/btr/btr0bulk.cc index cf1fb62bce0..d892c429a1e 100644 --- a/storage/innobase/btr/btr0bulk.cc +++ b/storage/innobase/btr/btr0bulk.cc @@ -82,26 +82,21 @@ PageBulk::init() new_page = buf_block_get_frame(new_block); new_page_no = page_get_page_no(new_page); - byte* index_id = PAGE_HEADER + PAGE_INDEX_ID + new_page; + byte* index_id = my_assume_aligned<2> + (PAGE_HEADER + PAGE_INDEX_ID + new_page); + compile_time_assert(FIL_PAGE_NEXT == FIL_PAGE_PREV + 4); + compile_time_assert(FIL_NULL == 0xffffffff); + memset_aligned<8>(new_page + FIL_PAGE_PREV, 0xff, 8); if (UNIV_LIKELY_NULL(new_block->page.zip.data)) { + mach_write_to_8(index_id, m_index->id); page_create_zip(new_block, m_index, m_level, 0, &m_mtr); - static_assert(FIL_PAGE_PREV % 8 == 0, "alignment"); - memset_aligned<8>(FIL_PAGE_PREV + new_page, 0xff, 8); - page_zip_write_header(new_block, - FIL_PAGE_PREV + new_page, - 8, &m_mtr); - mach_write_to_8(index_id, m_index->id); - page_zip_write_header(new_block, index_id, 8, &m_mtr); } else { ut_ad(!m_index->is_spatial()); page_create(new_block, &m_mtr, m_index->table->not_redundant()); - compile_time_assert(FIL_PAGE_NEXT - == FIL_PAGE_PREV + 4); - compile_time_assert(FIL_NULL == 0xffffffff); - m_mtr.memset(new_block, FIL_PAGE_PREV, 8, 0xff); + m_mtr.memset(*new_block, FIL_PAGE_PREV, 8, 0xff); m_mtr.write<2,mtr_t::OPT>(*new_block, PAGE_HEADER + PAGE_LEVEL + new_page, m_level); @@ -155,22 +150,25 @@ PageBulk::init() /** Insert a record in the page. @tparam fmt the page format -@param[in] rec record +@param[in,out] rec record @param[in] offsets record offsets */ template<PageBulk::format fmt> -inline void PageBulk::insertPage(const rec_t *rec, offset_t *offsets) +inline void PageBulk::insertPage(rec_t *rec, offset_t *offsets) { ut_ad((m_page_zip != nullptr) == (fmt == COMPRESSED)); ut_ad((fmt != REDUNDANT) == m_is_comp); - + ut_ad(page_align(m_heap_top) == m_page); ut_ad(m_heap); - ulint rec_size= rec_offs_size(offsets); + const ulint rec_size= rec_offs_size(offsets); + const ulint extra_size= rec_offs_extra_size(offsets); + ut_ad(page_align(m_heap_top + rec_size) == m_page); ut_d(const bool is_leaf= page_rec_is_leaf(m_cur_rec)); #ifdef UNIV_DEBUG /* Check whether records are in order. */ - if (!page_rec_is_infimum_low(page_offset(m_cur_rec))) + if (page_offset(m_cur_rec) != + (fmt == REDUNDANT ? PAGE_OLD_INFIMUM : PAGE_NEW_INFIMUM)) { const rec_t *old_rec = m_cur_rec; offset_t *old_offsets= rec_get_offsets(old_rec, m_index, nullptr, is_leaf, @@ -181,41 +179,126 @@ inline void PageBulk::insertPage(const rec_t *rec, offset_t *offsets) m_total_data+= rec_size; #endif /* UNIV_DEBUG */ - /* Copy the record payload. */ - rec_t *insert_rec= rec_copy(m_heap_top, rec, offsets); - ut_ad(page_align(insert_rec) == m_page); - rec_offs_make_valid(insert_rec, m_index, is_leaf, offsets); + rec_t* const insert_rec= m_heap_top + extra_size; /* Insert the record in the linked list. */ if (fmt != REDUNDANT) { - rec_t *next_rec= m_page + + const rec_t *next_rec= m_page + page_offset(m_cur_rec + mach_read_from_2(m_cur_rec - REC_NEXT)); - mach_write_to_2(insert_rec - REC_NEXT, - static_cast<uint16_t>(next_rec - insert_rec)); if (fmt != COMPRESSED) m_mtr.write<2>(*m_block, m_cur_rec - REC_NEXT, static_cast<uint16_t>(insert_rec - m_cur_rec)); else + { mach_write_to_2(m_cur_rec - REC_NEXT, static_cast<uint16_t>(insert_rec - m_cur_rec)); - rec_set_bit_field_1(insert_rec, 0, REC_NEW_N_OWNED, REC_N_OWNED_MASK, + memcpy(m_heap_top, rec - extra_size, rec_size); + } + + rec_t * const this_rec= fmt != COMPRESSED + ? const_cast<rec_t*>(rec) : insert_rec; + rec_set_bit_field_1(this_rec, 0, REC_NEW_N_OWNED, REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); - rec_set_bit_field_2(insert_rec, PAGE_HEAP_NO_USER_LOW + m_rec_no, + rec_set_bit_field_2(this_rec, PAGE_HEAP_NO_USER_LOW + m_rec_no, REC_NEW_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT); + mach_write_to_2(this_rec - REC_NEXT, + static_cast<uint16_t>(next_rec - insert_rec)); } else { - memcpy(insert_rec - REC_NEXT, m_cur_rec - REC_NEXT, 2); + memcpy(const_cast<rec_t*>(rec) - REC_NEXT, m_cur_rec - REC_NEXT, 2); m_mtr.write<2>(*m_block, m_cur_rec - REC_NEXT, page_offset(insert_rec)); - rec_set_bit_field_1(insert_rec, 0, REC_OLD_N_OWNED, REC_N_OWNED_MASK, - REC_N_OWNED_SHIFT); - rec_set_bit_field_2(insert_rec, PAGE_HEAP_NO_USER_LOW + m_rec_no, + rec_set_bit_field_1(const_cast<rec_t*>(rec), 0, + REC_OLD_N_OWNED, REC_N_OWNED_MASK, REC_N_OWNED_SHIFT); + rec_set_bit_field_2(const_cast<rec_t*>(rec), + PAGE_HEAP_NO_USER_LOW + m_rec_no, REC_OLD_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT); } - if (fmt != COMPRESSED) - m_mtr.memcpy(*m_block, page_offset(m_heap_top), rec_offs_size(offsets)); + if (fmt == COMPRESSED) + /* We already wrote the record. Log is written in PageBulk::compress(). */; + else if (page_offset(m_cur_rec) == + (fmt == REDUNDANT ? PAGE_OLD_INFIMUM : PAGE_NEW_INFIMUM)) + m_mtr.memcpy(*m_block, m_heap_top, rec - extra_size, rec_size); + else + { + /* Try to copy common prefix from the preceding record. */ + const byte *r= rec - extra_size; + const byte * const insert_rec_end= m_heap_top + rec_size; + byte *b= m_heap_top; + + /* Skip any unchanged prefix of the record. */ + for (; * b == *r; b++, r++); + + ut_ad(b < insert_rec_end); + + const byte *c= m_cur_rec - (rec - r); + const byte * const c_end= std::min(m_cur_rec + rec_offs_data_size(offsets), + m_heap_top); + + /* Try to copy any bytes of the preceding record. */ + if (UNIV_LIKELY(c >= m_page && c < c_end)) + { + const byte *cm= c; + byte *bm= b; + const byte *rm= r; + for (; cm < c_end && *rm == *cm; cm++, bm++, rm++); + ut_ad(bm <= insert_rec_end); + size_t len= static_cast<size_t>(rm - r); + ut_ad(!memcmp(r, c, len)); + if (len > 2) + { + memcpy(b, c, len); + m_mtr.memmove(*m_block, page_offset(b), page_offset(c), len); + c= cm; + b= bm; + r= rm; + } + } + + if (c < m_cur_rec) + { + if (!rec_offs_data_size(offsets)) + { +no_data: + m_mtr.memcpy<mtr_t::FORCED>(*m_block, b, r, m_cur_rec - c); + goto rec_done; + } + /* Some header bytes differ. Compare the data separately. */ + const byte *cd= m_cur_rec; + byte *bd= insert_rec; + const byte *rd= rec; + /* Skip any unchanged prefix of the record. */ + for (; *bd == *rd; cd++, bd++, rd++) + if (bd == insert_rec_end) + goto no_data; + + /* Try to copy any data bytes of the preceding record. */ + const byte *cdm= cd; + const byte *rdm= rd; + for (; cdm < c_end && *rdm == *cdm; cdm++, rdm++) + ut_ad(rdm - rd + bd <= insert_rec_end); + size_t len= static_cast<size_t>(rdm - rd); + ut_ad(!memcmp(rd, cd, len)); + if (len > 2) + { + m_mtr.memcpy<mtr_t::FORCED>(*m_block, b, r, m_cur_rec - c); + memcpy(bd, cd, len); + m_mtr.memmove(*m_block, page_offset(bd), page_offset(cd), len); + c= cdm; + b= rdm - rd + bd; + r= rdm; + } + } + + if (size_t len= static_cast<size_t>(insert_rec_end - b)) + m_mtr.memcpy<mtr_t::FORCED>(*m_block, b, r, len); + } + +rec_done: + ut_ad(fmt == COMPRESSED || !memcmp(m_heap_top, rec - extra_size, rec_size)); + rec_offs_make_valid(insert_rec, m_index, is_leaf, offsets); /* Update the member variables. */ ulint slot_size= page_dir_calc_reserved_space(m_rec_no + 1) - @@ -235,12 +318,25 @@ inline void PageBulk::insertPage(const rec_t *rec, offset_t *offsets) @param[in] offsets record offsets */ inline void PageBulk::insert(const rec_t *rec, offset_t *offsets) { + byte rec_hdr[REC_N_OLD_EXTRA_BYTES]; + static_assert(REC_N_OLD_EXTRA_BYTES > REC_N_NEW_EXTRA_BYTES, "file format"); + if (UNIV_LIKELY_NULL(m_page_zip)) - insertPage<COMPRESSED>(rec, offsets); + insertPage<COMPRESSED>(const_cast<rec_t*>(rec), offsets); else if (m_is_comp) - insertPage<DYNAMIC>(rec, offsets); + { + memcpy(rec_hdr, rec - REC_N_NEW_EXTRA_BYTES, REC_N_NEW_EXTRA_BYTES); + insertPage<DYNAMIC>(const_cast<rec_t*>(rec), offsets); + memcpy(const_cast<rec_t*>(rec) - REC_N_NEW_EXTRA_BYTES, rec_hdr, + REC_N_NEW_EXTRA_BYTES); + } else - insertPage<REDUNDANT>(rec, offsets); + { + memcpy(rec_hdr, rec - REC_N_OLD_EXTRA_BYTES, REC_N_OLD_EXTRA_BYTES); + insertPage<REDUNDANT>(const_cast<rec_t*>(rec), offsets); + memcpy(const_cast<rec_t*>(rec) - REC_N_OLD_EXTRA_BYTES, rec_hdr, + REC_N_OLD_EXTRA_BYTES); + } } /** Set the number of owned records in the uncompressed page of @@ -283,18 +379,13 @@ inline void PageBulk::finishPage() if (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2) { slot-= PAGE_DIR_SLOT_SIZE; + mach_write_to_2(slot, offset); if (fmt != COMPRESSED) - { - m_mtr.write<2,mtr_t::OPT>(*m_block, slot, offset); page_rec_set_n_owned<false>(m_block, m_page + offset, count, true, &m_mtr); - } else - { - mach_write_to_2(slot, offset); rec_set_n_owned_zip(m_page + offset, count); - } count= 0; } @@ -321,17 +412,12 @@ inline void PageBulk::finishPage() else slot-= PAGE_DIR_SLOT_SIZE; + mach_write_to_2(slot, PAGE_NEW_SUPREMUM); if (fmt != COMPRESSED) - { - m_mtr.write<2,mtr_t::OPT>(*m_block, slot, PAGE_NEW_SUPREMUM); page_rec_set_n_owned<false>(m_block, m_page + PAGE_NEW_SUPREMUM, count + 1, true, &m_mtr); - } else - { - mach_write_to_2(slot, PAGE_NEW_SUPREMUM); rec_set_n_owned_zip(m_page + PAGE_NEW_SUPREMUM, count + 1); - } } else { @@ -347,7 +433,7 @@ inline void PageBulk::finishPage() if (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2) { slot-= PAGE_DIR_SLOT_SIZE; - m_mtr.write<2,mtr_t::OPT>(*m_block, slot, page_offset(insert_rec)); + mach_write_to_2(slot, page_offset(insert_rec)); page_rec_set_n_owned<false>(m_block, insert_rec, count, false, &m_mtr); count= 0; } @@ -368,31 +454,35 @@ inline void PageBulk::finishPage() else slot-= PAGE_DIR_SLOT_SIZE; - m_mtr.write<2,mtr_t::OPT>(*m_block, slot, PAGE_OLD_SUPREMUM); + mach_write_to_2(slot, PAGE_OLD_SUPREMUM); page_rec_set_n_owned<false>(m_block, m_page + PAGE_OLD_SUPREMUM, count + 1, false, &m_mtr); } - ut_ad(!dict_index_is_spatial(m_index)); + ut_ad(!m_index->is_spatial()); ut_ad(!page_get_instant(m_page)); ut_ad(!mach_read_from_2(PAGE_HEADER + PAGE_N_DIRECTION + m_page)); if (fmt != COMPRESSED) { - m_mtr.write<2,mtr_t::OPT>(*m_block, - PAGE_HEADER + PAGE_N_DIR_SLOTS + m_page, - 1 + static_cast<ulint>(slot0 - slot) / - PAGE_DIR_SLOT_SIZE); - m_mtr.write<2>(*m_block, PAGE_HEADER + PAGE_HEAP_TOP + m_page, - static_cast<ulint>(m_heap_top - m_page)); - m_mtr.write<2>(*m_block, PAGE_HEADER + PAGE_N_HEAP + m_page, - (PAGE_HEAP_NO_USER_LOW + m_rec_no) | - uint16_t{fmt != REDUNDANT} << 15); - m_mtr.write<2>(*m_block, PAGE_HEADER + PAGE_N_RECS + m_page, m_rec_no); - m_mtr.write<2>(*m_block, PAGE_HEADER + PAGE_LAST_INSERT + m_page, - static_cast<ulint>(m_cur_rec - m_page)); - m_mtr.write<2>(*m_block, PAGE_HEADER + PAGE_DIRECTION_B - 1 + m_page, - PAGE_RIGHT); + static_assert(PAGE_N_DIR_SLOTS == 0, "compatibility"); + alignas(8) byte page_header[PAGE_N_RECS + 2]; + mach_write_to_2(page_header + PAGE_N_DIR_SLOTS, + 1 + (slot0 - slot) / PAGE_DIR_SLOT_SIZE); + mach_write_to_2(page_header + PAGE_HEAP_TOP, m_heap_top - m_page); + mach_write_to_2(page_header + PAGE_N_HEAP, + (PAGE_HEAP_NO_USER_LOW + m_rec_no) | + uint16_t{fmt != REDUNDANT} << 15); + memset_aligned<2>(page_header + PAGE_FREE, 0, 4); + static_assert(PAGE_GARBAGE == PAGE_FREE + 2, "compatibility"); + mach_write_to_2(page_header + PAGE_LAST_INSERT, m_cur_rec - m_page); + mach_write_to_2(page_header + PAGE_DIRECTION_B - 1, PAGE_RIGHT); + mach_write_to_2(page_header + PAGE_N_DIRECTION, m_rec_no); + memcpy_aligned<2>(page_header + PAGE_N_RECS, + page_header + PAGE_N_DIRECTION, 2); + m_mtr.memcpy(*m_block, PAGE_HEADER + m_page, page_header, + sizeof page_header); + m_mtr.memcpy(*m_block, page_offset(slot), slot0 - slot); } else { |