diff options
author | Marko Mäkelä <marko.makela@mariadb.com> | 2020-02-22 17:32:45 +0200 |
---|---|---|
committer | Marko Mäkelä <marko.makela@mariadb.com> | 2020-02-22 21:19:47 +0200 |
commit | 572d20757be38157fa2309c35efdec19e68087f1 (patch) | |
tree | f0ad2fdada34110218dc8b99ea56c5eec532c544 | |
parent | bc76cfe8f868fde245f8aa274fba349d352bbfa7 (diff) | |
download | mariadb-git-572d20757be38157fa2309c35efdec19e68087f1.tar.gz |
MDEV-12353: Reduce log volume of page_cur_delete_rec()
mrec_ext_t: Introduce DELETE_ROW_FORMAT_REDUNDANT,
DELETE_ROW_FORMAT_DYNAMIC.
mtr_t::page_delete(): Write DELETE_ROW_FORMAT_REDUNDANT or
DELETE_ROW_FORMAT_DYNAMIC log records. We log the byte offset
of the preceding record, so that on recovery we can easily
find everything to update. For DELETE_ROW_FORMAT_DYNAMIC,
we must also write the header and data size of the record.
We will retain the physical logging for ROW_FORMAT=COMPRESSED pages.
page_zip_dir_balance_slot(): Renamed from page_dir_balance_slot(),
and specialized for ROW_FORMAT=COMPRESSED only.
page_rec_set_n_owned(), page_dir_slot_set_n_owned(),
page_dir_balance_slot(): New variants that do not write any log.
page_mem_free(): Take data_size, extra_size as parameters.
Always zerofill the record payload.
page_cur_delete_rec(): For other than ROW_FORMAT=COMPRESSED,
only write log by mtr_t::page_delete().
-rw-r--r-- | storage/innobase/include/mtr0log.h | 54 | ||||
-rw-r--r-- | storage/innobase/include/mtr0mtr.h | 16 | ||||
-rw-r--r-- | storage/innobase/include/mtr0types.h | 13 | ||||
-rw-r--r-- | storage/innobase/include/page0cur.h | 15 | ||||
-rw-r--r-- | storage/innobase/include/page0page.h | 4 | ||||
-rw-r--r-- | storage/innobase/log/log0recv.cc | 38 | ||||
-rw-r--r-- | storage/innobase/page/page0cur.cc | 538 |
7 files changed, 504 insertions, 174 deletions
diff --git a/storage/innobase/include/mtr0log.h b/storage/innobase/include/mtr0log.h index a1e06ca8425..fe80068fa0e 100644 --- a/storage/innobase/include/mtr0log.h +++ b/storage/innobase/include/mtr0log.h @@ -536,7 +536,7 @@ inline void mtr_t::log_write_extended(const buf_block_t &block, byte type) } /** Write log for partly initializing a B-tree or R-tree page. -@param block B-tree page +@param block B-tree or R-tree page @param comp false=ROW_FORMAT=REDUNDANT, true=COMPACT or DYNAMIC */ inline void mtr_t::page_create(const buf_block_t &block, bool comp) { @@ -545,6 +545,58 @@ inline void mtr_t::page_create(const buf_block_t &block, bool comp) log_write_extended(block, comp); } +/** Write log for deleting a B-tree or R-tree record in ROW_FORMAT=REDUNDANT. +@param block B-tree or R-tree page +@param prev_rec byte offset of the predecessor of the record to delete, + starting from PAGE_OLD_INFIMUM */ +inline void mtr_t::page_delete(const buf_block_t &block, ulint prev_rec) +{ + ut_ad(!block.zip_size()); + ut_ad(prev_rec < block.physical_size()); + set_modified(); + if (m_log_mode != MTR_LOG_ALL) + return; + size_t len= (prev_rec < MIN_2BYTE ? 2 : prev_rec < MIN_3BYTE ? 3 : 4); + byte *l= log_write<EXTENDED>(block.page.id, &block.page, len, true); + ut_d(byte *end= l + len); + *l++= DELETE_ROW_FORMAT_REDUNDANT; + l= mlog_encode_varint(l, prev_rec); + ut_ad(end == l); + m_log.close(l); + m_last_offset= FIL_PAGE_TYPE; +} + +/** Write log for deleting a COMPACT or DYNAMIC B-tree or R-tree record. +@param block B-tree or R-tree page +@param prev_rec byte offset of the predecessor of the record to delete, + starting from PAGE_NEW_INFIMUM +@param prev_rec the predecessor of the record to delete +@param hdr_size record header size, excluding REC_N_NEW_EXTRA_BYTES +@param data_size data payload size, in bytes */ +inline void mtr_t::page_delete(const buf_block_t &block, ulint prev_rec, + size_t hdr_size, size_t data_size) +{ + ut_ad(!block.zip_size()); + set_modified(); + ut_ad(hdr_size < MIN_3BYTE); + ut_ad(prev_rec < block.physical_size()); + ut_ad(data_size < block.physical_size()); + if (m_log_mode != MTR_LOG_ALL) + return; + size_t len= prev_rec < MIN_2BYTE ? 2 : prev_rec < MIN_3BYTE ? 3 : 4; + len+= hdr_size < MIN_2BYTE ? 1 : 2; + len+= data_size < MIN_2BYTE ? 1 : data_size < MIN_3BYTE ? 2 : 3; + byte *l= log_write<EXTENDED>(block.page.id, &block.page, len, true); + ut_d(byte *end= l + len); + *l++= DELETE_ROW_FORMAT_DYNAMIC; + l= mlog_encode_varint(l, prev_rec); + l= mlog_encode_varint(l, hdr_size); + l= mlog_encode_varint(l, data_size); + ut_ad(end == l); + m_log.close(l); + m_last_offset= FIL_PAGE_TYPE; +} + /** Write log for initializing an undo log page. @param block undo page */ inline void mtr_t::undo_create(const buf_block_t &block) diff --git a/storage/innobase/include/mtr0mtr.h b/storage/innobase/include/mtr0mtr.h index 93f8d79b4f9..9461765abf4 100644 --- a/storage/innobase/include/mtr0mtr.h +++ b/storage/innobase/include/mtr0mtr.h @@ -491,9 +491,23 @@ struct mtr_t { @param id page identifier */ inline void free(const page_id_t id); /** Write log for partly initializing a B-tree or R-tree page. - @param block B-tree page + @param block B-tree or R-tree page @param comp false=ROW_FORMAT=REDUNDANT, true=COMPACT or DYNAMIC */ inline void page_create(const buf_block_t &block, bool comp); + /** Write log for deleting a B-tree or R-tree record in ROW_FORMAT=REDUNDANT. + @param block B-tree or R-tree page + @param prev_rec byte offset of the predecessor of the record to delete, + starting from PAGE_OLD_INFIMUM */ + inline void page_delete(const buf_block_t &block, ulint prev_rec); + /** Write log for deleting a COMPACT or DYNAMIC B-tree or R-tree record. + @param block B-tree or R-tree page + @param prev_rec byte offset of the predecessor of the record to delete, + starting from PAGE_NEW_INFIMUM + @param hdr_size record header size, excluding REC_N_NEW_EXTRA_BYTES + @param data_size data payload size, in bytes */ + inline void page_delete(const buf_block_t &block, ulint prev_rec, + size_t hdr_size, size_t data_size); + /** Write log for initializing an undo log page. @param block undo page */ inline void undo_create(const buf_block_t &block); diff --git a/storage/innobase/include/mtr0types.h b/storage/innobase/include/mtr0types.h index 1062e96f9d9..bfa30cf30b1 100644 --- a/storage/innobase/include/mtr0types.h +++ b/storage/innobase/include/mtr0types.h @@ -262,7 +262,18 @@ enum mrec_ext_t /** Append a record to an undo log page. This is equivalent to the old MLOG_UNDO_INSERT record. The current byte offset will be reset to FIL_PAGE_TYPE. */ - UNDO_APPEND= 3 + UNDO_APPEND= 3, + /** Delete a record on a ROW_FORMAT=REDUNDANT page. + We point to the precedessor of the record to be deleted. + The current byte offset will be reset to FIL_PAGE_TYPE. + This is similar to the old MLOG_REC_DELETE record. */ + DELETE_ROW_FORMAT_REDUNDANT= 8, + /** Delete a record on a ROW_FORMAT=COMPACT or DYNAMIC page. + We point to the precedessor of the record to be deleted + and include the total size of the record being deleted. + The current byte offset will be reset to FIL_PAGE_TYPE. + This is similar to the old MLOG_COMP_REC_DELETE record. */ + DELETE_ROW_FORMAT_DYNAMIC= 9 }; diff --git a/storage/innobase/include/page0cur.h b/storage/innobase/include/page0cur.h index 36a401cf0db..8387a409cde 100644 --- a/storage/innobase/include/page0cur.h +++ b/storage/innobase/include/page0cur.h @@ -201,6 +201,21 @@ page_cur_delete_rec( mtr_t* mtr) /*!< in/out: mini-transaction */ MY_ATTRIBUTE((nonnull)); +/** Apply a DELETE_ROW_FORMAT_REDUNDANT record that was written by +page_cur_delete_rec() for a ROW_FORMAT=REDUNDANT page. +@param block B-tree or R-tree page in ROW_FORMAT=REDUNDANT +@param prev byte offset of the predecessor, relative to PAGE_OLD_INFIMUM */ +void page_apply_delete_redundant(const buf_block_t &block, ulint prev); + +/** Apply a DELETE_ROW_FORMAT_DYNAMIC record that was written by +page_cur_delete_rec() for a ROW_FORMAT=COMPACT or DYNAMIC page. +@param block B-tree or R-tree page in ROW_FORMAT=COMPACT or DYNAMIC +@param prev byte offset of the predecessor, relative to PAGE_NEW_INFIMUM +@param hdr_size record header size, excluding REC_N_NEW_EXTRA_BYTES +@param data_size data payload size, in bytes */ +void page_apply_delete_dynamic(const buf_block_t &block, ulint prev, + size_t hdr_size, size_t data_size); + /** Search the right position for a page cursor. @param[in] block buffer block @param[in] index index tree diff --git a/storage/innobase/include/page0page.h b/storage/innobase/include/page0page.h index c3b80e3e196..ddd5d83892e 100644 --- a/storage/innobase/include/page0page.h +++ b/storage/innobase/include/page0page.h @@ -410,7 +410,7 @@ inline trx_id_t page_get_max_trx_id(const page_t *page) Set the number of owned records. @tparam compressed whether to update any ROW_FORMAT=COMPRESSED page as well @param[in,out] block index page -@param[in,out] rec ROW_FORMAT=REDUNDANT record +@param[in,out] rec record in block.frame @param[in] n_owned number of records skipped in the sparse page directory @param[in] comp whether ROW_FORMAT is one of COMPACT,DYNAMIC,COMPRESSED @param[in,out] mtr mini-transaction */ @@ -643,7 +643,7 @@ page_rec_check( @return pointer to record */ inline rec_t *page_dir_slot_get_rec(page_dir_slot_t *slot) { - return page_align(slot) + mach_read_from_2(slot); + return page_align(slot) + mach_read_from_2(my_assume_aligned<2>(slot)); } inline const rec_t *page_dir_slot_get_rec(const page_dir_slot_t *slot) { diff --git a/storage/innobase/log/log0recv.cc b/storage/innobase/log/log0recv.cc index 0912b169ec0..176b8c1d5d1 100644 --- a/storage/innobase/log/log0recv.cc +++ b/storage/innobase/log/log0recv.cc @@ -45,6 +45,7 @@ Created 9/20/1997 Heikki Tuuri #include "mtr0mtr.h" #include "mtr0log.h" #include "page0page.h" +#include "page0cur.h" #include "trx0undo.h" #include "ibuf0ibuf.h" #include "trx0undo.h" @@ -282,14 +283,14 @@ public: goto next; case EXTENDED: if (UNIV_UNLIKELY(block.page.id.page_no() < 3 || - block.page.zip.ssize) && - !srv_force_recovery) + block.page.zip.ssize)) goto record_corrupted; static_assert(INIT_ROW_FORMAT_REDUNDANT == 0, "compatiblity"); static_assert(INIT_ROW_FORMAT_DYNAMIC == 1, "compatibility"); if (UNIV_UNLIKELY(!rlen)) goto record_corrupted; switch (*l) { + uint8_t ll; default: goto record_corrupted; case INIT_ROW_FORMAT_REDUNDANT: @@ -308,6 +309,39 @@ public: goto record_corrupted; undo_append(block, ++l, --rlen); break; + case DELETE_ROW_FORMAT_REDUNDANT: + if (UNIV_UNLIKELY(rlen < 2 || rlen > 4)) + goto record_corrupted; + rlen--; + ll= mlog_decode_varint_length(*++l); + if (UNIV_UNLIKELY(ll != rlen)) + goto record_corrupted; + page_apply_delete_redundant(block, mlog_decode_varint(l)); + break; + case DELETE_ROW_FORMAT_DYNAMIC: + if (UNIV_UNLIKELY(rlen < 2)) + goto record_corrupted; + rlen--; + ll= mlog_decode_varint_length(*++l); + if (UNIV_UNLIKELY(ll > 3 || ll >= rlen)) + goto record_corrupted; + size_t prev_rec= mlog_decode_varint(l); + ut_ad(prev_rec != MLOG_DECODE_ERROR); + rlen-= ll; + l+= ll; + ll= mlog_decode_varint_length(*l); + if (UNIV_UNLIKELY(ll > 2 || ll >= rlen)) + goto record_corrupted; + size_t hdr_size= mlog_decode_varint(l); + ut_ad(hdr_size != MLOG_DECODE_ERROR); + rlen-= ll; + l+= ll; + ll= mlog_decode_varint_length(*l); + if (UNIV_UNLIKELY(ll > 3 || ll != rlen)) + goto record_corrupted; + page_apply_delete_dynamic(block, prev_rec, hdr_size, + mlog_decode_varint(l)); + break; } last_offset= FIL_PAGE_TYPE; goto next_after_applying; diff --git a/storage/innobase/page/page0cur.cc b/storage/innobase/page/page0cur.cc index d58c63a6c86..45cf5886302 100644 --- a/storage/innobase/page/page0cur.cc +++ b/storage/innobase/page/page0cur.cc @@ -785,17 +785,15 @@ page_cur_open_on_rnd_user_rec( } /** -Set the owned records field of the record pointed to by a directory slot. -@param[in,out] block file page -@param[in] slot sparse directory slot -@param[in,out] n number of records owned by the directory slot -@param[in,out] mtr mini-transaction */ -static void page_dir_slot_set_n_owned(buf_block_t *block, - const page_dir_slot_t *slot, - ulint n, mtr_t *mtr) +Set the number of owned records. +@param[in,out] rec record in block.frame +@param[in] n_owned number of records skipped in the sparse page directory +@param[in] comp whether ROW_FORMAT is COMPACT or DYNAMIC */ +static void page_rec_set_n_owned(rec_t *rec, ulint n_owned, bool comp) { - rec_t *rec= const_cast<rec_t*>(page_dir_slot_get_rec(slot)); - page_rec_set_n_owned<true>(block, rec, n, page_rec_is_comp(rec), mtr); + rec-= comp ? REC_NEW_N_OWNED : REC_OLD_N_OWNED; + *rec= static_cast<byte>((*rec & ~REC_N_OWNED_MASK) | + (n_owned << REC_N_OWNED_SHIFT)); } /** @@ -874,12 +872,13 @@ static void page_dir_split_slot(buf_block_t *block, ulint s, mtr_t* mtr) Try to balance an underfilled directory slot with an adjacent one, so that there are at least the minimum number of records owned by the slot; this may result in merging the two slots. -@param[in,out] block index page +@param[in,out] block ROW_FORMAT=COMPRESSED page @param[in] s the slot to be balanced @param[in,out] mtr mini-transaction */ -static void page_dir_balance_slot(buf_block_t *block, ulint s, mtr_t *mtr) +static void page_zip_dir_balance_slot(buf_block_t *block, ulint s, mtr_t *mtr) { - ut_ad(!block->page.zip.data || page_is_comp(block->frame)); + ut_ad(block->page.zip.data); + ut_ad(page_is_comp(block->frame)); ut_ad(s > 0); const ulint n_slots = page_dir_get_n_slots(block->frame); @@ -892,21 +891,23 @@ static void page_dir_balance_slot(buf_block_t *block, ulint s, mtr_t *mtr) ut_ad(s < n_slots); page_dir_slot_t* slot = page_dir_get_nth_slot(block->frame, s); - page_dir_slot_t* up_slot = slot - PAGE_DIR_SLOT_SIZE; - const ulint up_n_owned = page_dir_slot_get_n_owned(up_slot); + rec_t* const up_rec = const_cast<rec_t*> + (page_dir_slot_get_rec(slot - PAGE_DIR_SLOT_SIZE)); + rec_t* const slot_rec = const_cast<rec_t*> + (page_dir_slot_get_rec(slot)); + const ulint up_n_owned = rec_get_n_owned_new(up_rec); - ut_ad(page_dir_slot_get_n_owned(slot) + ut_ad(rec_get_n_owned_new(page_dir_slot_get_rec(slot)) == PAGE_DIR_SLOT_MIN_N_OWNED - 1); if (up_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) { compile_time_assert(2 * PAGE_DIR_SLOT_MIN_N_OWNED - 1 <= PAGE_DIR_SLOT_MAX_N_OWNED); /* Merge the slots. */ - ulint n_owned = page_dir_slot_get_n_owned(slot); - page_dir_slot_set_n_owned(block, slot, 0, mtr); - page_dir_slot_set_n_owned(block, up_slot, n_owned - + page_dir_slot_get_n_owned(up_slot), - mtr); + page_rec_set_n_owned<true>(block, slot_rec, 0, true, mtr); + page_rec_set_n_owned<true>(block, up_rec, up_n_owned + + (PAGE_DIR_SLOT_MIN_N_OWNED - 1), + true, mtr); /* Shift the slots */ page_dir_slot_t* last_slot = page_dir_get_nth_slot( block->frame, n_slots - 1); @@ -916,48 +917,92 @@ static void page_dir_balance_slot(buf_block_t *block, ulint s, mtr_t *mtr) byte *n_slots_p= my_assume_aligned<2> (n_slots_f + block->frame); mtr->write<2>(*block, n_slots_p, n_slots - 1); + memcpy_aligned<2>(n_slots_f + block->page.zip.data, + n_slots_p, 2); + memset_aligned<2>(last_slot, 0, 2); + return; + } - if (UNIV_LIKELY_NULL(block->page.zip.data)) { - memset_aligned<2>(last_slot, 0, 2); - memcpy_aligned<2>(n_slots_f + block->page.zip.data, - n_slots_p, 2); - } else { - mtr->memmove(*block, PAGE_DIR_SLOT_SIZE - + page_offset(last_slot), - page_offset(last_slot), slot - last_slot); - mtr->write<2>(*block, last_slot, 0U); - } + /* Transfer one record to the underfilled slot */ + page_rec_set_n_owned<true>(block, slot_rec, 0, true, mtr); + rec_t* new_rec = rec_get_next_ptr(slot_rec, TRUE); + page_rec_set_n_owned<true>(block, new_rec, + PAGE_DIR_SLOT_MIN_N_OWNED, + true, mtr); + mach_write_to_2(slot, page_offset(new_rec)); + page_rec_set_n_owned(up_rec, up_n_owned - 1, true); +} + +/** +Try to balance an underfilled directory slot with an adjacent one, +so that there are at least the minimum number of records owned by the slot; +this may result in merging the two slots. +@param[in,out] block index page +@param[in] s the slot to be balanced */ +static void page_dir_balance_slot(const buf_block_t &block, ulint s) +{ + const bool comp= page_is_comp(block.frame); + ut_ad(!block.page.zip.data); + ut_ad(s > 0); + + const ulint n_slots = page_dir_get_n_slots(block.frame); + + if (UNIV_UNLIKELY(s + 1 == n_slots)) { + /* The last directory slot cannot be balanced. */ + return; + } + + ut_ad(s < n_slots); + + page_dir_slot_t* slot = page_dir_get_nth_slot(block.frame, s); + rec_t* const up_rec = const_cast<rec_t*> + (page_dir_slot_get_rec(slot - PAGE_DIR_SLOT_SIZE)); + rec_t* const slot_rec = const_cast<rec_t*> + (page_dir_slot_get_rec(slot)); + const ulint up_n_owned = comp + ? rec_get_n_owned_new(up_rec) + : rec_get_n_owned_old(up_rec); + + ut_ad(page_dir_slot_get_n_owned(slot) + == PAGE_DIR_SLOT_MIN_N_OWNED - 1); + if (up_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) { + compile_time_assert(2 * PAGE_DIR_SLOT_MIN_N_OWNED - 1 + <= PAGE_DIR_SLOT_MAX_N_OWNED); + /* Merge the slots. */ + page_rec_set_n_owned(slot_rec, 0, comp); + page_rec_set_n_owned(up_rec, up_n_owned + + (PAGE_DIR_SLOT_MIN_N_OWNED - 1), comp); + /* Shift the slots */ + page_dir_slot_t* last_slot = page_dir_get_nth_slot( + block.frame, n_slots - 1); + memmove_aligned<2>(last_slot + PAGE_DIR_SLOT_SIZE, last_slot, + slot - last_slot); + memset_aligned<2>(last_slot, 0, 2); + constexpr uint16_t n_slots_f = PAGE_N_DIR_SLOTS + PAGE_HEADER; + byte *n_slots_p= my_assume_aligned<2> + (n_slots_f + block.frame); + mach_write_to_2(n_slots_p, n_slots - 1); return; } /* Transfer one record to the underfilled slot */ - rec_t* old_rec = const_cast<rec_t*>(page_dir_slot_get_rec(slot)); rec_t* new_rec; - if (page_is_comp(block->frame)) { - new_rec = rec_get_next_ptr(old_rec, TRUE); - - page_rec_set_n_owned<true>(block, old_rec, 0, true, mtr); - page_rec_set_n_owned<true>(block, new_rec, - PAGE_DIR_SLOT_MIN_N_OWNED, - true, mtr); - if (UNIV_LIKELY_NULL(block->page.zip.data)) { - mach_write_to_2(slot, page_offset(new_rec)); - goto func_exit; - } + if (comp) { + page_rec_set_n_owned(slot_rec, 0, true); + new_rec = rec_get_next_ptr(slot_rec, TRUE); + page_rec_set_n_owned(new_rec, PAGE_DIR_SLOT_MIN_N_OWNED, true); + page_rec_set_n_owned(up_rec, up_n_owned - 1, true); } else { - new_rec = rec_get_next_ptr(old_rec, FALSE); - - page_rec_set_n_owned<false>(block, old_rec, 0, false, mtr); - page_rec_set_n_owned<false>(block, new_rec, - PAGE_DIR_SLOT_MIN_N_OWNED, - false, mtr); + page_rec_set_n_owned(slot_rec, 0, false); + new_rec = rec_get_next_ptr(slot_rec, FALSE); + page_rec_set_n_owned(new_rec, PAGE_DIR_SLOT_MIN_N_OWNED, + false); + page_rec_set_n_owned(up_rec, up_n_owned - 1, false); } - mtr->write<2>(*block, slot, page_offset(new_rec)); -func_exit: - page_dir_slot_set_n_owned(block, up_slot, up_n_owned - 1, mtr); + mach_write_to_2(slot, page_offset(new_rec)); } /** Allocate space for inserting an index record. @@ -1766,111 +1811,77 @@ inc_dir: return insert_rec; } -/** Prepend a record to the PAGE_FREE list. -@param[in,out] block index page -@param[in,out] rec record being deleted -@param[in] index the index that the page belongs to -@param[in] offsets rec_get_offsets(rec, index) -@param[in,out] mtr mini-transaction */ -static void page_mem_free(buf_block_t *block, rec_t *rec, - const dict_index_t *index, const offset_t *offsets, - mtr_t *mtr) +/** Prepend a record to the PAGE_FREE list, or shrink PAGE_HEAP_TOP. +@param[in,out] block index page +@param[in,out] rec record being deleted +@param[in] data_size record payload size, in bytes +@param[in] extra_size record header size, in bytes */ +static void page_mem_free(const buf_block_t &block, rec_t *rec, + size_t data_size, size_t extra_size) { - ut_ad(rec_offs_validate(rec, index, offsets)); - ut_ad(page_align(rec) == block->frame); - const rec_t *free= page_header_get_ptr(block->frame, PAGE_FREE); - - if (UNIV_LIKELY_NULL(block->page.zip.data)) - { - page_header_reset_last_insert(block, mtr); - page_zip_dir_delete(block, rec, index, offsets, free, mtr); - return; - } + ut_ad(page_align(rec) == block.frame); + ut_ad(!block.page.zip.data); + const rec_t *free= page_header_get_ptr(block.frame, PAGE_FREE); - const uint16_t n_heap= page_header_get_field(block->frame, PAGE_N_HEAP) - 1; - ut_ad(page_get_n_recs(block->frame) < (n_heap & 0x7fff)); - alignas(4) byte page_header[6]; - const bool deleting_last= n_heap == ((n_heap & 0x8000) - ? (rec_get_heap_no_new(rec) | 0x8000) - : rec_get_heap_no_old(rec)); + const uint16_t n_heap= page_header_get_field(block.frame, PAGE_N_HEAP) - 1; + ut_ad(page_get_n_recs(block.frame) < (n_heap & 0x7fff)); + const bool deleting_top= n_heap == ((n_heap & 0x8000) + ? (rec_get_heap_no_new(rec) | 0x8000) + : rec_get_heap_no_old(rec)); - if (deleting_last) + if (deleting_top) { - const uint16_t heap_top= page_header_get_offs(block->frame, PAGE_HEAP_TOP); - const size_t extra_savings= heap_top - - page_offset(rec_get_end(rec, offsets)); + byte *page_heap_top= my_assume_aligned<2>(PAGE_HEAP_TOP + PAGE_HEADER + + block.frame); + const uint16_t heap_top= mach_read_from_2(page_heap_top); + const size_t extra_savings= heap_top - page_offset(rec + data_size); ut_ad(extra_savings < heap_top); /* When deleting the last record, do not add it to the PAGE_FREE list. Instead, decrement PAGE_HEAP_TOP and PAGE_N_HEAP. */ - mach_write_to_2(page_header, page_offset(rec_get_start(rec, offsets))); - mach_write_to_2(my_assume_aligned<2>(page_header + 2), n_heap); + mach_write_to_2(page_heap_top, page_offset(rec - extra_size)); + mach_write_to_2(my_assume_aligned<2>(page_heap_top + 2), n_heap); static_assert(PAGE_N_HEAP == PAGE_HEAP_TOP + 2, "compatibility"); - mtr->memcpy(*block, my_assume_aligned<4>(PAGE_HEAP_TOP + PAGE_HEADER + - block->frame), page_header, 4); if (extra_savings) { - uint16_t garbage= page_header_get_field(block->frame, PAGE_GARBAGE); - mach_write_to_2(page_header, garbage - extra_savings); - size_t len= 2; - if (page_header_get_field(block->frame, PAGE_LAST_INSERT)) - { - memset_aligned<2>(page_header + 2, 0, 2); - len= 4; - } - mtr->memcpy(*block, my_assume_aligned<2>(PAGE_GARBAGE + PAGE_HEADER + - block->frame), - page_header, len); + byte *page_garbage= my_assume_aligned<2>(PAGE_GARBAGE + PAGE_HEADER + + block.frame); + uint16_t garbage= mach_read_from_2(page_garbage); + ut_ad(garbage >= extra_savings); + mach_write_to_2(page_garbage, garbage - extra_savings); } - else - mtr->write<2,mtr_t::OPT>(*block, my_assume_aligned<2> - (PAGE_LAST_INSERT + PAGE_HEADER + block->frame), - 0U); } else { - mach_write_to_2(page_header, page_offset(rec)); - mach_write_to_2(my_assume_aligned<2>(page_header + 2), - rec_offs_size(offsets) + - page_header_get_field(block->frame, PAGE_GARBAGE)); - static_assert(PAGE_FREE + 2 == PAGE_GARBAGE, "compatibility"); - static_assert(PAGE_FREE + 4 == PAGE_LAST_INSERT, "compatibility"); - size_t size; - if (page_header_get_field(block->frame, PAGE_LAST_INSERT)) - { - memset_aligned<2>(page_header + 4, 0, 2); - size= 6; - } - else - size= 4; - mtr->memcpy(*block, my_assume_aligned<4>(PAGE_FREE + PAGE_HEADER + - block->frame), page_header, size); + byte *page_free= my_assume_aligned<2>(PAGE_FREE + PAGE_HEADER + + block.frame); + byte *page_garbage= my_assume_aligned<2>(PAGE_GARBAGE + PAGE_HEADER + + block.frame); + mach_write_to_2(page_free, page_offset(rec)); + mach_write_to_2(page_garbage, mach_read_from_2(page_garbage) + + extra_size + data_size); } - mtr->write<2>(*block, PAGE_N_RECS + PAGE_HEADER + block->frame, - ulint(page_get_n_recs(block->frame)) - 1); + memset_aligned<2>(PAGE_LAST_INSERT + PAGE_HEADER + block.frame, 0, 2); + byte *page_n_recs= my_assume_aligned<2>(PAGE_N_RECS + PAGE_HEADER + + block.frame); + mach_write_to_2(page_n_recs, mach_read_from_2(page_n_recs) - 1); + + const byte* const end= rec + data_size; - if (!deleting_last) + if (!deleting_top) { uint16_t next= free ? ((n_heap & 0x8000) ? static_cast<uint16_t>(free - rec) - : static_cast<uint16_t>(page_offset(free))) + : static_cast<uint16_t>(free - block.frame)) : 0; - mtr->write<2>(*block, rec - REC_NEXT, next); + mach_write_to_2(rec - REC_NEXT, next); } + else + rec-= extra_size; - if (srv_immediate_scrub_data_uncompressed) - { - size_t size= rec_offs_data_size(offsets); - if (deleting_last) - { - const size_t extra_size= rec_offs_extra_size(offsets); - rec-= extra_size; - size+= extra_size; - } - mtr->memset(block, page_offset(rec), size, 0); - } + memset(rec, 0, end - rec); } /***********************************************************//** @@ -1886,7 +1897,6 @@ page_cur_delete_rec( mtr_t* mtr) /*!< in/out: mini-transaction */ { page_dir_slot_t* cur_dir_slot; - page_dir_slot_t* prev_slot; rec_t* current_rec; rec_t* prev_rec = NULL; rec_t* next_rec; @@ -1946,10 +1956,8 @@ page_cur_delete_rec( /* Find the next and the previous record. Note that the cursor is left at the next record. */ - ut_ad(cur_slot_no > 0); - prev_slot = page_dir_get_nth_slot(block->frame, cur_slot_no - 1); - - rec = const_cast<rec_t*>(page_dir_slot_get_rec(prev_slot)); + rec = const_cast<rec_t*> + (page_dir_slot_get_rec(cur_dir_slot + PAGE_DIR_SLOT_SIZE)); /* rec now points to the record of the previous directory slot. Look for the immediate predecessor of current_rec in a loop. */ @@ -1989,47 +1997,243 @@ page_cur_delete_rec( mach_write_to_2(prev_rec - REC_NEXT, static_cast<uint16_t> (next_rec - prev_rec)); - mach_write_to_1(slot_rec - REC_NEW_N_OWNED, - (slot_rec[-REC_NEW_N_OWNED] - & ~REC_N_OWNED_MASK) - | (cur_n_owned - 1) << REC_N_OWNED_SHIFT); - } else { - if (current_rec == slot_rec) { - slot_rec = prev_rec; - mtr->write<2>(*block, cur_dir_slot, - page_offset(slot_rec)); + slot_rec[-REC_NEW_N_OWNED] = static_cast<byte>( + (slot_rec[-REC_NEW_N_OWNED] & ~REC_N_OWNED_MASK) + | (cur_n_owned - 1) << REC_N_OWNED_SHIFT); + + page_header_reset_last_insert(block, mtr); + page_zip_dir_delete(block, rec, index, offsets, + page_header_get_ptr(block->frame, + PAGE_FREE), + mtr); + if (cur_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) { + page_zip_dir_balance_slot(block, cur_slot_no, mtr); } + return; + } - if (page_is_comp(block->frame)) { - mtr->write<2>(*block, prev_rec - REC_NEXT, - static_cast<uint16_t> - (next_rec - prev_rec)); - mtr->write<1>(*block, slot_rec - REC_NEW_N_OWNED, - (slot_rec[-REC_NEW_N_OWNED] - & ~REC_N_OWNED_MASK) - | (cur_n_owned - 1) - << REC_N_OWNED_SHIFT); - } else { - mtr->write<2>(*block, prev_rec - REC_NEXT, - page_offset(next_rec)); - mtr->write<1>(*block, slot_rec - REC_OLD_N_OWNED, - (slot_rec[-REC_OLD_N_OWNED] - & ~REC_N_OWNED_MASK) - | (cur_n_owned - 1) - << REC_N_OWNED_SHIFT); - } + if (current_rec == slot_rec) { + slot_rec = prev_rec; + mach_write_to_2(cur_dir_slot, page_offset(slot_rec)); + } + + const size_t data_size = rec_offs_data_size(offsets); + const size_t extra_size = rec_offs_extra_size(offsets); + + if (page_is_comp(block->frame)) { + mtr->page_delete(*block, page_offset(prev_rec) + - PAGE_NEW_INFIMUM, + extra_size - REC_N_NEW_EXTRA_BYTES, + data_size); + mach_write_to_2(prev_rec - REC_NEXT, static_cast<uint16_t> + (next_rec - prev_rec)); + slot_rec[-REC_NEW_N_OWNED] = static_cast<byte>( + (slot_rec[-REC_NEW_N_OWNED] & ~REC_N_OWNED_MASK) + | (cur_n_owned - 1) << REC_N_OWNED_SHIFT); + } else { + mtr->page_delete(*block, page_offset(prev_rec) + - PAGE_OLD_INFIMUM); + memcpy(prev_rec - REC_NEXT, current_rec - REC_NEXT, 2); + slot_rec[-REC_OLD_N_OWNED] = static_cast<byte>( + (slot_rec[-REC_OLD_N_OWNED] & ~REC_N_OWNED_MASK) + | (cur_n_owned - 1) << REC_N_OWNED_SHIFT); } - /* Free the memory occupied by the record */ - page_mem_free(block, current_rec, index, offsets, mtr); + page_mem_free(*block, current_rec, data_size, extra_size); /* Now we have decremented the number of owned records of the slot. If the number drops below PAGE_DIR_SLOT_MIN_N_OWNED, we balance the slots. */ if (cur_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) { - page_dir_balance_slot(block, cur_slot_no, mtr); + page_dir_balance_slot(*block, cur_slot_no); } + + ut_ad(page_is_comp(block->frame) + ? page_simple_validate_new(block->frame) + : page_simple_validate_old(block->frame)); +} + +/** Apply a DELETE_ROW_FORMAT_REDUNDANT record that was written by +page_cur_delete_rec() for a ROW_FORMAT=REDUNDANT page. +@param block B-tree or R-tree page in ROW_FORMAT=REDUNDANT +@param prev byte offset of the predecessor, relative to PAGE_OLD_INFIMUM */ +void page_apply_delete_redundant(const buf_block_t &block, ulint prev) +{ + const uint16_t n_slots= page_dir_get_n_slots(block.frame); + ulint n_recs= page_get_n_recs(block.frame); + + if (UNIV_UNLIKELY(!n_recs || n_slots < 2 || + !fil_page_index_page_check(block.frame) || + page_get_page_no(block.frame) != block.page.id.page_no() || + mach_read_from_2(my_assume_aligned<2> + (PAGE_OLD_SUPREMUM - REC_NEXT + + block.frame)) || + page_is_comp(block.frame))) + { +corrupted: + ib::error() << "Not applying DELETE_ROW_FORMAT_REDUNDANT" + " due to corruption on " << block.page.id; + return; + } + + byte *slot= page_dir_get_nth_slot(block.frame, n_slots - 1); + rec_t *prev_rec= block.frame + PAGE_OLD_INFIMUM + prev; + if (UNIV_UNLIKELY(prev_rec > slot)) + goto corrupted; + uint16_t n= mach_read_from_2(prev_rec - REC_NEXT); + rec_t *rec= block.frame + n; + if (UNIV_UNLIKELY(n < PAGE_OLD_SUPREMUM_END + REC_N_OLD_EXTRA_BYTES || + slot < rec)) + goto corrupted; + const ulint extra_size= REC_N_OLD_EXTRA_BYTES + rec_get_n_fields_old(rec) * + (rec_get_1byte_offs_flag(rec) ? 1 : 2); + const ulint data_size= rec_get_data_size_old(rec); + if (UNIV_UNLIKELY(n < PAGE_OLD_SUPREMUM_END + extra_size || + slot < rec + data_size)) + goto corrupted; + + n= mach_read_from_2(rec - REC_NEXT); + rec_t *next= block.frame + n; + if (n == PAGE_OLD_SUPREMUM); + else if (UNIV_UNLIKELY(n < PAGE_OLD_SUPREMUM_END + REC_N_OLD_EXTRA_BYTES || + slot < next)) + goto corrupted; + + rec_t *s= rec; + ulint slot_owned; + for (ulint i= n_recs; !(slot_owned= rec_get_n_owned_old(s)); ) + { + n= mach_read_from_2(s - REC_NEXT); + s= block.frame + n; + if (n == PAGE_OLD_SUPREMUM); + else if (UNIV_UNLIKELY(n < PAGE_OLD_SUPREMUM_END + REC_N_OLD_EXTRA_BYTES || + slot < s)) + goto corrupted; + if (UNIV_UNLIKELY(!i--)) /* Corrupted (cyclic?) next-record list */ + goto corrupted; + } + slot_owned--; + + /* The first slot is always pointing to the infimum record. + Find the directory slot pointing to s. */ + const byte * const first_slot= block.frame + srv_page_size - (PAGE_DIR + 2); + alignas(2) byte slot_offs[2]; + mach_write_to_2(slot_offs, s - block.frame); + static_assert(PAGE_DIR_SLOT_SIZE == 2, "compatibility"); + + while (memcmp_aligned<2>(slot, slot_offs, 2)) + if ((slot+= 2) == first_slot) + goto corrupted; + + if (rec == s) + { + s= prev_rec; + mach_write_to_2(slot, s - block.frame); + } + + memcpy(prev_rec - REC_NEXT, rec - REC_NEXT, 2); + s-= REC_OLD_N_OWNED; + *s= static_cast<byte>((*s & ~REC_N_OWNED_MASK) | + slot_owned << REC_N_OWNED_SHIFT); + page_mem_free(block, rec, data_size, extra_size); + + if (slot_owned < PAGE_DIR_SLOT_MIN_N_OWNED) + page_dir_balance_slot(block, (first_slot - slot) / 2); + + ut_ad(page_simple_validate_old(block.frame)); +} + +/** Apply a DELETE_ROW_FORMAT_DYNAMIC record that was written by +page_cur_delete_rec() for a ROW_FORMAT=COMPACT or DYNAMIC page. +@param block B-tree or R-tree page in ROW_FORMAT=COMPACT or DYNAMIC +@param prev byte offset of the predecessor, relative to PAGE_NEW_INFIMUM +@param hdr_size record header size, excluding REC_N_NEW_EXTRA_BYTES +@param data_size data payload size, in bytes */ +void page_apply_delete_dynamic(const buf_block_t &block, ulint prev, + size_t hdr_size, size_t data_size) +{ + const uint16_t n_slots= page_dir_get_n_slots(block.frame); + ulint n_recs= page_get_n_recs(block.frame); + + if (UNIV_UNLIKELY(!n_recs || n_slots < 2 || + !fil_page_index_page_check(block.frame) || + page_get_page_no(block.frame) != block.page.id.page_no() || + mach_read_from_2(my_assume_aligned<2> + (PAGE_NEW_SUPREMUM - REC_NEXT + + block.frame)) || + !page_is_comp(block.frame))) + { +corrupted: + ib::error() << "Not applying DELETE_ROW_FORMAT_DYNAMIC" + " due to corruption on " << block.page.id; + return; + } + + byte *slot= page_dir_get_nth_slot(block.frame, n_slots - 1); + uint16_t n= static_cast<uint16_t>(PAGE_NEW_INFIMUM + prev); + rec_t *prev_rec= block.frame + n; + if (UNIV_UNLIKELY(prev_rec > slot)) + goto corrupted; + n+= mach_read_from_2(prev_rec - REC_NEXT); + rec_t *rec= block.frame + n; + if (UNIV_UNLIKELY(n < PAGE_NEW_SUPREMUM_END + REC_N_NEW_EXTRA_BYTES || + slot < rec)) + goto corrupted; + const ulint extra_size= REC_N_NEW_EXTRA_BYTES + hdr_size; + if (UNIV_UNLIKELY(n < PAGE_NEW_SUPREMUM_END + extra_size || + slot < rec + data_size)) + goto corrupted; + n+= mach_read_from_2(rec - REC_NEXT); + rec_t *next= block.frame + n; + if (n == PAGE_NEW_SUPREMUM); + else if (UNIV_UNLIKELY(n < PAGE_NEW_SUPREMUM_END + REC_N_NEW_EXTRA_BYTES || + slot < next)) + goto corrupted; + + rec_t *s= rec; + n= static_cast<uint16_t>(rec - block.frame); + ulint slot_owned; + for (ulint i= n_recs; !(slot_owned= rec_get_n_owned_new(s)); ) + { + n+= mach_read_from_2(s - REC_NEXT); + s= block.frame + n; + if (n == PAGE_NEW_SUPREMUM); + else if (UNIV_UNLIKELY(n < PAGE_NEW_SUPREMUM_END + REC_N_NEW_EXTRA_BYTES || + slot < s)) + goto corrupted; + if (UNIV_UNLIKELY(!i--)) /* Corrupted (cyclic?) next-record list */ + goto corrupted; + } + slot_owned--; + + /* The first slot is always pointing to the infimum record. + Find the directory slot pointing to s. */ + const byte * const first_slot= block.frame + srv_page_size - (PAGE_DIR + 2); + alignas(2) byte slot_offs[2]; + mach_write_to_2(slot_offs, s - block.frame); + static_assert(PAGE_DIR_SLOT_SIZE == 2, "compatibility"); + + while (memcmp_aligned<2>(slot, slot_offs, 2)) + if ((slot+= 2) == first_slot) + goto corrupted; + + if (rec == s) + { + s= prev_rec; + mach_write_to_2(slot, s - block.frame); + } + + mach_write_to_2(prev_rec - REC_NEXT, static_cast<uint16_t>(next - prev_rec)); + s-= REC_NEW_N_OWNED; + *s= static_cast<byte>((*s & ~REC_N_OWNED_MASK) | + slot_owned << REC_N_OWNED_SHIFT); + page_mem_free(block, rec, data_size, extra_size); + + if (slot_owned < PAGE_DIR_SLOT_MIN_N_OWNED) + page_dir_balance_slot(block, (first_slot - slot) / 2); + + ut_ad(page_simple_validate_new(block.frame)); } #ifdef UNIV_COMPILE_TEST_FUNCS |