Diffstat (limited to 'storage/innobase/btr/btr0bulk.cc')
-rw-r--r--  storage/innobase/btr/btr0bulk.cc  218
1 file changed, 154 insertions(+), 64 deletions(-)
diff --git a/storage/innobase/btr/btr0bulk.cc b/storage/innobase/btr/btr0bulk.cc
index cf1fb62bce0..d892c429a1e 100644
--- a/storage/innobase/btr/btr0bulk.cc
+++ b/storage/innobase/btr/btr0bulk.cc
@@ -82,26 +82,21 @@ PageBulk::init()
new_page = buf_block_get_frame(new_block);
new_page_no = page_get_page_no(new_page);
- byte* index_id = PAGE_HEADER + PAGE_INDEX_ID + new_page;
+ byte* index_id = my_assume_aligned<2>
+ (PAGE_HEADER + PAGE_INDEX_ID + new_page);
+ compile_time_assert(FIL_PAGE_NEXT == FIL_PAGE_PREV + 4);
+ compile_time_assert(FIL_NULL == 0xffffffff);
+ memset_aligned<8>(new_page + FIL_PAGE_PREV, 0xff, 8);
if (UNIV_LIKELY_NULL(new_block->page.zip.data)) {
+ mach_write_to_8(index_id, m_index->id);
page_create_zip(new_block, m_index, m_level, 0,
&m_mtr);
- static_assert(FIL_PAGE_PREV % 8 == 0, "alignment");
- memset_aligned<8>(FIL_PAGE_PREV + new_page, 0xff, 8);
- page_zip_write_header(new_block,
- FIL_PAGE_PREV + new_page,
- 8, &m_mtr);
- mach_write_to_8(index_id, m_index->id);
- page_zip_write_header(new_block, index_id, 8, &m_mtr);
} else {
ut_ad(!m_index->is_spatial());
page_create(new_block, &m_mtr,
m_index->table->not_redundant());
- compile_time_assert(FIL_PAGE_NEXT
- == FIL_PAGE_PREV + 4);
- compile_time_assert(FIL_NULL == 0xffffffff);
- m_mtr.memset(new_block, FIL_PAGE_PREV, 8, 0xff);
+ m_mtr.memset(*new_block, FIL_PAGE_PREV, 8, 0xff);
m_mtr.write<2,mtr_t::OPT>(*new_block,
PAGE_HEADER + PAGE_LEVEL
+ new_page, m_level);
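
The memset_aligned above relies on the two compile_time_asserts that now precede the branch: FIL_PAGE_PREV and FIL_PAGE_NEXT are adjacent 4-byte fields, and FIL_NULL is all-one bits, so a single aligned 8-byte store initializes both page links at once. A minimal standalone sketch of the same trick (field offsets as in InnoDB's fil0fil.h, reproduced here only for illustration):

#include <cassert>
#include <cstdint>
#include <cstring>

constexpr unsigned FIL_PAGE_PREV = 8;   /* offset of the "previous page" link */
constexpr unsigned FIL_PAGE_NEXT = 12;  /* offset of the "next page" link */
constexpr uint32_t FIL_NULL = 0xffffffff;

int main()
{
  static_assert(FIL_PAGE_NEXT == FIL_PAGE_PREV + 4, "fields must be adjacent");
  alignas(8) unsigned char page[16384] = {};
  /* One aligned 8-byte memset sets both 4-byte fields to FIL_NULL. */
  memset(page + FIL_PAGE_PREV, 0xff, 8);
  uint32_t prev, next;
  memcpy(&prev, page + FIL_PAGE_PREV, 4);
  memcpy(&next, page + FIL_PAGE_NEXT, 4);
  assert(prev == FIL_NULL && next == FIL_NULL);
  return 0;
}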
@@ -155,22 +150,25 @@ PageBulk::init()
/** Insert a record in the page.
@tparam fmt the page format
-@param[in] rec record
+@param[in,out] rec record
@param[in] offsets record offsets */
template<PageBulk::format fmt>
-inline void PageBulk::insertPage(const rec_t *rec, offset_t *offsets)
+inline void PageBulk::insertPage(rec_t *rec, offset_t *offsets)
{
ut_ad((m_page_zip != nullptr) == (fmt == COMPRESSED));
ut_ad((fmt != REDUNDANT) == m_is_comp);
-
+ ut_ad(page_align(m_heap_top) == m_page);
ut_ad(m_heap);
- ulint rec_size= rec_offs_size(offsets);
+ const ulint rec_size= rec_offs_size(offsets);
+ const ulint extra_size= rec_offs_extra_size(offsets);
+ ut_ad(page_align(m_heap_top + rec_size) == m_page);
ut_d(const bool is_leaf= page_rec_is_leaf(m_cur_rec));
#ifdef UNIV_DEBUG
/* Check whether records are in order. */
- if (!page_rec_is_infimum_low(page_offset(m_cur_rec)))
+ if (page_offset(m_cur_rec) !=
+ (fmt == REDUNDANT ? PAGE_OLD_INFIMUM : PAGE_NEW_INFIMUM))
{
const rec_t *old_rec = m_cur_rec;
offset_t *old_offsets= rec_get_offsets(old_rec, m_index, nullptr, is_leaf,
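
In the rewritten debug check above, page_rec_is_infimum_low() is replaced by a direct comparison: with the page format fixed by the fmt template parameter, "is m_cur_rec the infimum?" reduces to comparing its page offset against one constant. A standalone sketch (constant values as in InnoDB's page0page.h, shown for illustration):

#include <cstddef>

constexpr std::size_t PAGE_OLD_INFIMUM = 101; /* redundant row format */
constexpr std::size_t PAGE_NEW_INFIMUM = 99;  /* compact/dynamic row format */

/* With the format known at compile time, the test folds to a single
   comparison against an immediate constant. */
template<bool redundant>
inline bool is_infimum(std::size_t rec_offset)
{
  return rec_offset == (redundant ? PAGE_OLD_INFIMUM : PAGE_NEW_INFIMUM);
}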
@@ -181,41 +179,126 @@ inline void PageBulk::insertPage(const rec_t *rec, offset_t *offsets)
m_total_data+= rec_size;
#endif /* UNIV_DEBUG */
- /* Copy the record payload. */
- rec_t *insert_rec= rec_copy(m_heap_top, rec, offsets);
- ut_ad(page_align(insert_rec) == m_page);
- rec_offs_make_valid(insert_rec, m_index, is_leaf, offsets);
+ rec_t* const insert_rec= m_heap_top + extra_size;
/* Insert the record in the linked list. */
if (fmt != REDUNDANT)
{
- rec_t *next_rec= m_page +
+ const rec_t *next_rec= m_page +
page_offset(m_cur_rec + mach_read_from_2(m_cur_rec - REC_NEXT));
- mach_write_to_2(insert_rec - REC_NEXT,
- static_cast<uint16_t>(next_rec - insert_rec));
if (fmt != COMPRESSED)
m_mtr.write<2>(*m_block, m_cur_rec - REC_NEXT,
static_cast<uint16_t>(insert_rec - m_cur_rec));
else
+ {
mach_write_to_2(m_cur_rec - REC_NEXT,
static_cast<uint16_t>(insert_rec - m_cur_rec));
- rec_set_bit_field_1(insert_rec, 0, REC_NEW_N_OWNED, REC_N_OWNED_MASK,
+ memcpy(m_heap_top, rec - extra_size, rec_size);
+ }
+
+ rec_t * const this_rec= fmt != COMPRESSED
+ ? const_cast<rec_t*>(rec) : insert_rec;
+ rec_set_bit_field_1(this_rec, 0, REC_NEW_N_OWNED, REC_N_OWNED_MASK,
REC_N_OWNED_SHIFT);
- rec_set_bit_field_2(insert_rec, PAGE_HEAP_NO_USER_LOW + m_rec_no,
+ rec_set_bit_field_2(this_rec, PAGE_HEAP_NO_USER_LOW + m_rec_no,
REC_NEW_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
+ mach_write_to_2(this_rec - REC_NEXT,
+ static_cast<uint16_t>(next_rec - insert_rec));
}
else
{
- memcpy(insert_rec - REC_NEXT, m_cur_rec - REC_NEXT, 2);
+ memcpy(const_cast<rec_t*>(rec) - REC_NEXT, m_cur_rec - REC_NEXT, 2);
m_mtr.write<2>(*m_block, m_cur_rec - REC_NEXT, page_offset(insert_rec));
- rec_set_bit_field_1(insert_rec, 0, REC_OLD_N_OWNED, REC_N_OWNED_MASK,
- REC_N_OWNED_SHIFT);
- rec_set_bit_field_2(insert_rec, PAGE_HEAP_NO_USER_LOW + m_rec_no,
+ rec_set_bit_field_1(const_cast<rec_t*>(rec), 0,
+ REC_OLD_N_OWNED, REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
+ rec_set_bit_field_2(const_cast<rec_t*>(rec),
+ PAGE_HEAP_NO_USER_LOW + m_rec_no,
REC_OLD_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
}
- if (fmt != COMPRESSED)
- m_mtr.memcpy(*m_block, page_offset(m_heap_top), rec_offs_size(offsets));
+ if (fmt == COMPRESSED)
+ /* We already wrote the record. Log is written in PageBulk::compress(). */;
+ else if (page_offset(m_cur_rec) ==
+ (fmt == REDUNDANT ? PAGE_OLD_INFIMUM : PAGE_NEW_INFIMUM))
+ m_mtr.memcpy(*m_block, m_heap_top, rec - extra_size, rec_size);
+ else
+ {
+ /* Try to copy common prefix from the preceding record. */
+ const byte *r= rec - extra_size;
+ const byte * const insert_rec_end= m_heap_top + rec_size;
+ byte *b= m_heap_top;
+
+ /* Skip any unchanged prefix of the record. */
+ for (; *b == *r; b++, r++);
+
+ ut_ad(b < insert_rec_end);
+
+ const byte *c= m_cur_rec - (rec - r);
+ const byte * const c_end= std::min(m_cur_rec + rec_offs_data_size(offsets),
+ m_heap_top);
+
+ /* Try to copy any bytes of the preceding record. */
+ if (UNIV_LIKELY(c >= m_page && c < c_end))
+ {
+ const byte *cm= c;
+ byte *bm= b;
+ const byte *rm= r;
+ for (; cm < c_end && *rm == *cm; cm++, bm++, rm++);
+ ut_ad(bm <= insert_rec_end);
+ size_t len= static_cast<size_t>(rm - r);
+ ut_ad(!memcmp(r, c, len));
+ if (len > 2)
+ {
+ memcpy(b, c, len);
+ m_mtr.memmove(*m_block, page_offset(b), page_offset(c), len);
+ c= cm;
+ b= bm;
+ r= rm;
+ }
+ }
+
+ if (c < m_cur_rec)
+ {
+ if (!rec_offs_data_size(offsets))
+ {
+no_data:
+ m_mtr.memcpy<mtr_t::FORCED>(*m_block, b, r, m_cur_rec - c);
+ goto rec_done;
+ }
+ /* Some header bytes differ. Compare the data separately. */
+ const byte *cd= m_cur_rec;
+ byte *bd= insert_rec;
+ const byte *rd= rec;
+ /* Skip any unchanged prefix of the record. */
+ for (; *bd == *rd; cd++, bd++, rd++)
+ if (bd == insert_rec_end)
+ goto no_data;
+
+ /* Try to copy any data bytes of the preceding record. */
+ const byte *cdm= cd;
+ const byte *rdm= rd;
+ for (; cdm < c_end && *rdm == *cdm; cdm++, rdm++)
+ ut_ad(rdm - rd + bd <= insert_rec_end);
+ size_t len= static_cast<size_t>(rdm - rd);
+ ut_ad(!memcmp(rd, cd, len));
+ if (len > 2)
+ {
+ m_mtr.memcpy<mtr_t::FORCED>(*m_block, b, r, m_cur_rec - c);
+ memcpy(bd, cd, len);
+ m_mtr.memmove(*m_block, page_offset(bd), page_offset(cd), len);
+ c= cdm;
+ b= rdm - rd + bd;
+ r= rdm;
+ }
+ }
+
+ if (size_t len= static_cast<size_t>(insert_rec_end - b))
+ m_mtr.memcpy<mtr_t::FORCED>(*m_block, b, r, len);
+ }
+
+rec_done:
+ ut_ad(fmt == COMPRESSED || !memcmp(m_heap_top, rec - extra_size, rec_size));
+ rec_offs_make_valid(insert_rec, m_index, is_leaf, offsets);
/* Update the member variables. */
ulint slot_size= page_dir_calc_reserved_space(m_rec_no + 1) -
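
The new else branch above is the core of this change: rather than copying and redo-logging every record in full, PageBulk::insertPage() now compares the incoming record with bytes already on the page (the preceding record and whatever sits at the heap top), reuses matching runs via m_mtr.memmove(), and logs only the differing bytes with a FORCED memcpy. A deliberately simplified sketch of the delta-copy idea, with all mini-transaction logging elided (not InnoDB's API):

#include <cassert>
#include <cstddef>
#include <cstring>

/* Append the rec_size bytes at `rec` to the page heap at `heap_top`,
   reusing any common prefix with the preceding record at `prev`.
   In the real code the reused run becomes a compact MEMMOVE log record
   and only the tail is logged verbatim. */
static void append_with_reuse(unsigned char *heap_top,
                              const unsigned char *prev,
                              const unsigned char *rec, std::size_t rec_size)
{
  std::size_t common = 0;
  while (common < rec_size && prev[common] == rec[common])
    common++;
  memmove(heap_top, prev, common);                     /* cheap to log */
  memcpy(heap_top + common, rec + common, rec_size - common); /* logged bytes */
  assert(!memcmp(heap_top, rec, rec_size));
}

int main()
{
  unsigned char page[64] = {};
  const unsigned char a[8] = {1, 2, 3, 4, 5, 6, 7, 8};
  const unsigned char b[8] = {1, 2, 3, 4, 9, 9, 9, 9}; /* shares a 4-byte prefix */
  memcpy(page, a, sizeof a);
  append_with_reuse(page + sizeof a, page, b, sizeof b);
  return 0;
}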
@@ -235,12 +318,25 @@ inline void PageBulk::insertPage(const rec_t *rec, offset_t *offsets)
@param[in] offsets record offsets */
inline void PageBulk::insert(const rec_t *rec, offset_t *offsets)
{
+ byte rec_hdr[REC_N_OLD_EXTRA_BYTES];
+ static_assert(REC_N_OLD_EXTRA_BYTES > REC_N_NEW_EXTRA_BYTES, "file format");
+
if (UNIV_LIKELY_NULL(m_page_zip))
- insertPage<COMPRESSED>(rec, offsets);
+ insertPage<COMPRESSED>(const_cast<rec_t*>(rec), offsets);
else if (m_is_comp)
- insertPage<DYNAMIC>(rec, offsets);
+ {
+ memcpy(rec_hdr, rec - REC_N_NEW_EXTRA_BYTES, REC_N_NEW_EXTRA_BYTES);
+ insertPage<DYNAMIC>(const_cast<rec_t*>(rec), offsets);
+ memcpy(const_cast<rec_t*>(rec) - REC_N_NEW_EXTRA_BYTES, rec_hdr,
+ REC_N_NEW_EXTRA_BYTES);
+ }
else
- insertPage<REDUNDANT>(rec, offsets);
+ {
+ memcpy(rec_hdr, rec - REC_N_OLD_EXTRA_BYTES, REC_N_OLD_EXTRA_BYTES);
+ insertPage<REDUNDANT>(const_cast<rec_t*>(rec), offsets);
+ memcpy(const_cast<rec_t*>(rec) - REC_N_OLD_EXTRA_BYTES, rec_hdr,
+ REC_N_OLD_EXTRA_BYTES);
+ }
}
/** Set the number of owned records in the uncompressed page of
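
Because insertPage() now takes a mutable rec and writes n_owned, the heap number and the next-record link straight into the caller's copy, the insert() hunk above snapshots the fixed-size record header before the call and restores it afterwards. The save/restore pattern in isolation (REC_N_OLD_EXTRA_BYTES is 6 in InnoDB; the wrapper name here is hypothetical):

#include <cstddef>
#include <cstring>

constexpr std::size_t REC_N_OLD_EXTRA_BYTES = 6; /* header bytes before the record origin */

/* Hypothetical wrapper: let a callee scribble on the record header,
   then restore the caller's bytes. */
void insert_preserving_header(unsigned char *rec)
{
  unsigned char saved[REC_N_OLD_EXTRA_BYTES];
  memcpy(saved, rec - REC_N_OLD_EXTRA_BYTES, sizeof saved);  /* save */
  /* ... callee may rewrite rec[-REC_N_OLD_EXTRA_BYTES .. 0) ... */
  memcpy(rec - REC_N_OLD_EXTRA_BYTES, saved, sizeof saved);  /* restore */
}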
@@ -283,18 +379,13 @@ inline void PageBulk::finishPage()
if (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2)
{
slot-= PAGE_DIR_SLOT_SIZE;
+ mach_write_to_2(slot, offset);
if (fmt != COMPRESSED)
- {
- m_mtr.write<2,mtr_t::OPT>(*m_block, slot, offset);
page_rec_set_n_owned<false>(m_block, m_page + offset, count, true,
&m_mtr);
- }
else
- {
- mach_write_to_2(slot, offset);
rec_set_n_owned_zip(m_page + offset, count);
- }
count= 0;
}
@@ -321,17 +412,12 @@ inline void PageBulk::finishPage()
else
slot-= PAGE_DIR_SLOT_SIZE;
+ mach_write_to_2(slot, PAGE_NEW_SUPREMUM);
if (fmt != COMPRESSED)
- {
- m_mtr.write<2,mtr_t::OPT>(*m_block, slot, PAGE_NEW_SUPREMUM);
page_rec_set_n_owned<false>(m_block, m_page + PAGE_NEW_SUPREMUM,
count + 1, true, &m_mtr);
- }
else
- {
- mach_write_to_2(slot, PAGE_NEW_SUPREMUM);
rec_set_n_owned_zip(m_page + PAGE_NEW_SUPREMUM, count + 1);
- }
}
else
{
@@ -347,7 +433,7 @@ inline void PageBulk::finishPage()
if (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2)
{
slot-= PAGE_DIR_SLOT_SIZE;
- m_mtr.write<2,mtr_t::OPT>(*m_block, slot, page_offset(insert_rec));
+ mach_write_to_2(slot, page_offset(insert_rec));
page_rec_set_n_owned<false>(m_block, insert_rec, count, false, &m_mtr);
count= 0;
}
@@ -368,31 +454,35 @@ inline void PageBulk::finishPage()
else
slot-= PAGE_DIR_SLOT_SIZE;
- m_mtr.write<2,mtr_t::OPT>(*m_block, slot, PAGE_OLD_SUPREMUM);
+ mach_write_to_2(slot, PAGE_OLD_SUPREMUM);
page_rec_set_n_owned<false>(m_block, m_page + PAGE_OLD_SUPREMUM, count + 1,
false, &m_mtr);
}
- ut_ad(!dict_index_is_spatial(m_index));
+ ut_ad(!m_index->is_spatial());
ut_ad(!page_get_instant(m_page));
ut_ad(!mach_read_from_2(PAGE_HEADER + PAGE_N_DIRECTION + m_page));
if (fmt != COMPRESSED)
{
- m_mtr.write<2,mtr_t::OPT>(*m_block,
- PAGE_HEADER + PAGE_N_DIR_SLOTS + m_page,
- 1 + static_cast<ulint>(slot0 - slot) /
- PAGE_DIR_SLOT_SIZE);
- m_mtr.write<2>(*m_block, PAGE_HEADER + PAGE_HEAP_TOP + m_page,
- static_cast<ulint>(m_heap_top - m_page));
- m_mtr.write<2>(*m_block, PAGE_HEADER + PAGE_N_HEAP + m_page,
- (PAGE_HEAP_NO_USER_LOW + m_rec_no) |
- uint16_t{fmt != REDUNDANT} << 15);
- m_mtr.write<2>(*m_block, PAGE_HEADER + PAGE_N_RECS + m_page, m_rec_no);
- m_mtr.write<2>(*m_block, PAGE_HEADER + PAGE_LAST_INSERT + m_page,
- static_cast<ulint>(m_cur_rec - m_page));
- m_mtr.write<2>(*m_block, PAGE_HEADER + PAGE_DIRECTION_B - 1 + m_page,
- PAGE_RIGHT);
+ static_assert(PAGE_N_DIR_SLOTS == 0, "compatibility");
+ alignas(8) byte page_header[PAGE_N_RECS + 2];
+ mach_write_to_2(page_header + PAGE_N_DIR_SLOTS,
+ 1 + (slot0 - slot) / PAGE_DIR_SLOT_SIZE);
+ mach_write_to_2(page_header + PAGE_HEAP_TOP, m_heap_top - m_page);
+ mach_write_to_2(page_header + PAGE_N_HEAP,
+ (PAGE_HEAP_NO_USER_LOW + m_rec_no) |
+ uint16_t{fmt != REDUNDANT} << 15);
+ memset_aligned<2>(page_header + PAGE_FREE, 0, 4);
+ static_assert(PAGE_GARBAGE == PAGE_FREE + 2, "compatibility");
+ mach_write_to_2(page_header + PAGE_LAST_INSERT, m_cur_rec - m_page);
+ mach_write_to_2(page_header + PAGE_DIRECTION_B - 1, PAGE_RIGHT);
+ mach_write_to_2(page_header + PAGE_N_DIRECTION, m_rec_no);
+ memcpy_aligned<2>(page_header + PAGE_N_RECS,
+ page_header + PAGE_N_DIRECTION, 2);
+ m_mtr.memcpy(*m_block, PAGE_HEADER + m_page, page_header,
+ sizeof page_header);
+ m_mtr.memcpy(*m_block, page_offset(slot), slot0 - slot);
}
else
{
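
In the uncompressed branch of finishPage() above, a series of separate m_mtr.write<2>() calls is replaced by plain mach_write_to_2() stores into a stack buffer that is then published, and redo-logged, with a single m_mtr.memcpy(); the directory slots get the same treatment through the one memcpy over the slot0 - slot range. The batching pattern in miniature (big-endian stores as in mach_write_to_2(); the function and field offsets here are illustrative, not the real mtr_t API):

#include <cstdint>
#include <cstring>

/* Big-endian 16-bit store, like InnoDB's mach_write_to_2(). */
static void write2(unsigned char *b, uint16_t v)
{
  b[0] = static_cast<unsigned char>(v >> 8);
  b[1] = static_cast<unsigned char>(v);
}

/* Build adjacent 2-byte header fields on the stack, then publish them to
   the page image with one memcpy; the real code issues one redo-log record
   for the whole run instead of one per field. */
void flush_page_header(unsigned char *hdr_on_page,
                       uint16_t n_dir_slots, uint16_t heap_top, uint16_t n_heap)
{
  alignas(8) unsigned char hdr[6];
  write2(hdr + 0, n_dir_slots); /* PAGE_N_DIR_SLOTS */
  write2(hdr + 2, heap_top);    /* PAGE_HEAP_TOP */
  write2(hdr + 4, n_heap);      /* PAGE_N_HEAP */
  memcpy(hdr_on_page, hdr, sizeof hdr); /* one write, one log record */
}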