author     Marko Mäkelä <marko.makela@mariadb.com>  2020-02-27 17:19:44 +0200
committer  Marko Mäkelä <marko.makela@mariadb.com>  2020-02-27 17:19:44 +0200
commit     138cbec5f2300bd5b401e83802642c1806264992 (patch)
tree       873082789086ad6467861e92991dff545000808b
parent     dee6fb356b2c38a8dad2a4f2d21504149f9a1917 (diff)
download   mariadb-git-138cbec5f2300bd5b401e83802642c1806264992.tar.gz
MDEV-21724: Optimize page_cur_insert_low() redo logging
Inserting a record into an index page involves updating multiple fields in the page header as well as updating the next-record links and potentially updating fields related to the sparse page directory. Let us cover the insert operations by higher-level log records, to avoid 'redundant' logging about the writes.

The code for applying the high-level log records will check the consistency of the page thoroughly, to avoid crashes during recovery. We will refuse to replay the inserts if any inconsistency is detected. With innodb_force_recovery=1, recovery will continue, but the affected pages may be more inconsistent if some changes were omitted.

mrec_ext_t: Introduce the EXTENDED record subtypes INSERT_HEAP_REDUNDANT, INSERT_REUSE_REDUNDANT, INSERT_HEAP_DYNAMIC, INSERT_REUSE_DYNAMIC. The record will explicitly identify the page type and whether the space will be allocated from PAGE_HEAP_TOP or reused from the PAGE_FREE list. It will also tell how many bytes to copy from the preceding record header and payload, and how to initialize the rest of the record header and payload.

mtr_t::page_insert(): Write the high-level log records.

log_phys_t::apply(): Parse the high-level log records.

page_apply_insert_redundant(), page_apply_insert_dynamic(): Apply the high-level log records.

page_dir_split_slot(): Introduce a variant that does not write log nor deal with ROW_FORMAT=COMPRESSED pages.

page_mem_alloc_heap(): Remove the mtr_t parameter.

page_cur_insert_rec_low(): Write log only via mtr_t::page_insert().
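The payload of these EXTENDED records, as parsed by log_phys_t::apply() in the log0recv.cc hunk below, follows a fixed field order. A sketch of that order, with illustrative names that are not part of the patch (every integer field is an mlog varint):

#include <cstddef>
#include <cstdint>

struct insert_log_fields // hypothetical, for illustration only
{
  uint8_t subtype;        // INSERT_{HEAP,REUSE}_{REDUNDANT,DYNAMIC}
  size_t prev_rec;        // predecessor offset, relative to the infimum
  size_t shift;           // INSERT_REUSE_DYNAMIC only: PAGE_FREE movement
  size_t enc_hdr;         // REDUNDANT: n_fields and flags; DYNAMIC: hdr_l and status
  size_t hdr_c;           // record header bytes shared with the predecessor
  size_t data_c;          // data bytes shared with the predecessor
  const uint8_t *literal; // remaining header and payload bytes, verbatim
  size_t literal_len;     // whatever remains of the record length
};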
-rw-r--r--  storage/innobase/include/mtr0mtr.h    39
-rw-r--r--  storage/innobase/include/mtr0types.h  12
-rw-r--r--  storage/innobase/include/page0cur.h   33
-rw-r--r--  storage/innobase/log/log0recv.cc      93
-rw-r--r--  storage/innobase/page/page0cur.cc     1102
5 files changed, 1059 insertions, 220 deletions
diff --git a/storage/innobase/include/mtr0mtr.h b/storage/innobase/include/mtr0mtr.h
index 5a0b4fd77b9..1a270ebb975 100644
--- a/storage/innobase/include/mtr0mtr.h
+++ b/storage/innobase/include/mtr0mtr.h
@@ -494,6 +494,45 @@ struct mtr_t {
@param block B-tree or R-tree page
@param comp false=ROW_FORMAT=REDUNDANT, true=COMPACT or DYNAMIC */
inline void page_create(const buf_block_t &block, bool comp);
+
+ /** Write log for inserting a B-tree or R-tree record in
+ ROW_FORMAT=REDUNDANT.
+ @param block B-tree or R-tree page
+ @param reuse false=allocate from PAGE_HEAP_TOP; true=reuse PAGE_FREE
+ @param prev_rec byte offset of the predecessor of the record to insert,
+ starting from PAGE_OLD_INFIMUM
+ @param info_bits info_bits of the record
+ @param n_fields_s number of fields << 1 | rec_get_1byte_offs_flag()
+ @param hdr_c number of common record header bytes with prev_rec
+ @param data_c number of common data bytes with prev_rec
+ @param hdr record header bytes to copy to the log
+ @param hdr_l number of copied record header bytes
+ @param data record payload bytes to copy to the log
+ @param data_l number of copied record data bytes */
+ inline void page_insert(const buf_block_t &block, bool reuse,
+ ulint prev_rec, byte info_bits,
+ ulint n_fields_s, size_t hdr_c, size_t data_c,
+ const byte *hdr, size_t hdr_l,
+ const byte *data, size_t data_l);
+ /** Write log for inserting a B-tree or R-tree record in
+ ROW_FORMAT=COMPACT or ROW_FORMAT=DYNAMIC.
+ @param block B-tree or R-tree page
+ @param reuse false=allocate from PAGE_HEAP_TOP; true=reuse PAGE_FREE
+ @param prev_rec byte offset of the predecessor of the record to insert,
+ starting from PAGE_NEW_INFIMUM
+ @param info_status rec_get_info_and_status_bits()
+ @param shift when reuse=true: number of bytes the PAGE_FREE is moving
+ @param hdr_c number of common record header bytes with prev_rec
+ @param data_c number of common data bytes with prev_rec
+ @param hdr record header bytes to copy to the log
+ @param hdr_l number of copied record header bytes
+ @param data record payload bytes to copy to the log
+ @param data_l number of copied record data bytes */
+ inline void page_insert(const buf_block_t &block, bool reuse,
+ ulint prev_rec, byte info_status,
+ ssize_t shift, size_t hdr_c, size_t data_c,
+ const byte *hdr, size_t hdr_l,
+ const byte *data, size_t data_l);
/** Write log for deleting a B-tree or R-tree record in ROW_FORMAT=REDUNDANT.
@param block B-tree or R-tree page
@param prev_rec byte offset of the predecessor of the record to delete,
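The n_fields_s value documented above survives a round trip through the shifts that mtr_t::page_insert() and page_apply_insert_redundant() apply later in this commit: the writer folds it together with the info bits as (n_fields_s - 2) << 2 | info_bits >> 4, and recovery unpacks all three components. A minimal standalone check (illustrative, not MariaDB code):

#include <cassert>

int main()
{
  const unsigned n_fields= 3, one_byte_offsets= 1, info_bits= 0x20;
  // The caller packs n_fields_s = n_fields << 1 | rec_get_1byte_offs_flag().
  const unsigned n_fields_s= n_fields << 1 | one_byte_offsets;
  // mtr_t::page_insert() then folds in the info bits before logging.
  const unsigned enc_hdr= (n_fields_s - 2) << 2 | info_bits >> 4;
  // page_apply_insert_redundant() recovers all three components;
  // subtracting 2 preserves the parity bit that holds the offsets flag.
  assert(((enc_hdr >> 3) + 1) == n_fields);
  assert(((enc_hdr >> 2) & 1) == one_byte_offsets);
  assert(((enc_hdr & 3) << 4) == info_bits);
  return 0;
}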
diff --git a/storage/innobase/include/mtr0types.h b/storage/innobase/include/mtr0types.h
index bfa30cf30b1..bae6d247d59 100644
--- a/storage/innobase/include/mtr0types.h
+++ b/storage/innobase/include/mtr0types.h
@@ -263,6 +263,18 @@ enum mrec_ext_t
This is equivalent to the old MLOG_UNDO_INSERT record.
The current byte offset will be reset to FIL_PAGE_TYPE. */
UNDO_APPEND= 3,
+ /** Insert a ROW_FORMAT=REDUNDANT record, extending PAGE_HEAP_TOP.
+ The current byte offset will be reset to FIL_PAGE_TYPE. */
+ INSERT_HEAP_REDUNDANT= 4,
+ /** Insert a ROW_FORMAT=REDUNDANT record, reusing PAGE_FREE.
+ The current byte offset will be reset to FIL_PAGE_TYPE. */
+ INSERT_REUSE_REDUNDANT= 5,
+ /** Insert a ROW_FORMAT=COMPACT or DYNAMIC record, extending PAGE_HEAP_TOP.
+ The current byte offset will be reset to FIL_PAGE_TYPE. */
+ INSERT_HEAP_DYNAMIC= 6,
+ /** Insert a ROW_FORMAT=COMPACT or DYNAMIC record, reusing PAGE_FREE.
+ The current byte offset will be reset to FIL_PAGE_TYPE. */
+ INSERT_REUSE_DYNAMIC= 7,
/** Delete a record on a ROW_FORMAT=REDUNDANT page.
We point to the predecessor of the record to be deleted.
The current byte offset will be reset to FIL_PAGE_TYPE.
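The numeric values 4 to 7 are not arbitrary: as the static_asserts in the log0recv.cc hunk below confirm, bit 1 of the subtype selects the ROW_FORMAT=COMPACT/DYNAMIC variant and bit 0 selects PAGE_FREE reuse, so the parser can branch on two AND operations. A standalone sketch of that decoding (illustrative, not MariaDB code):

#include <cassert>

enum mrec_ext { INSERT_HEAP_REDUNDANT= 4, INSERT_REUSE_REDUNDANT= 5,
                INSERT_HEAP_DYNAMIC= 6, INSERT_REUSE_DYNAMIC= 7 };

int main()
{
  for (unsigned subtype= INSERT_HEAP_REDUNDANT;
       subtype <= INSERT_REUSE_DYNAMIC; subtype++)
  {
    const bool dynamic= subtype & 2; // COMPACT/DYNAMIC page format
    const bool reuse= subtype & 1;   // reuse PAGE_FREE instead of the heap
    assert(dynamic == (subtype >= INSERT_HEAP_DYNAMIC));
    assert(reuse == (subtype == INSERT_REUSE_REDUNDANT ||
                     subtype == INSERT_REUSE_DYNAMIC));
  }
  return 0;
}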
diff --git a/storage/innobase/include/page0cur.h b/storage/innobase/include/page0cur.h
index 4ed76d5d183..2e57b792349 100644
--- a/storage/innobase/include/page0cur.h
+++ b/storage/innobase/include/page0cur.h
@@ -201,6 +201,39 @@ page_cur_delete_rec(
mtr_t* mtr) /*!< in/out: mini-transaction */
MY_ATTRIBUTE((nonnull));
+/** Apply an INSERT_HEAP_REDUNDANT or INSERT_REUSE_REDUNDANT record that was
+written by page_cur_insert_rec_low() for a ROW_FORMAT=REDUNDANT page.
+@param block B-tree or R-tree page in ROW_FORMAT=REDUNDANT
+@param reuse false=allocate from PAGE_HEAP_TOP; true=reuse PAGE_FREE
+@param prev byte offset of the predecessor, relative to PAGE_OLD_INFIMUM
+@param enc_hdr encoded fixed-size header bits
+@param hdr_c number of common record header bytes with prev
+@param data_c number of common data bytes with prev
+@param data literal header and data bytes
+@param data_len length of the literal data, in bytes
+@return whether the operation failed (inconsistency was noticed) */
+bool page_apply_insert_redundant(const buf_block_t &block, bool reuse,
+ ulint prev, ulint enc_hdr,
+ size_t hdr_c, size_t data_c,
+ const void *data, size_t data_len);
+
+/** Apply an INSERT_HEAP_DYNAMIC or INSERT_REUSE_DYNAMIC record that was
+written by page_cur_insert_rec_low() for a ROW_FORMAT=COMPACT or DYNAMIC page.
+@param block B-tree or R-tree page in ROW_FORMAT=COMPACT or DYNAMIC
+@param reuse false=allocate from PAGE_HEAP_TOP; true=reuse PAGE_FREE
+@param prev byte offset of the predecessor, relative to PAGE_NEW_INFIMUM
+@param shift when reuse=true: number of bytes the PAGE_FREE is moving
+@param enc_hdr_l number of copied record header bytes, plus record type bits
+@param hdr_c number of common record header bytes with prev
+@param data_c number of common data bytes with prev
+@param data literal header and data bytes
+@param data_len length of the literal data, in bytes
+@return whether the operation failed (inconsistency was noticed) */
+bool page_apply_insert_dynamic(const buf_block_t &block, bool reuse,
+ ulint prev, ulint shift, ulint enc_hdr_l,
+ size_t hdr_c, size_t data_c,
+ const void *data, size_t data_len);
+
/** Apply a DELETE_ROW_FORMAT_REDUNDANT record that was written by
page_cur_delete_rec() for a ROW_FORMAT=REDUNDANT page.
@param block B-tree or R-tree page in ROW_FORMAT=REDUNDANT
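The shift parameter of page_apply_insert_dynamic() arrives with its sign folded into bit 0: mtr_t::page_insert() in page0cur.cc encodes a negative distance as -shift << 1 | 1 and a non-negative one as shift << 1, and the reuse branch of the apply function inverts that. A round-trip sketch with illustrative names (not MariaDB code):

#include <cassert>
#include <cstddef>

int main()
{
  for (ptrdiff_t shift : {-5, 0, 7})
  {
    // Encoding used before the value is written as a varint.
    size_t enc= shift < 0 ? size_t(-shift) << 1 | 1 : size_t(shift) << 1;
    // Decoding performed by the apply function: bit 0 carries the sign.
    ptrdiff_t dec= (enc & 1) ? -ptrdiff_t(enc >> 1) : ptrdiff_t(enc >> 1);
    assert(dec == shift);
  }
  return 0;
}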
diff --git a/storage/innobase/log/log0recv.cc b/storage/innobase/log/log0recv.cc
index a9bd2c118be..f007ef81222 100644
--- a/storage/innobase/log/log0recv.cc
+++ b/storage/innobase/log/log0recv.cc
@@ -291,8 +291,9 @@ public:
static_assert(INIT_ROW_FORMAT_DYNAMIC == 1, "compatibility");
if (UNIV_UNLIKELY(!rlen))
goto record_corrupted;
- switch (*l) {
+ switch (const byte subtype= *l) {
uint8_t ll;
+ size_t prev_rec, hdr_size;
default:
goto record_corrupted;
case INIT_ROW_FORMAT_REDUNDANT:
@@ -317,6 +318,90 @@ page_corrupted:
return applied;
}
break;
+ case INSERT_HEAP_REDUNDANT:
+ case INSERT_REUSE_REDUNDANT:
+ case INSERT_HEAP_DYNAMIC:
+ case INSERT_REUSE_DYNAMIC:
+ if (UNIV_UNLIKELY(rlen < 2))
+ goto record_corrupted;
+ rlen--;
+ ll= mlog_decode_varint_length(*++l);
+ if (UNIV_UNLIKELY(ll > 3 || ll >= rlen))
+ goto record_corrupted;
+ prev_rec= mlog_decode_varint(l);
+ ut_ad(prev_rec != MLOG_DECODE_ERROR);
+ rlen-= ll;
+ l+= ll;
+ ll= mlog_decode_varint_length(*l);
+ static_assert(INSERT_HEAP_REDUNDANT == 4, "compatibility");
+ static_assert(INSERT_REUSE_REDUNDANT == 5, "compatibility");
+ static_assert(INSERT_HEAP_DYNAMIC == 6, "compatibility");
+ static_assert(INSERT_REUSE_DYNAMIC == 7, "compatibility");
+ if (subtype & 2)
+ {
+ size_t shift= 0;
+ if (subtype & 1)
+ {
+ if (UNIV_UNLIKELY(ll > 3 || ll >= rlen))
+ goto record_corrupted;
+ shift= mlog_decode_varint(l);
+ ut_ad(shift != MLOG_DECODE_ERROR);
+ rlen-= ll;
+ l+= ll;
+ ll= mlog_decode_varint_length(*l);
+ }
+ if (UNIV_UNLIKELY(ll > 3 || ll >= rlen))
+ goto record_corrupted;
+ size_t enc_hdr_l= mlog_decode_varint(l);
+ ut_ad(enc_hdr_l != MLOG_DECODE_ERROR);
+ rlen-= ll;
+ l+= ll;
+ ll= mlog_decode_varint_length(*l);
+ if (UNIV_UNLIKELY(ll > 2 || ll >= rlen))
+ goto record_corrupted;
+ size_t hdr_c= mlog_decode_varint(l);
+ ut_ad(hdr_c != MLOG_DECODE_ERROR);
+ rlen-= ll;
+ l+= ll;
+ ll= mlog_decode_varint_length(*l);
+ if (UNIV_UNLIKELY(ll > 3 || ll >= rlen))
+ goto record_corrupted;
+ size_t data_c= mlog_decode_varint(l);
+ ut_ad(data_c != MLOG_DECODE_ERROR);
+ rlen-= ll;
+ l+= ll;
+ if (page_apply_insert_dynamic(block, subtype & 1, prev_rec,
+ shift, enc_hdr_l, hdr_c, data_c,
+ l, rlen) && !srv_force_recovery)
+ goto page_corrupted;
+ }
+ else
+ {
+ if (UNIV_UNLIKELY(ll > 2 || ll >= rlen))
+ goto record_corrupted;
+ size_t header= mlog_decode_varint(l);
+ ut_ad(header != MLOG_DECODE_ERROR);
+ rlen-= ll;
+ l+= ll;
+ ll= mlog_decode_varint_length(*l);
+ if (UNIV_UNLIKELY(ll > 2 || ll >= rlen))
+ goto record_corrupted;
+ size_t hdr_c= mlog_decode_varint(l);
+ ut_ad(hdr_c != MLOG_DECODE_ERROR);
+ rlen-= ll;
+ l+= ll;
+ ll= mlog_decode_varint_length(*l);
+ if (UNIV_UNLIKELY(ll > 2 || ll >= rlen))
+ goto record_corrupted;
+ size_t data_c= mlog_decode_varint(l);
+ rlen-= ll;
+ l+= ll;
+ if (page_apply_insert_redundant(block, subtype & 1, prev_rec,
+ header, hdr_c, data_c,
+ l, rlen) && !srv_force_recovery)
+ goto page_corrupted;
+ }
+ break;
case DELETE_ROW_FORMAT_REDUNDANT:
if (UNIV_UNLIKELY(rlen < 2 || rlen > 4))
goto record_corrupted;
@@ -335,14 +420,14 @@ page_corrupted:
ll= mlog_decode_varint_length(*++l);
if (UNIV_UNLIKELY(ll > 3 || ll >= rlen))
goto record_corrupted;
- size_t prev_rec= mlog_decode_varint(l);
+ prev_rec= mlog_decode_varint(l);
ut_ad(prev_rec != MLOG_DECODE_ERROR);
rlen-= ll;
l+= ll;
ll= mlog_decode_varint_length(*l);
if (UNIV_UNLIKELY(ll > 2 || ll >= rlen))
goto record_corrupted;
- size_t hdr_size= mlog_decode_varint(l);
+ hdr_size= mlog_decode_varint(l);
ut_ad(hdr_size != MLOG_DECODE_ERROR);
rlen-= ll;
l+= ll;
@@ -350,7 +435,7 @@ page_corrupted:
if (UNIV_UNLIKELY(ll > 3 || ll != rlen))
goto record_corrupted;
if (page_apply_delete_dynamic(block, prev_rec, hdr_size,
- mlog_decode_varint(l)) &&
+ mlog_decode_varint(l)) &&
!srv_force_recovery)
goto page_corrupted;
break;
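Each field above is read with the same three-step pattern: determine the varint length from its first byte, verify that the length is within both the field's bound and the remaining record, then decode and advance. A distilled model of one such step follows; the stand-in helpers use a deliberately simplistic encoding (a length code followed by big-endian payload bytes), which is not InnoDB's actual mlog format:

#include <cstddef>
#include <cstdint>

// Stand-ins for mlog_decode_varint_length()/mlog_decode_varint().
static uint8_t stand_in_length(uint8_t first) { return uint8_t(1 + (first & 3)); }

static size_t stand_in_decode(const uint8_t *p)
{
  size_t v= 0;
  for (uint8_t i= 1; i < stand_in_length(*p); i++)
    v= v << 8 | p[i]; // big-endian payload after the length byte
  return v;
}

// Mirrors one "ll= ...; if (ll > N || ll >= rlen) goto record_corrupted;"
// step from log_phys_t::apply(); returns false on corruption.
static bool read_field(const uint8_t *&l, size_t &rlen, size_t max_len,
                       size_t &out)
{
  const uint8_t ll= stand_in_length(*l);
  if (ll > max_len || ll >= rlen)
    return false;
  out= stand_in_decode(l);
  rlen-= ll;
  l+= ll;
  return true;
}

int main()
{
  const uint8_t rec[]= {2, 0x12, 0x34, 0}; // one 3-byte field, then more bytes
  const uint8_t *l= rec;
  size_t rlen= sizeof rec, prev_rec;
  return read_field(l, rlen, 3, prev_rec) && prev_rec == 0x1234 ? 0 : 1;
}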
diff --git a/storage/innobase/page/page0cur.cc b/storage/innobase/page/page0cur.cc
index dce4d5e2f5c..d04ec647750 100644
--- a/storage/innobase/page/page0cur.cc
+++ b/storage/innobase/page/page0cur.cc
@@ -798,23 +798,64 @@ static void page_rec_set_n_owned(rec_t *rec, ulint n_owned, bool comp)
/**
Split a directory slot which owns too many records.
-@tparam compressed whether to update the ROW_FORMAT=COMPRESSED page as well
@param[in,out] block index page
+@param[in] s the slot that needs to be split */
+static void page_dir_split_slot(const buf_block_t &block, ulint s)
+{
+ ut_ad(s);
+
+ page_dir_slot_t *slot= page_dir_get_nth_slot(block.frame, s);
+ const ulint n_owned= PAGE_DIR_SLOT_MAX_N_OWNED + 1;
+
+ ut_ad(page_dir_slot_get_n_owned(slot) == n_owned);
+ static_assert((PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2 >=
+ PAGE_DIR_SLOT_MIN_N_OWNED, "compatibility");
+
+ /* Find a record approximately in the middle. */
+ const rec_t *rec= page_dir_slot_get_rec(slot + PAGE_DIR_SLOT_SIZE);
+
+ for (ulint i= n_owned / 2; i--; )
+ rec= page_rec_get_next_const(rec);
+
+ /* Add a directory slot immediately below this one. */
+ constexpr uint16_t n_slots_f= PAGE_N_DIR_SLOTS + PAGE_HEADER;
+ byte *n_slots_p= my_assume_aligned<2>(n_slots_f + block.frame);
+ const uint16_t n_slots= mach_read_from_2(n_slots_p);
+
+ page_dir_slot_t *last_slot= static_cast<page_dir_slot_t*>
+ (block.frame + srv_page_size - (PAGE_DIR + PAGE_DIR_SLOT_SIZE) -
+ n_slots * PAGE_DIR_SLOT_SIZE);
+ memmove_aligned<2>(last_slot, last_slot + PAGE_DIR_SLOT_SIZE,
+ slot - last_slot);
+
+ const ulint half_owned= n_owned / 2;
+
+ mach_write_to_2(n_slots_p, n_slots + 1);
+
+ mach_write_to_2(slot, rec - block.frame);
+ const bool comp= page_is_comp(block.frame) != 0;
+ page_rec_set_n_owned(page_dir_slot_get_rec(slot), half_owned, comp);
+ page_rec_set_n_owned(page_dir_slot_get_rec(slot - PAGE_DIR_SLOT_SIZE),
+ n_owned - half_owned, comp);
+}
+
+/**
+Split a directory slot which owns too many records.
+@param[in,out] block index page (ROW_FORMAT=COMPRESSED)
@param[in] s the slot that needs to be split
@param[in,out] mtr mini-transaction */
-template<bool compressed>
-static void page_dir_split_slot(buf_block_t *block, ulint s, mtr_t* mtr)
+static void page_zip_dir_split_slot(buf_block_t *block, ulint s, mtr_t* mtr)
{
- ut_ad(!block->page.zip.data || page_is_comp(block->frame));
- ut_ad(!compressed || block->page.zip.data);
+ ut_ad(block->page.zip.data);
+ ut_ad(page_is_comp(block->frame));
ut_ad(s);
page_dir_slot_t *slot= page_dir_get_nth_slot(block->frame, s);
const ulint n_owned= PAGE_DIR_SLOT_MAX_N_OWNED + 1;
ut_ad(page_dir_slot_get_n_owned(slot) == n_owned);
- compile_time_assert((PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
- >= PAGE_DIR_SLOT_MIN_N_OWNED);
+ static_assert((PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2 >=
+ PAGE_DIR_SLOT_MIN_N_OWNED, "compatibility");
/* 1. We loop to find a record approximately in the middle of the
records owned by the slot. */
@@ -839,33 +880,14 @@ static void page_dir_split_slot(buf_block_t *block, ulint s, mtr_t* mtr)
mtr->write<2>(*block, n_slots_p, 1U + n_slots);
- if (compressed)
- {
- /* Log changes to the compressed page header and the dense page
- directory. */
- memcpy_aligned<2>(&block->page.zip.data[n_slots_f], n_slots_p, 2);
- mach_write_to_2(slot, page_offset(rec));
- page_rec_set_n_owned<true>(block, page_dir_slot_get_rec(slot), half_owned,
- true, mtr);
- page_rec_set_n_owned<true>(block,
- page_dir_slot_get_rec(slot -
- PAGE_DIR_SLOT_SIZE),
- n_owned - half_owned, true, mtr);
- }
- else
- {
- mtr->memmove(*block, page_offset(last_slot),
- page_offset(last_slot) + PAGE_DIR_SLOT_SIZE,
- slot - last_slot);
- mtr->write<2>(*block, slot, page_offset(rec));
- const bool comp= page_is_comp(block->frame) != 0;
- page_rec_set_n_owned<false>(block, page_dir_slot_get_rec(slot), half_owned,
- comp, mtr);
- page_rec_set_n_owned<false>(block,
- page_dir_slot_get_rec(slot -
- PAGE_DIR_SLOT_SIZE),
- n_owned - half_owned, comp, mtr);
- }
+ /* Log changes to the compressed page header and the dense page directory. */
+ memcpy_aligned<2>(&block->page.zip.data[n_slots_f], n_slots_p, 2);
+ mach_write_to_2(slot, page_offset(rec));
+ page_rec_set_n_owned<true>(block, page_dir_slot_get_rec(slot), half_owned,
+ true, mtr);
+ page_rec_set_n_owned<true>(block,
+ page_dir_slot_get_rec(slot - PAGE_DIR_SLOT_SIZE),
+ n_owned - half_owned, true, mtr);
}
/**
@@ -1006,16 +1028,15 @@ static void page_dir_balance_slot(const buf_block_t &block, ulint s)
}
/** Allocate space for inserting an index record.
-@tparam compressed whether to update the ROW_FORMAT=COMPRESSED page as well
+@tparam compressed whether to update the ROW_FORMAT=COMPRESSED
@param[in,out] block index page
@param[in] need number of bytes needed
@param[out] heap_no record heap number
-@param[in,out] mtr mini-transaction
@return pointer to the start of the allocated buffer
@retval NULL if allocation fails */
template<bool compressed=false>
static byte* page_mem_alloc_heap(buf_block_t *block, ulint need,
- ulint *heap_no, mtr_t *mtr)
+ ulint *heap_no)
{
ut_ad(!compressed || block->page.zip.data);
@@ -1036,7 +1057,6 @@ static byte* page_mem_alloc_heap(buf_block_t *block, ulint need,
mach_write_to_2(heap_top, top + need);
mach_write_to_2(n_heap, h + 1);
- mtr->memcpy(*block, PAGE_HEAP_TOP + PAGE_HEADER, 4);
if (compressed)
{
@@ -1045,10 +1065,205 @@ static byte* page_mem_alloc_heap(buf_block_t *block, ulint need,
heap_top, 4);
}
- compile_time_assert(PAGE_N_HEAP == PAGE_HEAP_TOP + 2);
return &block->frame[top];
}
+/** Write log for inserting a B-tree or R-tree record in
+ROW_FORMAT=REDUNDANT.
+@param block B-tree or R-tree page
+@param reuse false=allocate from PAGE_HEAP_TOP; true=reuse PAGE_FREE
+@param prev_rec byte offset of the predecessor of the record to insert,
+ starting from PAGE_OLD_INFIMUM
+@param info_bits info_bits of the record
+@param n_fields_s number of fields << 1 | rec_get_1byte_offs_flag()
+@param hdr_c number of common record header bytes with prev_rec
+@param data_c number of common data bytes with prev_rec
+@param hdr record header bytes to copy to the log
+@param hdr_l number of copied record header bytes
+@param data record payload bytes to copy to the log
+@param data_l number of copied record data bytes */
+inline void mtr_t::page_insert(const buf_block_t &block, bool reuse,
+ ulint prev_rec, byte info_bits,
+ ulint n_fields_s, size_t hdr_c, size_t data_c,
+ const byte *hdr, size_t hdr_l,
+ const byte *data, size_t data_l)
+{
+ ut_ad(!block.page.zip.data);
+ ut_ad(m_log_mode == MTR_LOG_ALL);
+ ut_d(ulint n_slots= page_dir_get_n_slots(block.frame));
+ ut_ad(n_slots >= 2);
+ ut_d(const byte *page_end= page_dir_get_nth_slot(block.frame, n_slots - 1));
+ ut_ad(&block.frame[prev_rec + PAGE_OLD_INFIMUM] <= page_end);
+ ut_ad(block.frame + page_header_get_offs(block.frame, PAGE_HEAP_TOP) <=
+ page_end);
+ ut_ad(fil_page_index_page_check(block.frame));
+ ut_ad(!(~(REC_INFO_MIN_REC_FLAG | REC_INFO_DELETED_FLAG) & info_bits));
+ ut_ad(n_fields_s >= 2);
+ ut_ad((n_fields_s >> 1) <= REC_MAX_N_FIELDS);
+ ut_ad(data_l + data_c <= REDUNDANT_REC_MAX_DATA_SIZE);
+
+ set_modified();
+
+ static_assert(REC_INFO_MIN_REC_FLAG == 0x10, "compatibility");
+ static_assert(REC_INFO_DELETED_FLAG == 0x20, "compatibility");
+ n_fields_s= (n_fields_s - 2) << 2 | info_bits >> 4;
+
+ size_t len= prev_rec < MIN_2BYTE ? 2 : prev_rec < MIN_3BYTE ? 3 : 4;
+ static_assert((REC_MAX_N_FIELDS << 1 | 1) <= MIN_3BYTE, "compatibility");
+ len+= n_fields_s < MIN_2BYTE ? 1 : 2;
+ len+= hdr_c < MIN_2BYTE ? 1 : 2;
+ static_assert(REDUNDANT_REC_MAX_DATA_SIZE <= MIN_3BYTE, "compatibility");
+ len+= data_c < MIN_2BYTE ? 1 : 2;
+ len+= hdr_l + data_l;
+
+ const bool small= len < mtr_buf_t::MAX_DATA_SIZE - (1 + 3 + 3 + 5 + 5);
+ byte *l= log_write<EXTENDED>(block.page.id, &block.page, len, small);
+
+ if (UNIV_LIKELY(small))
+ {
+ ut_d(const byte * const end = l + len);
+ *l++= reuse ? INSERT_REUSE_REDUNDANT : INSERT_HEAP_REDUNDANT;
+ l= mlog_encode_varint(l, prev_rec);
+ l= mlog_encode_varint(l, n_fields_s);
+ l= mlog_encode_varint(l, hdr_c);
+ l= mlog_encode_varint(l, data_c);
+ ::memcpy(l, hdr, hdr_l);
+ l+= hdr_l;
+ ::memcpy(l, data, data_l);
+ l+= data_l;
+ ut_ad(end == l);
+ m_log.close(l);
+ }
+ else
+ {
+ m_log.close(l);
+ l= m_log.open(len - hdr_l - data_l);
+ ut_d(const byte * const end = l + len - hdr_l - data_l);
+ *l++= reuse ? INSERT_REUSE_REDUNDANT : INSERT_HEAP_REDUNDANT;
+ l= mlog_encode_varint(l, prev_rec);
+ l= mlog_encode_varint(l, n_fields_s);
+ l= mlog_encode_varint(l, hdr_c);
+ l= mlog_encode_varint(l, data_c);
+ ut_ad(end == l);
+ m_log.close(l);
+ m_log.push(hdr, static_cast<uint32_t>(hdr_l));
+ m_log.push(data, static_cast<uint32_t>(data_l));
+ }
+
+ m_last_offset= FIL_PAGE_TYPE;
+}
+
+/** Write log for inserting a B-tree or R-tree record in
+ROW_FORMAT=COMPACT or ROW_FORMAT=DYNAMIC.
+@param block B-tree or R-tree page
+@param reuse false=allocate from PAGE_HEAP_TOP; true=reuse PAGE_FREE
+@param prev_rec byte offset of the predecessor of the record to insert,
+ starting from PAGE_NEW_INFIMUM
+@param info_status rec_get_info_and_status_bits()
+@param shift when reuse=true: number of bytes the PAGE_FREE is moving
+@param hdr_c number of common record header bytes with prev_rec
+@param data_c number of common data bytes with prev_rec
+@param hdr record header bytes to copy to the log
+@param hdr_l number of copied record header bytes
+@param data record payload bytes to copy to the log
+@param data_l number of copied record data bytes */
+inline void mtr_t::page_insert(const buf_block_t &block, bool reuse,
+ ulint prev_rec, byte info_status,
+ ssize_t shift, size_t hdr_c, size_t data_c,
+ const byte *hdr, size_t hdr_l,
+ const byte *data, size_t data_l)
+{
+ ut_ad(!block.page.zip.data);
+ ut_ad(m_log_mode == MTR_LOG_ALL);
+ ut_d(ulint n_slots= page_dir_get_n_slots(block.frame));
+ ut_ad(n_slots >= 2);
+ ut_d(const byte *page_end= page_dir_get_nth_slot(block.frame, n_slots - 1));
+ ut_ad(&block.frame[prev_rec + PAGE_NEW_INFIMUM] <= page_end);
+ ut_ad(block.frame + page_header_get_offs(block.frame, PAGE_HEAP_TOP) <=
+ page_end);
+ ut_ad(fil_page_index_page_check(block.frame));
+ ut_ad(hdr_l + hdr_c + data_l + data_c <=
+ static_cast<size_t>(page_end - &block.frame[PAGE_NEW_SUPREMUM_END]));
+ ut_ad(reuse || shift == 0);
+#ifdef UNIV_DEBUG
+ switch (~(REC_INFO_MIN_REC_FLAG | REC_INFO_DELETED_FLAG) & info_status) {
+ default:
+ ut_ad(0);
+ break;
+ case REC_STATUS_NODE_PTR:
+ ut_ad(!page_is_leaf(block.frame));
+ break;
+ case REC_STATUS_INSTANT:
+ case REC_STATUS_ORDINARY:
+ ut_ad(page_is_leaf(block.frame));
+ }
+#endif
+
+ set_modified();
+
+ static_assert(REC_INFO_MIN_REC_FLAG == 0x10, "compatibility");
+ static_assert(REC_INFO_DELETED_FLAG == 0x20, "compatibility");
+ static_assert(REC_STATUS_INSTANT == 4, "compatibility");
+
+ const size_t enc_hdr_l= hdr_l << 3 |
+ (info_status & REC_STATUS_INSTANT) | info_status >> 4;
+ size_t len= prev_rec < MIN_2BYTE ? 2 : prev_rec < MIN_3BYTE ? 3 : 4;
+ static_assert(REC_MAX_N_FIELDS * 2 < MIN_3BYTE, "compatibility");
+ if (reuse)
+ {
+ if (shift < 0)
+ shift= -shift << 1 | 1;
+ else
+ shift<<= 1;
+ len+= shift < MIN_2BYTE ? 1 : shift < MIN_3BYTE ? 2 : 3;
+ }
+ ut_ad(hdr_c + hdr_l <= REC_MAX_N_FIELDS * 2);
+ len+= hdr_c < MIN_2BYTE ? 1 : 2;
+ len+= enc_hdr_l < MIN_2BYTE ? 1 : enc_hdr_l < MIN_3BYTE ? 2 : 3;
+ len+= data_c < MIN_2BYTE ? 1 : data_c < MIN_3BYTE ? 2 : 3;
+ len+= hdr_l + data_l;
+
+ const bool small= len < mtr_buf_t::MAX_DATA_SIZE - (1 + 3 + 3 + 5 + 5);
+ byte *l= log_write<EXTENDED>(block.page.id, &block.page, len, small);
+
+ if (UNIV_LIKELY(small))
+ {
+ ut_d(const byte * const end = l + len);
+ *l++= reuse ? INSERT_REUSE_DYNAMIC : INSERT_HEAP_DYNAMIC;
+ l= mlog_encode_varint(l, prev_rec);
+ if (reuse)
+ l= mlog_encode_varint(l, shift);
+ l= mlog_encode_varint(l, enc_hdr_l);
+ l= mlog_encode_varint(l, hdr_c);
+ l= mlog_encode_varint(l, data_c);
+ ::memcpy(l, hdr, hdr_l);
+ l+= hdr_l;
+ ::memcpy(l, data, data_l);
+ l+= data_l;
+ ut_ad(end == l);
+ m_log.close(l);
+ }
+ else
+ {
+ m_log.close(l);
+ l= m_log.open(len - hdr_l - data_l);
+ ut_d(const byte * const end = l + len - hdr_l - data_l);
+ *l++= reuse ? INSERT_REUSE_DYNAMIC : INSERT_HEAP_DYNAMIC;
+ l= mlog_encode_varint(l, prev_rec);
+ if (reuse)
+ l= mlog_encode_varint(l, shift);
+ l= mlog_encode_varint(l, enc_hdr_l);
+ l= mlog_encode_varint(l, hdr_c);
+ l= mlog_encode_varint(l, data_c);
+ ut_ad(end == l);
+ m_log.close(l);
+ m_log.push(hdr, static_cast<uint32_t>(hdr_l));
+ m_log.push(data, static_cast<uint32_t>(data_l));
+ }
+
+ m_last_offset= FIL_PAGE_TYPE;
+}
+
/***********************************************************//**
Inserts a record next to page cursor on an uncompressed page.
Returns pointer to inserted record if succeed, i.e., enough
@@ -1063,16 +1278,17 @@ page_cur_insert_rec_low(
offset_t* offsets,/*!< in/out: rec_get_offsets(rec, index) */
mtr_t* mtr) /*!< in/out: mini-transaction */
{
- buf_block_t* block = cur->block;
+ buf_block_t* block= cur->block;
ut_ad(rec_offs_validate(rec, index, offsets));
-
+ ut_ad(rec_offs_n_fields(offsets) > 0);
ut_ad(index->table->not_redundant() == !!page_is_comp(block->frame));
ut_ad(!!page_is_comp(block->frame) == !!rec_offs_comp(offsets));
ut_ad(fil_page_index_page_check(block->frame));
ut_ad(mach_read_from_8(PAGE_HEADER + PAGE_INDEX_ID + block->frame) ==
index->id ||
mtr->is_inside_ibuf());
+ ut_ad(page_dir_get_n_slots(block->frame) >= 2);
ut_ad(!page_rec_is_supremum(cur->rec));
@@ -1098,11 +1314,15 @@ page_cur_insert_rec_low(
#endif /* UNIV_DEBUG_VALGRIND */
/* 2. Try to find suitable space from page memory management */
+ bool reuse= false;
+ ssize_t free_offset= 0;
ulint heap_no;
byte *insert_buf;
- alignas(2) byte hdr[8];
- if (rec_t* free_rec = page_header_get_ptr(block->frame, PAGE_FREE))
+ const bool comp= page_is_comp(block->frame);
+ const ulint extra_size= rec_offs_extra_size(offsets);
+
+ if (rec_t* free_rec= page_header_get_ptr(block->frame, PAGE_FREE))
{
/* Try to reuse the head of PAGE_FREE. */
offset_t foffsets_[REC_OFFS_NORMAL_SIZE];
@@ -1113,8 +1333,10 @@ page_cur_insert_rec_low(
offset_t *foffsets= rec_get_offsets(free_rec, index, foffsets_,
page_is_leaf(block->frame),
ULINT_UNDEFINED, &heap);
- insert_buf= free_rec - rec_offs_extra_size(foffsets);
- const bool too_small= rec_offs_size(foffsets) < rec_size;
+ const ulint fextra_size= rec_offs_extra_size(foffsets);
+ insert_buf= free_rec - fextra_size;
+ const bool too_small= (fextra_size + rec_offs_data_size(foffsets)) <
+ rec_size;
if (UNIV_LIKELY_NULL(heap))
mem_heap_free(heap);
@@ -1123,76 +1345,74 @@ page_cur_insert_rec_low(
byte *page_free= my_assume_aligned<2>(PAGE_FREE + PAGE_HEADER +
block->frame);
- if (page_is_comp(block->frame))
+ if (comp)
{
heap_no= rec_get_heap_no_new(free_rec);
- const rec_t *next= rec_get_next_ptr(free_rec, true);
- mach_write_to_2(hdr, next ? page_offset(next) : 0);
+ uint16_t next= mach_read_from_2(free_rec - REC_NEXT);
+ mach_write_to_2(page_free, next
+ ? static_cast<uint16_t>(free_rec + next - block->frame)
+ : 0);
}
else
{
heap_no= rec_get_heap_no_old(free_rec);
- memcpy(hdr, free_rec - REC_NEXT, 2);
+ memcpy(page_free, free_rec - REC_NEXT, 2);
}
static_assert(PAGE_GARBAGE == PAGE_FREE + 2, "compatibility");
- byte *page_garbage = my_assume_aligned<2>(page_free + 2);
+
+ byte *page_garbage= my_assume_aligned<2>(page_free + 2);
ut_ad(mach_read_from_2(page_garbage) >= rec_size);
- mach_write_to_2(my_assume_aligned<2>(hdr + 2),
- mach_read_from_2(page_garbage) - rec_size);
- mtr->memcpy(*block, page_free, hdr, 4);
+ mach_write_to_2(page_garbage, mach_read_from_2(page_garbage) - rec_size);
+ reuse= true;
+ free_offset= extra_size - fextra_size;
}
else
{
use_heap:
- insert_buf= page_mem_alloc_heap(block, rec_size, &heap_no, mtr);
+ insert_buf= page_mem_alloc_heap(block, rec_size, &heap_no);
if (UNIV_UNLIKELY(!insert_buf))
return nullptr;
}
- const ulint extra_size= rec_offs_extra_size(offsets);
ut_ad(cur->rec != insert_buf + extra_size);
- const rec_t *next_rec= page_rec_get_next_low(cur->rec,
- page_is_comp(block->frame));
+ rec_t *next_rec= block->frame + rec_get_next_offs(cur->rec, comp);
+ ut_ad(next_rec != block->frame);
/* Update page header fields */
- rec_t *last_insert= page_header_get_ptr(block->frame, PAGE_LAST_INSERT);
- ut_ad(!last_insert || !page_is_comp(block->frame) ||
- rec_get_node_ptr_flag(last_insert) == rec_get_node_ptr_flag(rec));
-
- static_assert(PAGE_N_RECS - PAGE_LAST_INSERT + 2 == sizeof hdr,
- "compatibility");
+ byte *page_last_insert= my_assume_aligned<2>(PAGE_LAST_INSERT + PAGE_HEADER +
+ block->frame);
+ const uint16_t last_insert= mach_read_from_2(page_last_insert);
+ ut_ad(!last_insert || !comp ||
+ rec_get_node_ptr_flag(block->frame + last_insert) ==
+ rec_get_node_ptr_flag(rec));
/* Write PAGE_LAST_INSERT */
- mach_write_to_2(hdr, page_offset(insert_buf + extra_size));
- static_assert(PAGE_INSTANT - PAGE_LAST_INSERT == 2, "compatibility");
- static_assert(PAGE_DIRECTION_B - PAGE_INSTANT == 1, "compatibility");
- static_assert(PAGE_N_DIRECTION - PAGE_DIRECTION_B == 1, "compat.");
- static_assert(PAGE_N_RECS - PAGE_N_DIRECTION == 2, "compatibility");
+ mach_write_to_2(page_last_insert, page_offset(insert_buf + extra_size));
/* Update PAGE_DIRECTION_B, PAGE_N_DIRECTION if needed */
- memcpy_aligned<2>(hdr + 2, PAGE_HEADER + PAGE_INSTANT + block->frame,
- PAGE_N_RECS - PAGE_INSTANT + 2);
-
- if (!index->is_spatial())
+ if (block->frame[FIL_PAGE_TYPE + 1] != byte(FIL_PAGE_RTREE))
{
- byte *dir= &hdr[PAGE_DIRECTION_B - PAGE_LAST_INSERT];
- byte *n= my_assume_aligned<2>(&hdr[PAGE_N_DIRECTION - PAGE_LAST_INSERT]);
+ byte *dir= &block->frame[PAGE_DIRECTION_B + PAGE_HEADER];
+ byte *n= my_assume_aligned<2>
+ (&block->frame[PAGE_N_DIRECTION + PAGE_HEADER]);
if (UNIV_UNLIKELY(!last_insert))
{
no_direction:
*dir= (*dir & ~((1U << 3) - 1)) | PAGE_NO_DIRECTION;
memset(n, 0, 2);
}
- else if (last_insert == cur->rec && (*dir & ((1U << 3) - 1)) != PAGE_LEFT)
+ else if (block->frame + last_insert == cur->rec &&
+ (*dir & ((1U << 3) - 1)) != PAGE_LEFT)
{
*dir= (*dir & ~((1U << 3) - 1)) | PAGE_RIGHT;
inc_dir:
mach_write_to_2(n, mach_read_from_2(n) + 1);
}
- else if (next_rec == last_insert && (*dir & ((1U << 3) - 1)) != PAGE_RIGHT)
+ else if (next_rec == block->frame + last_insert &&
+ (*dir & ((1U << 3) - 1)) != PAGE_RIGHT)
{
*dir= (*dir & ~((1U << 3) - 1)) | PAGE_LEFT;
goto inc_dir;
@@ -1202,20 +1422,24 @@ inc_dir:
}
/* Update PAGE_N_RECS. */
- mach_write_to_2(hdr + PAGE_N_RECS - PAGE_LAST_INSERT,
- mach_read_from_2(hdr + PAGE_N_RECS - PAGE_LAST_INSERT) + 1);
- /* Write the header fields in one record. */
- mtr->memcpy(*block, PAGE_LAST_INSERT + PAGE_HEADER + block->frame,
- hdr, PAGE_N_RECS - PAGE_LAST_INSERT + 2);
+ byte *page_n_recs= my_assume_aligned<2>(PAGE_N_RECS + PAGE_HEADER +
+ block->frame);
+
+ mach_write_to_2(page_n_recs, mach_read_from_2(page_n_recs) + 1);
/* Update the preceding record header, the 'owner' record and
prepare the record to insert. */
+ rec_t *insert_rec= insert_buf + extra_size;
+ const ulint data_size= rec_offs_data_size(offsets);
+ memcpy(insert_buf, rec - extra_size, extra_size + data_size);
+ size_t hdr_common= 0;
ulint n_owned;
- static_assert(sizeof hdr >= REC_N_NEW_EXTRA_BYTES, "compatibility");
- static_assert(sizeof hdr >= REC_N_OLD_EXTRA_BYTES, "compatibility");
- ulint fixed_hdr;
+ const byte info_status= static_cast<byte>
+ (rec_get_info_and_status_bits(rec, comp));
+ ut_ad(!(rec_get_info_bits(rec, comp) &
+ ~(REC_INFO_DELETED_FLAG | REC_INFO_MIN_REC_FLAG)));
- if (page_is_comp(block->frame))
+ if (comp)
{
#ifdef UNIV_DEBUG
switch (rec_get_status(cur->rec)) {
@@ -1228,9 +1452,19 @@ inc_dir:
ut_ad(!"wrong status on cur->rec");
}
switch (rec_get_status(rec)) {
- case REC_STATUS_ORDINARY:
case REC_STATUS_NODE_PTR:
+ ut_ad(!page_is_leaf(block->frame));
+ break;
case REC_STATUS_INSTANT:
+ ut_ad(index->is_instant());
+ ut_ad(page_is_leaf(block->frame));
+ if (!rec_is_metadata(rec, true))
+ break;
+ ut_ad(cur->rec == &block->frame[PAGE_NEW_INFIMUM]);
+ break;
+ case REC_STATUS_ORDINARY:
+ ut_ad(page_is_leaf(block->frame));
+ ut_ad(!(rec_get_info_bits(rec, true) & ~REC_INFO_DELETED_FLAG));
break;
case REC_STATUS_INFIMUM:
case REC_STATUS_SUPREMUM:
@@ -1238,155 +1472,129 @@ inc_dir:
}
ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
#endif
- memcpy(hdr, rec - REC_N_NEW_EXTRA_BYTES, REC_N_NEW_EXTRA_BYTES);
- rec_set_bit_field_1(hdr + REC_N_NEW_EXTRA_BYTES, 0, REC_NEW_N_OWNED,
+
+ rec_set_bit_field_1(insert_rec, 0, REC_NEW_N_OWNED,
REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
- rec_set_bit_field_2(hdr + REC_N_NEW_EXTRA_BYTES, heap_no,
+ insert_rec[-REC_NEW_STATUS]= rec[-REC_NEW_STATUS];
+ rec_set_bit_field_2(insert_rec, heap_no,
REC_NEW_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
- const rec_t *insert_rec= insert_buf + extra_size;
- mach_write_to_2(REC_N_NEW_EXTRA_BYTES - REC_NEXT + hdr,
+ mach_write_to_2(insert_rec - REC_NEXT,
static_cast<uint16_t>(next_rec - insert_rec));
- mtr->write<2>(*block, cur->rec - REC_NEXT,
- static_cast<uint16_t>(insert_rec - cur->rec));
- while (!(n_owned = rec_get_n_owned_new(next_rec)))
- next_rec= page_rec_get_next_low(next_rec, true);
- page_rec_set_n_owned<false>(block, const_cast<rec_t*>(next_rec),
- n_owned + 1, true, mtr);
- fixed_hdr= REC_N_NEW_EXTRA_BYTES;
+ mach_write_to_2(cur->rec - REC_NEXT,
+ static_cast<uint16_t>(insert_rec - cur->rec));
+ while (!(n_owned= rec_get_n_owned_new(next_rec)))
+ {
+ next_rec= block->frame + rec_get_next_offs(next_rec, true);
+ ut_ad(next_rec != block->frame);
+ }
+ rec_set_bit_field_1(next_rec, n_owned + 1, REC_NEW_N_OWNED,
+ REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
+ if (mtr->get_log_mode() != MTR_LOG_ALL)
+ goto copied;
+
+ const byte * const c_start= cur->rec - extra_size;
+ if (extra_size > REC_N_NEW_EXTRA_BYTES &&
+ c_start >=
+ &block->frame[PAGE_NEW_SUPREMUM_END + REC_N_NEW_EXTRA_BYTES])
+ {
+ /* Find common header bytes with the preceding record. */
+ const byte *r= rec - (REC_N_NEW_EXTRA_BYTES + 1);
+ for (const byte *c= cur->rec - (REC_N_NEW_EXTRA_BYTES + 1);
+ *r == *c && c-- != c_start; r--);
+ hdr_common= static_cast<size_t>((rec - (REC_N_NEW_EXTRA_BYTES + 1)) - r);
+ ut_ad(hdr_common <= extra_size - REC_N_NEW_EXTRA_BYTES);
+ }
}
else
{
- memcpy(hdr, rec - REC_N_OLD_EXTRA_BYTES, REC_N_OLD_EXTRA_BYTES);
- rec_set_bit_field_1(hdr + REC_N_OLD_EXTRA_BYTES, 0, REC_OLD_N_OWNED,
+#ifdef UNIV_DEBUG
+ if (!page_is_leaf(block->frame));
+ else if (rec_is_metadata(rec, false))
+ {
+ ut_ad(index->is_instant());
+ ut_ad(cur->rec == &block->frame[PAGE_OLD_INFIMUM]);
+ }
+#endif
+ rec_set_bit_field_1(insert_rec, 0, REC_OLD_N_OWNED,
REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
- rec_set_bit_field_2(hdr + REC_N_OLD_EXTRA_BYTES, heap_no,
+ rec_set_bit_field_2(insert_rec, heap_no,
REC_OLD_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
- memcpy(hdr + REC_N_OLD_EXTRA_BYTES - REC_NEXT, cur->rec - REC_NEXT, 2);
- mtr->write<2>(*block, cur->rec - REC_NEXT,
- page_offset(insert_buf + extra_size));
- while (!(n_owned = rec_get_n_owned_old(next_rec)))
- next_rec= page_rec_get_next_low(next_rec, false);
- page_rec_set_n_owned<false>(block, const_cast<rec_t*>(next_rec),
- n_owned + 1, false, mtr);
- fixed_hdr= REC_N_OLD_EXTRA_BYTES;
+ memcpy(insert_rec - REC_NEXT, cur->rec - REC_NEXT, 2);
+ mach_write_to_2(cur->rec - REC_NEXT, page_offset(insert_rec));
+ while (!(n_owned= rec_get_n_owned_old(next_rec)))
+ {
+ next_rec= block->frame + rec_get_next_offs(next_rec, false);
+ ut_ad(next_rec != block->frame);
+ }
+ rec_set_bit_field_1(next_rec, n_owned + 1, REC_OLD_N_OWNED,
+ REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
+ if (mtr->get_log_mode() != MTR_LOG_ALL)
+ goto copied;
+
+ ut_ad(extra_size > REC_N_OLD_EXTRA_BYTES);
+ const byte * const c_start= cur->rec - extra_size;
+ if (c_start >=
+ &block->frame[PAGE_OLD_SUPREMUM_END + REC_N_OLD_EXTRA_BYTES])
+ {
+ /* Find common header bytes with the preceding record. */
+ const byte *r= rec - (REC_N_OLD_EXTRA_BYTES + 1);
+ for (const byte *c= cur->rec - (REC_N_OLD_EXTRA_BYTES + 1);
+ *r == *c && c-- != c_start; r--);
+ hdr_common= static_cast<size_t>((rec - (REC_N_OLD_EXTRA_BYTES + 1)) - r);
+ ut_ad(hdr_common <= extra_size - REC_N_OLD_EXTRA_BYTES);
+ }
}
- ut_ad(fixed_hdr <= extra_size);
/* Insert the record, possibly copying from the preceding record. */
- const ulint data_size = rec_offs_data_size(offsets);
- ut_ad(mtr->has_modifications());
+ ut_ad(mtr->get_log_mode() == MTR_LOG_ALL);
- if (mtr->get_log_mode() == MTR_LOG_ALL)
+ if (data_size)
{
- /* Temporarily write everything to rec, to simplify the code below. */
- byte rec_hdr[REC_N_OLD_EXTRA_BYTES];
- memcpy(rec_hdr, rec - fixed_hdr, fixed_hdr);
- memcpy(const_cast<rec_t*>(rec - fixed_hdr), hdr, fixed_hdr);
-
- byte *b= insert_buf;
- const byte *r= rec - extra_size;
-
- /* Skip any unchanged prefix of the record header. */
- for (;; b++, r++)
- if (UNIV_UNLIKELY(b == insert_buf + rec_size))
- goto rec_done;
- else if (*b != *r)
- break;
-
+ const byte *r= rec;
+ const byte *c= cur->rec;
+ const byte *c_end= cur->rec + data_size;
+ if (c <= insert_buf && c_end > insert_buf)
+ c_end= insert_buf;
+ else
+ c_end= std::min<const byte*>(c_end, block->frame + srv_page_size -
+ PAGE_DIR - PAGE_DIR_SLOT_SIZE *
+ page_dir_get_n_slots(block->frame));
+ size_t data_common= 0;
+ /* Copy common data bytes of the preceding record. */
+ if (c != c_end)
{
- const byte *c= cur->rec - (rec - r);
- const byte *c_end= std::min(cur->rec + data_size,
- block->frame + srv_page_size);
- if (c <= insert_buf && c_end > insert_buf)
- c_end= insert_buf;
-
- /* Try to copy any bytes of the preceding record. */
- if (UNIV_LIKELY(c >= block->frame && c < c_end))
- {
- const byte *cm= c;
- const byte *rm= r;
- while (*rm++ == *cm++)
- if (cm == c_end)
- break;
- rm--, cm--;
- ut_ad(rm - r + b <= insert_buf + rec_size);
- size_t len= static_cast<size_t>(rm - r);
- ut_ad(!memcmp(r, c, len));
- if (len > 2)
- {
- memcpy(b, c, len);
- mtr->memmove(*block, page_offset(b), page_offset(c), len);
- c= cm;
- b+= rm - r;
- r= rm;
- }
- }
-
- if (c < cur->rec)
- {
- if (!data_size)
- {
-no_data:
- mtr->memcpy<mtr_t::FORCED>(*block, b, r, cur->rec - c);
- goto rec_done;
- }
- /* Some header bytes differ. Compare the data separately. */
- byte *bd= insert_buf + extra_size;
- const byte *rd= rec;
- /* Skip any unchanged prefix of the record payload. */
- for (;; bd++, rd++)
- if (bd == insert_buf + rec_size)
- goto no_data;
- else if (*bd != *rd)
- break;
-
- /* Try to copy any data bytes of the preceding record. */
- const byte * const cd= cur->rec + (rd - rec);
- if (c_end - cd > 2)
- {
- const byte *cdm= cd;
- const byte *rdm= rd;
- while (*rdm++ == *cdm++)
- if (cdm == c_end)
- break;
- cdm--, rdm--;
- ut_ad(rdm - rd + bd <= insert_buf + rec_size);
- size_t len= static_cast<size_t>(rdm - rd);
- ut_ad(!memcmp(rd, cd, len));
- if (len > 2)
- {
- mtr->memcpy<mtr_t::FORCED>(*block, b, r, cur->rec - c);
- memcpy(bd, cd, len);
- mtr->memmove(*block, page_offset(bd), page_offset(cd), len);
- c= cdm;
- b= rdm - rd + bd;
- r= rdm;
- }
- }
- }
+ for (; *r == *c && c++ != c_end; r++);
+ data_common= static_cast<size_t>(r - rec);
}
- if (size_t len= static_cast<size_t>(insert_buf + rec_size - b))
- mtr->memcpy<mtr_t::FORCED>(*block, b, r, len);
-rec_done:
- ut_ad(!memcmp(insert_buf, rec - extra_size, rec_size));
-
- /* Restore the record header. */
- memcpy(const_cast<rec_t*>(rec - fixed_hdr), rec_hdr, fixed_hdr);
- }
- else
- {
- memcpy(insert_buf, rec - extra_size, extra_size - fixed_hdr);
- memcpy(insert_buf + extra_size - fixed_hdr, hdr, fixed_hdr);
- memcpy(insert_buf + extra_size, rec, data_size);
+ if (comp)
+ mtr->page_insert(*block, reuse,
+ cur->rec - block->frame - PAGE_NEW_INFIMUM,
+ info_status, free_offset, hdr_common, data_common,
+ insert_buf,
+ extra_size - hdr_common - REC_N_NEW_EXTRA_BYTES,
+ r, data_size - data_common);
+ else
+ mtr->page_insert(*block, reuse,
+ cur->rec - block->frame - PAGE_OLD_INFIMUM,
+ info_status, rec_get_n_fields_old(insert_rec) << 1 |
+ rec_get_1byte_offs_flag(insert_rec),
+ hdr_common, data_common,
+ insert_buf,
+ extra_size - hdr_common - REC_N_OLD_EXTRA_BYTES,
+ r, data_size - data_common);
}
+copied:
+ ut_ad(!memcmp(insert_buf, rec - extra_size, extra_size -
+ (comp ? REC_N_NEW_EXTRA_BYTES : REC_N_OLD_EXTRA_BYTES)));
+ ut_ad(!memcmp(insert_rec, rec, data_size));
/* We have incremented the n_owned field of the owner record.
If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED, we have to split the
corresponding directory slot in two. */
if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED))
- page_dir_split_slot<false>(block, page_dir_find_owner_slot(next_rec), mtr);
+ page_dir_split_slot(*block, page_dir_find_owner_slot(next_rec));
rec_offs_make_valid(insert_buf + extra_size, index,
page_is_leaf(block->frame), offsets);
@@ -1704,12 +1912,13 @@ too_small:
{
use_heap:
ut_ad(!free_rec);
- insert_buf = page_mem_alloc_heap<true>(cursor->block, rec_size, &heap_no,
- mtr);
+ insert_buf= page_mem_alloc_heap<true>(cursor->block, rec_size, &heap_no);
if (UNIV_UNLIKELY(!insert_buf))
return insert_buf;
+ static_assert(PAGE_N_HEAP == PAGE_HEAP_TOP + 2, "compatibility");
+ mtr->memcpy(*cursor->block, PAGE_HEAP_TOP + PAGE_HEADER, 4);
page_zip_dir_add_slot(cursor->block, index, mtr);
}
@@ -1807,8 +2016,8 @@ inc_dir:
record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
we have to split the corresponding directory slot in two. */
if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED))
- page_dir_split_slot<true>(cursor->block,
- page_dir_find_owner_slot(next_rec), mtr);
+ page_zip_dir_split_slot(cursor->block,
+ page_dir_find_owner_slot(next_rec), mtr);
page_zip_write_rec(cursor->block, insert_rec, index, offsets, 1, mtr);
return insert_rec;
@@ -2057,6 +2266,467 @@ page_cur_delete_rec(
: page_simple_validate_old(block->frame));
}
+/** Apply an INSERT_HEAP_REDUNDANT or INSERT_REUSE_REDUNDANT record that was
+written by page_cur_insert_rec_low() for a ROW_FORMAT=REDUNDANT page.
+@param block B-tree or R-tree page in ROW_FORMAT=REDUNDANT
+@param reuse false=allocate from PAGE_HEAP_TOP; true=reuse PAGE_FREE
+@param prev byte offset of the predecessor, relative to PAGE_OLD_INFIMUM
+@param enc_hdr encoded fixed-size header bits
+@param hdr_c number of common record header bytes with prev
+@param data_c number of common data bytes with prev
+@param data literal header and data bytes
+@param data_len length of the literal data, in bytes
+@return whether the operation failed (inconsistency was noticed) */
+bool page_apply_insert_redundant(const buf_block_t &block, bool reuse,
+ ulint prev, ulint enc_hdr,
+ size_t hdr_c, size_t data_c,
+ const void *data, size_t data_len)
+{
+ const uint16_t n_slots= page_dir_get_n_slots(block.frame);
+ byte *page_n_heap= my_assume_aligned<2>(PAGE_N_HEAP + PAGE_HEADER +
+ block.frame);
+ const uint16_t h= mach_read_from_2(page_n_heap);
+ if (UNIV_UNLIKELY(n_slots < 2 || h < n_slots || h < PAGE_HEAP_NO_USER_LOW ||
+ h >= srv_page_size / REC_N_OLD_EXTRA_BYTES ||
+ !fil_page_index_page_check(block.frame) ||
+ page_get_page_no(block.frame) != block.page.id.page_no() ||
+ mach_read_from_2(my_assume_aligned<2>
+ (PAGE_OLD_SUPREMUM - REC_NEXT +
+ block.frame))))
+ {
+corrupted:
+ ib::error() << (reuse
+ ? "Not applying INSERT_REUSE_REDUNDANT"
+ " due to corruption on "
+ : "Not applying INSERT_HEAP_REDUNDANT"
+ " due to corruption on ")
+ << block.page.id;
+ return true;
+ }
+
+ const byte *const slot= page_dir_get_nth_slot(block.frame, n_slots - 1);
+ byte * const page_heap_top= my_assume_aligned<2>
+ (PAGE_HEAP_TOP + PAGE_HEADER + block.frame);
+ const byte *const heap_bot= &block.frame[PAGE_OLD_SUPREMUM_END];
+ byte *heap_top= block.frame + mach_read_from_2(page_heap_top);
+ if (UNIV_UNLIKELY(heap_bot > heap_top || heap_top > slot))
+ goto corrupted;
+ if (UNIV_UNLIKELY(mach_read_from_2(slot) != PAGE_OLD_SUPREMUM))
+ goto corrupted;
+ if (UNIV_UNLIKELY(mach_read_from_2(page_dir_get_nth_slot(block.frame, 0)) !=
+ PAGE_OLD_INFIMUM))
+ goto corrupted;
+ rec_t * const prev_rec= block.frame + PAGE_OLD_INFIMUM + prev;
+ if (!prev);
+ else if (UNIV_UNLIKELY(heap_bot + (REC_N_OLD_EXTRA_BYTES + 1) > prev_rec ||
+ prev_rec > heap_top))
+ goto corrupted;
+ const ulint pn_fields= rec_get_bit_field_2(prev_rec, REC_OLD_N_FIELDS,
+ REC_OLD_N_FIELDS_MASK,
+ REC_OLD_N_FIELDS_SHIFT);
+ if (UNIV_UNLIKELY(pn_fields == 0 || pn_fields > REC_MAX_N_FIELDS))
+ goto corrupted;
+ const ulint pextra_size= REC_N_OLD_EXTRA_BYTES +
+ (rec_get_1byte_offs_flag(prev_rec) ? pn_fields : pn_fields * 2);
+ if (prev_rec == &block.frame[PAGE_OLD_INFIMUM]);
+ else if (UNIV_UNLIKELY(prev_rec - pextra_size < heap_bot))
+ goto corrupted;
+ const ulint pdata_size= rec_get_data_size_old(prev_rec);
+ if (UNIV_UNLIKELY(prev_rec + pdata_size > heap_top))
+ goto corrupted;
+ rec_t * const next_rec= block.frame + mach_read_from_2(prev_rec - REC_NEXT);
+ if (next_rec == block.frame + PAGE_OLD_SUPREMUM);
+ else if (UNIV_UNLIKELY(heap_bot + REC_N_OLD_EXTRA_BYTES > next_rec ||
+ next_rec > heap_top))
+ goto corrupted;
+ const bool is_short= (enc_hdr >> 2) & 1;
+ const ulint n_fields= (enc_hdr >> 3) + 1;
+ if (UNIV_UNLIKELY(n_fields > REC_MAX_N_FIELDS))
+ goto corrupted;
+ const ulint extra_size= REC_N_OLD_EXTRA_BYTES +
+ (is_short ? n_fields : n_fields * 2);
+ hdr_c+= REC_N_OLD_EXTRA_BYTES;
+ if (UNIV_UNLIKELY(hdr_c > extra_size || hdr_c > pextra_size))
+ goto corrupted;
+ if (UNIV_UNLIKELY(extra_size - hdr_c > data_len))
+ goto corrupted;
+ /* We buffer all changes to the record header locally, so that
+ we will avoid modifying the page before all consistency checks
+ have been fulfilled. */
+ alignas(2) byte insert_buf[REC_N_OLD_EXTRA_BYTES + REC_MAX_N_FIELDS * 2];
+
+ ulint n_owned;
+ rec_t *owner_rec= next_rec;
+ for (ulint ns= PAGE_DIR_SLOT_MAX_N_OWNED;
+ !(n_owned= rec_get_n_owned_old(owner_rec)); )
+ {
+ owner_rec= block.frame + mach_read_from_2(owner_rec - REC_NEXT);
+ if (owner_rec == &block.frame[PAGE_OLD_SUPREMUM]);
+ else if (UNIV_UNLIKELY(heap_bot + REC_N_OLD_EXTRA_BYTES > owner_rec ||
+ owner_rec > heap_top))
+ goto corrupted;
+ if (!ns--)
+ goto corrupted; /* Corrupted (cyclic?) next-record list */
+ }
+
+ if (n_owned > PAGE_DIR_SLOT_MAX_N_OWNED)
+ goto corrupted;
+
+ mach_write_to_2(insert_buf, owner_rec - block.frame);
+ ulint owner_slot= n_slots;
+ static_assert(PAGE_DIR_SLOT_SIZE == 2, "compatibility");
+ while (memcmp_aligned<2>(slot + 2 * owner_slot, insert_buf, 2))
+ if (!owner_slot--)
+ goto corrupted;
+ owner_slot= n_slots - 1 - owner_slot;
+
+ memcpy(insert_buf, data, extra_size - hdr_c);
+ byte *insert_rec= &insert_buf[extra_size];
+ memcpy(insert_rec - hdr_c, prev_rec - hdr_c, hdr_c);
+ rec_set_bit_field_1(insert_rec, (enc_hdr & 3) << 4,
+ REC_OLD_INFO_BITS, REC_INFO_BITS_MASK,
+ REC_INFO_BITS_SHIFT);
+ rec_set_1byte_offs_flag(insert_rec, is_short);
+ rec_set_n_fields_old(insert_rec, n_fields);
+ rec_set_bit_field_1(insert_rec, 0, REC_OLD_N_OWNED,
+ REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
+
+ const ulint data_size= rec_get_data_size_old(insert_rec);
+ if (UNIV_UNLIKELY(data_c > data_size))
+ goto corrupted;
+ if (UNIV_UNLIKELY(extra_size - hdr_c + data_size - data_c != data_len))
+ goto corrupted;
+
+ /* Perform final consistency checks and then apply the change to the page. */
+ byte *buf;
+ if (reuse)
+ {
+ byte *page_free= my_assume_aligned<2>(PAGE_FREE + PAGE_HEADER +
+ block.frame);
+ rec_t *free_rec= block.frame + mach_read_from_2(page_free);
+ if (UNIV_UNLIKELY(heap_bot + REC_N_OLD_EXTRA_BYTES > free_rec ||
+ free_rec > heap_top))
+ goto corrupted;
+ const ulint fn_fields= rec_get_n_fields_old(free_rec);
+ const ulint fextra_size= REC_N_OLD_EXTRA_BYTES +
+ (rec_get_1byte_offs_flag(free_rec) ? fn_fields : fn_fields * 2);
+ if (UNIV_UNLIKELY(free_rec - fextra_size < heap_bot))
+ goto corrupted;
+ const ulint fdata_size= rec_get_data_size_old(free_rec);
+ if (UNIV_UNLIKELY(free_rec + data_size > heap_top))
+ goto corrupted;
+ if (UNIV_UNLIKELY(extra_size + data_size > fextra_size + fdata_size))
+ goto corrupted;
+ byte *page_garbage= my_assume_aligned<2>(page_free + 2);
+ if (UNIV_UNLIKELY(mach_read_from_2(page_garbage) <
+ fextra_size + fdata_size))
+ goto corrupted;
+ buf= free_rec - fextra_size;
+ const rec_t *const next_free= block.frame +
+ mach_read_from_2(free_rec - REC_NEXT);
+ if (next_free == block.frame);
+ else if (UNIV_UNLIKELY(next_free < &heap_bot[REC_N_OLD_EXTRA_BYTES + 1] ||
+ heap_top < next_free))
+ goto corrupted;
+ mach_write_to_2(page_garbage, mach_read_from_2(page_garbage) -
+ extra_size - data_size);
+ rec_set_bit_field_2(insert_rec, rec_get_heap_no_old(free_rec),
+ REC_OLD_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
+ memcpy(page_free, free_rec - REC_NEXT, 2);
+ }
+ else
+ {
+ if (UNIV_UNLIKELY(heap_top + extra_size + data_size > slot))
+ goto corrupted;
+ rec_set_bit_field_2(insert_rec, h,
+ REC_OLD_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
+ mach_write_to_2(page_n_heap, h + 1);
+ mach_write_to_2(page_heap_top,
+ mach_read_from_2(page_heap_top) + extra_size + data_size);
+ buf= heap_top;
+ }
+
+ ut_ad(data_size - data_c == data_len - (extra_size - hdr_c));
+ byte *page_last_insert= my_assume_aligned<2>(PAGE_LAST_INSERT + PAGE_HEADER +
+ block.frame);
+ const uint16_t last_insert= mach_read_from_2(page_last_insert);
+ memcpy(buf, insert_buf, extra_size);
+ buf+= extra_size;
+ mach_write_to_2(page_last_insert, buf - block.frame);
+ memcpy(prev_rec - REC_NEXT, page_last_insert, 2);
+ memcpy(buf, prev_rec, data_c);
+ memcpy(buf + data_c, static_cast<const byte*>(data) + (extra_size - hdr_c),
+ data_len - (extra_size - hdr_c));
+ rec_set_bit_field_1(owner_rec, n_owned + 1, REC_OLD_N_OWNED,
+ REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
+
+ /* Update PAGE_DIRECTION_B, PAGE_N_DIRECTION if needed */
+ if (block.frame[FIL_PAGE_TYPE + 1] != byte(FIL_PAGE_RTREE))
+ {
+ byte *dir= &block.frame[PAGE_DIRECTION_B + PAGE_HEADER];
+ byte *n_dir= my_assume_aligned<2>
+ (&block.frame[PAGE_N_DIRECTION + PAGE_HEADER]);
+ if (UNIV_UNLIKELY(!last_insert))
+ {
+no_direction:
+ *dir= (*dir & ~((1U << 3) - 1)) | PAGE_NO_DIRECTION;
+ memset(n_dir, 0, 2);
+ }
+ else if (block.frame + last_insert == prev_rec &&
+ (*dir & ((1U << 3) - 1)) != PAGE_LEFT)
+ {
+ *dir= (*dir & ~((1U << 3) - 1)) | PAGE_RIGHT;
+inc_dir:
+ mach_write_to_2(n_dir, mach_read_from_2(n_dir) + 1);
+ }
+ else if (next_rec == block.frame + last_insert &&
+ (*dir & ((1U << 3) - 1)) != PAGE_RIGHT)
+ {
+ *dir= (*dir & ~((1U << 3) - 1)) | PAGE_LEFT;
+ goto inc_dir;
+ }
+ else
+ goto no_direction;
+ }
+
+ /* Update PAGE_N_RECS. */
+ byte *page_n_recs= my_assume_aligned<2>(PAGE_N_RECS + PAGE_HEADER +
+ block.frame);
+
+ mach_write_to_2(page_n_recs, mach_read_from_2(page_n_recs) + 1);
+
+ if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED))
+ page_dir_split_slot(block, owner_slot);
+ ut_ad(page_simple_validate_old(block.frame));
+ return false;
+}
+
+/** Apply an INSERT_HEAP_DYNAMIC or INSERT_REUSE_DYNAMIC record that was
+written by page_cur_insert_rec_low() for a ROW_FORMAT=COMPACT or DYNAMIC page.
+@param block B-tree or R-tree page in ROW_FORMAT=COMPACT or DYNAMIC
+@param reuse false=allocate from PAGE_HEAP_TOP; true=reuse PAGE_FREE
+@param prev byte offset of the predecessor, relative to PAGE_NEW_INFIMUM
+@param shift when reuse=true: number of bytes the PAGE_FREE is moving
+@param enc_hdr_l number of copied record header bytes, plus record type bits
+@param hdr_c number of common record header bytes with prev
+@param data_c number of common data bytes with prev
+@param data literal header and data bytes
+@param data_len length of the literal data, in bytes
+@return whether the operation failed (inconsistency was noticed) */
+bool page_apply_insert_dynamic(const buf_block_t &block, bool reuse,
+ ulint prev, ulint shift, ulint enc_hdr_l,
+ size_t hdr_c, size_t data_c,
+ const void *data, size_t data_len)
+{
+ const uint16_t n_slots= page_dir_get_n_slots(block.frame);
+ byte *page_n_heap= my_assume_aligned<2>(PAGE_N_HEAP + PAGE_HEADER +
+ block.frame);
+ ulint h= mach_read_from_2(page_n_heap);
+ if (UNIV_UNLIKELY(n_slots < 2 || h < (PAGE_HEAP_NO_USER_LOW | 0x8000) ||
+ (h & 0x7fff) >= srv_page_size / REC_N_NEW_EXTRA_BYTES ||
+ (h & 0x7fff) < n_slots ||
+ !fil_page_index_page_check(block.frame) ||
+ page_get_page_no(block.frame) != block.page.id.page_no() ||
+ mach_read_from_2(my_assume_aligned<2>
+ (PAGE_NEW_SUPREMUM - REC_NEXT +
+ block.frame)) ||
+ ((enc_hdr_l & REC_STATUS_INSTANT) &&
+ !page_is_leaf(block.frame)) ||
+ (enc_hdr_l >> 3) > data_len))
+ {
+corrupted:
+ ib::error() << (reuse
+ ? "Not applying INSERT_REUSE_DYNAMIC"
+ " due to corruption on "
+ : "Not applying INSERT_HEAP_DYNAMIC"
+ " due to corruption on ")
+ << block.page.id;
+ return true;
+ }
+
+ const byte *const slot= page_dir_get_nth_slot(block.frame, n_slots - 1);
+ byte * const page_heap_top= my_assume_aligned<2>
+ (PAGE_HEAP_TOP + PAGE_HEADER + block.frame);
+ const byte *const heap_bot= &block.frame[PAGE_NEW_SUPREMUM_END];
+ byte *heap_top= block.frame + mach_read_from_2(page_heap_top);
+ if (UNIV_UNLIKELY(heap_bot > heap_top || heap_top > slot))
+ goto corrupted;
+ if (UNIV_UNLIKELY(mach_read_from_2(slot) != PAGE_NEW_SUPREMUM))
+ goto corrupted;
+ if (UNIV_UNLIKELY(mach_read_from_2(page_dir_get_nth_slot(block.frame, 0)) !=
+ PAGE_NEW_INFIMUM))
+ goto corrupted;
+
+ uint16_t n= static_cast<uint16_t>(PAGE_NEW_INFIMUM + prev);
+ rec_t *prev_rec= block.frame + n;
+ n+= mach_read_from_2(prev_rec - REC_NEXT);
+ if (!prev);
+ else if (UNIV_UNLIKELY(heap_bot + REC_N_NEW_EXTRA_BYTES > prev_rec ||
+ prev_rec > heap_top))
+ goto corrupted;
+
+ rec_t * const next_rec= block.frame + n;
+ if (next_rec == block.frame + PAGE_NEW_SUPREMUM);
+ else if (UNIV_UNLIKELY(heap_bot + REC_N_NEW_EXTRA_BYTES > next_rec ||
+ next_rec > heap_top))
+ goto corrupted;
+
+ ulint n_owned;
+ rec_t *owner_rec= next_rec;
+ n= static_cast<uint16_t>(next_rec - block.frame);
+
+ for (ulint ns= PAGE_DIR_SLOT_MAX_N_OWNED;
+ !(n_owned= rec_get_n_owned_new(owner_rec)); )
+ {
+ n+= mach_read_from_2(owner_rec - REC_NEXT);
+ owner_rec= block.frame + n;
+ if (n == PAGE_NEW_SUPREMUM);
+ else if (UNIV_UNLIKELY(heap_bot + REC_N_NEW_EXTRA_BYTES > owner_rec ||
+ owner_rec > heap_top))
+ goto corrupted;
+ if (!ns--)
+ goto corrupted; /* Corrupted (cyclic?) next-record list */
+ }
+
+ ulint owner_slot= n_slots;
+
+ if (n_owned > PAGE_DIR_SLOT_MAX_N_OWNED)
+ goto corrupted;
+ else
+ {
+ static_assert(PAGE_DIR_SLOT_SIZE == 2, "compatibility");
+ alignas(2) byte slot_buf[2];
+ mach_write_to_2(slot_buf, owner_rec - block.frame);
+ while (memcmp_aligned<2>(slot + 2 * owner_slot, slot_buf, 2))
+ if (!owner_slot--)
+ goto corrupted;
+ owner_slot= n_slots - 1 - owner_slot;
+ }
+
+ const ulint extra_size= REC_N_NEW_EXTRA_BYTES + hdr_c + (enc_hdr_l >> 3);
+ const ulint data_size= data_c + data_len - (enc_hdr_l >> 3);
+
+ /* Perform final consistency checks and then apply the change to the page. */
+ byte *buf;
+ if (reuse)
+ {
+ byte *page_free= my_assume_aligned<2>(PAGE_FREE + PAGE_HEADER +
+ block.frame);
+ rec_t *free_rec= block.frame + mach_read_from_2(page_free);
+ if (UNIV_UNLIKELY(heap_bot + REC_N_NEW_EXTRA_BYTES > free_rec ||
+ free_rec > heap_top))
+ goto corrupted;
+ buf= free_rec - extra_size;
+ if (shift & 1)
+ buf-= shift >> 1;
+ else
+ buf+= shift >> 1;
+
+ if (UNIV_UNLIKELY(heap_bot > buf ||
+ &buf[extra_size + data_size] > heap_top))
+ goto corrupted;
+ byte *page_garbage= my_assume_aligned<2>(page_free + 2);
+ if (UNIV_UNLIKELY(mach_read_from_2(page_garbage) < extra_size + data_size))
+ goto corrupted;
+ if ((n= mach_read_from_2(free_rec - REC_NEXT)) != 0)
+ {
+ n+= static_cast<uint16_t>(free_rec - block.frame);
+ if (UNIV_UNLIKELY(n < PAGE_NEW_SUPREMUM_END + REC_N_NEW_EXTRA_BYTES ||
+ heap_top < block.frame + n))
+ goto corrupted;
+ }
+ mach_write_to_2(page_free, n);
+ mach_write_to_2(page_garbage, mach_read_from_2(page_garbage) -
+ (extra_size + data_size));
+ h= rec_get_heap_no_new(free_rec);
+ }
+ else
+ {
+ if (UNIV_UNLIKELY(heap_top + extra_size + data_size > slot))
+ goto corrupted;
+ mach_write_to_2(page_n_heap, h + 1);
+ h&= 0x7fff;
+ mach_write_to_2(page_heap_top,
+ mach_read_from_2(page_heap_top) + extra_size + data_size);
+ buf= heap_top;
+ }
+
+ memcpy(buf, data, (enc_hdr_l >> 3));
+ buf+= enc_hdr_l >> 3;
+ data_len-= enc_hdr_l >> 3;
+ data= &static_cast<const byte*>(data)[enc_hdr_l >> 3];
+
+ memcpy(buf, &prev_rec[-REC_N_NEW_EXTRA_BYTES - hdr_c], hdr_c);
+ buf+= hdr_c;
+ *buf++= (enc_hdr_l & 3) << 4; /* info_bits; n_owned=0 */
+ *buf++= static_cast<byte>(h >> 5); /* MSB of heap number */
+ h= (h & (1U << 5) - 1) << 3;
+ static_assert(REC_STATUS_ORDINARY == 0, "compatibility");
+ static_assert(REC_STATUS_INSTANT == 4, "compatibility");
+ if (page_is_leaf(block.frame))
+ h|= enc_hdr_l & REC_STATUS_INSTANT;
+ else
+ {
+ ut_ad(!(enc_hdr_l & REC_STATUS_INSTANT)); /* Checked at the start */
+ h|= REC_STATUS_NODE_PTR;
+ }
+ *buf++= static_cast<byte>(h); /* LSB of heap number, and status */
+ static_assert(REC_NEXT == 2, "compatibility");
+ buf+= REC_NEXT;
+ mach_write_to_2(buf - REC_NEXT, static_cast<uint16_t>(next_rec - buf));
+ byte *page_last_insert= my_assume_aligned<2>(PAGE_LAST_INSERT + PAGE_HEADER +
+ block.frame);
+ const uint16_t last_insert= mach_read_from_2(page_last_insert);
+ mach_write_to_2(page_last_insert, buf - block.frame);
+ mach_write_to_2(prev_rec - REC_NEXT, static_cast<uint16_t>(buf - prev_rec));
+ memcpy(buf, prev_rec, data_c);
+ buf+= data_c;
+ memcpy(buf, data, data_len);
+
+ rec_set_bit_field_1(owner_rec, n_owned + 1, REC_NEW_N_OWNED,
+ REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
+
+ /* Update PAGE_DIRECTION_B, PAGE_N_DIRECTION if needed */
+ if (block.frame[FIL_PAGE_TYPE + 1] != byte(FIL_PAGE_RTREE))
+ {
+ byte *dir= &block.frame[PAGE_DIRECTION_B + PAGE_HEADER];
+ byte *n_dir= my_assume_aligned<2>
+ (&block.frame[PAGE_N_DIRECTION + PAGE_HEADER]);
+ if (UNIV_UNLIKELY(!last_insert))
+ {
+no_direction:
+ *dir= (*dir & ~((1U << 3) - 1)) | PAGE_NO_DIRECTION;
+ memset(n_dir, 0, 2);
+ }
+ else if (block.frame + last_insert == prev_rec &&
+ (*dir & ((1U << 3) - 1)) != PAGE_LEFT)
+ {
+ *dir= (*dir & ~((1U << 3) - 1)) | PAGE_RIGHT;
+inc_dir:
+ mach_write_to_2(n_dir, mach_read_from_2(n_dir) + 1);
+ }
+ else if (next_rec == block.frame + last_insert &&
+ (*dir & ((1U << 3) - 1)) != PAGE_RIGHT)
+ {
+ *dir= (*dir & ~((1U << 3) - 1)) | PAGE_LEFT;
+ goto inc_dir;
+ }
+ else
+ goto no_direction;
+ }
+
+ /* Update PAGE_N_RECS. */
+ byte *page_n_recs= my_assume_aligned<2>(PAGE_N_RECS + PAGE_HEADER +
+ block.frame);
+
+ mach_write_to_2(page_n_recs, mach_read_from_2(page_n_recs) + 1);
+
+ if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED))
+ page_dir_split_slot(block, owner_slot);
+ ut_ad(page_simple_validate_new(block.frame));
+ return false;
+}
+
/** Apply a DELETE_ROW_FORMAT_REDUNDANT record that was written by
page_cur_delete_rec() for a ROW_FORMAT=REDUNDANT page.
@param block B-tree or R-tree page in ROW_FORMAT=REDUNDANT
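The hdr_c and data_c byte counts that both apply functions above consume are the whole space saving of MDEV-21724: page_cur_insert_rec_low() logs only the bytes of the new record that differ from its predecessor, and recovery re-derives the shared bytes from the predecessor already on the page. The idea in miniature (illustrative, not MariaDB code):

#include <cassert>
#include <cstddef>
#include <cstring>

static size_t common_prefix(const unsigned char *a, const unsigned char *b,
                            size_t len)
{
  size_t n= 0;
  while (n < len && a[n] == b[n]) n++;
  return n;
}

int main()
{
  const unsigned char prev[]= {1, 2, 3, 4, 9};
  const unsigned char rec[]=  {1, 2, 3, 7, 8};
  const size_t data_c= common_prefix(prev, rec, sizeof rec);
  assert(data_c == 3);
  // Only rec[data_c..] would go into the log record; on apply, the first
  // data_c bytes are copied from prev, the rest from the log record.
  unsigned char rebuilt[sizeof rec];
  memcpy(rebuilt, prev, data_c);
  memcpy(rebuilt + data_c, rec + data_c, sizeof rec - data_c);
  assert(!memcmp(rebuilt, rec, sizeof rec));
  return 0;
}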