author    Marko Mäkelä <marko.makela@mariadb.com>    2020-02-22 17:32:45 +0200
committer Marko Mäkelä <marko.makela@mariadb.com>    2020-02-22 21:19:47 +0200
commit    572d20757be38157fa2309c35efdec19e68087f1 (patch)
tree      f0ad2fdada34110218dc8b99ea56c5eec532c544
parent    bc76cfe8f868fde245f8aa274fba349d352bbfa7 (diff)
MDEV-12353: Reduce log volume of page_cur_delete_rec()

mrec_ext_t: Introduce DELETE_ROW_FORMAT_REDUNDANT, DELETE_ROW_FORMAT_DYNAMIC.

mtr_t::page_delete(): Write DELETE_ROW_FORMAT_REDUNDANT or
DELETE_ROW_FORMAT_DYNAMIC log records. We log the byte offset of the
preceding record, so that on recovery we can easily find everything to
update. For DELETE_ROW_FORMAT_DYNAMIC, we must also write the header
and data size of the record.

We will retain the physical logging for ROW_FORMAT=COMPRESSED pages.

page_zip_dir_balance_slot(): Renamed from page_dir_balance_slot(),
and specialized for ROW_FORMAT=COMPRESSED only.

page_rec_set_n_owned(), page_dir_slot_set_n_owned(),
page_dir_balance_slot(): New variants that do not write any log.

page_mem_free(): Take data_size, extra_size as parameters.
Always zerofill the record payload.

page_cur_delete_rec(): For other than ROW_FORMAT=COMPRESSED,
only write log by mtr_t::page_delete().
-rw-r--r--  storage/innobase/include/mtr0log.h   |  54
-rw-r--r--  storage/innobase/include/mtr0mtr.h   |  16
-rw-r--r--  storage/innobase/include/mtr0types.h |  13
-rw-r--r--  storage/innobase/include/page0cur.h  |  15
-rw-r--r--  storage/innobase/include/page0page.h |   4
-rw-r--r--  storage/innobase/log/log0recv.cc     |  38
-rw-r--r--  storage/innobase/page/page0cur.cc    | 538
7 files changed, 504 insertions(+), 174 deletions(-)
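
For orientation, a minimal standalone sketch (not part of the commit) of the
size arithmetic behind the new log records, mirroring the two
mtr_t::page_delete() overloads in the diff below. The MIN_2BYTE and MIN_3BYTE
thresholds are assumed here to be 1 << 7 and (1 << 7) + (1 << 14); the payload
is the subtype byte followed by the varint-encoded fields.

// Sketch only: payload sizes of the two new record types, assuming the
// varint length rule of mlog_encode_varint(): 1 byte below MIN_2BYTE,
// 2 bytes below MIN_3BYTE, else 3 bytes (enough for in-page offsets).
#include <cstddef>
#include <cstdio>

constexpr std::size_t MIN_2BYTE= 1 << 7;                 // assumed value
constexpr std::size_t MIN_3BYTE= MIN_2BYTE + (1 << 14);  // assumed value

constexpr std::size_t varint_len(std::size_t v)
{ return v < MIN_2BYTE ? 1 : v < MIN_3BYTE ? 2 : 3; }

// DELETE_ROW_FORMAT_REDUNDANT: subtype byte + varint(prev_rec)
constexpr std::size_t delete_redundant_len(std::size_t prev_rec)
{ return 1 + varint_len(prev_rec); }

// DELETE_ROW_FORMAT_DYNAMIC: subtype byte + varint(prev_rec)
// + varint(hdr_size) + varint(data_size)
constexpr std::size_t delete_dynamic_len(std::size_t prev_rec,
                                         std::size_t hdr_size,
                                         std::size_t data_size)
{
  return 1 + varint_len(prev_rec) + varint_len(hdr_size) +
    varint_len(data_size);
}

int main()
{
  // Predecessor 200 bytes past PAGE_NEW_INFIMUM, 5-byte header
  // (excluding REC_N_NEW_EXTRA_BYTES), 120 bytes of payload:
  // 1 + 2 + 1 + 1 = 5 bytes of redo log for the deletion.
  std::printf("%zu\n", delete_dynamic_len(200, 5, 120));
  // The ROW_FORMAT=REDUNDANT variant for the same predecessor: 3 bytes.
  std::printf("%zu\n", delete_redundant_len(200));
  return 0;
}

Under the previous physical logging, every modified header byte, directory
slot and freed payload byte could be logged individually; with these records,
page_apply_delete_redundant() and page_apply_delete_dynamic() reconstruct
those changes during recovery instead.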
diff --git a/storage/innobase/include/mtr0log.h b/storage/innobase/include/mtr0log.h
index a1e06ca8425..fe80068fa0e 100644
--- a/storage/innobase/include/mtr0log.h
+++ b/storage/innobase/include/mtr0log.h
@@ -536,7 +536,7 @@ inline void mtr_t::log_write_extended(const buf_block_t &block, byte type)
}
/** Write log for partly initializing a B-tree or R-tree page.
-@param block B-tree page
+@param block B-tree or R-tree page
@param comp false=ROW_FORMAT=REDUNDANT, true=COMPACT or DYNAMIC */
inline void mtr_t::page_create(const buf_block_t &block, bool comp)
{
@@ -545,6 +545,58 @@ inline void mtr_t::page_create(const buf_block_t &block, bool comp)
log_write_extended(block, comp);
}
+/** Write log for deleting a B-tree or R-tree record in ROW_FORMAT=REDUNDANT.
+@param block B-tree or R-tree page
+@param prev_rec byte offset of the predecessor of the record to delete,
+ starting from PAGE_OLD_INFIMUM */
+inline void mtr_t::page_delete(const buf_block_t &block, ulint prev_rec)
+{
+ ut_ad(!block.zip_size());
+ ut_ad(prev_rec < block.physical_size());
+ set_modified();
+ if (m_log_mode != MTR_LOG_ALL)
+ return;
+ size_t len= (prev_rec < MIN_2BYTE ? 2 : prev_rec < MIN_3BYTE ? 3 : 4);
+ byte *l= log_write<EXTENDED>(block.page.id, &block.page, len, true);
+ ut_d(byte *end= l + len);
+ *l++= DELETE_ROW_FORMAT_REDUNDANT;
+ l= mlog_encode_varint(l, prev_rec);
+ ut_ad(end == l);
+ m_log.close(l);
+ m_last_offset= FIL_PAGE_TYPE;
+}
+
+/** Write log for deleting a COMPACT or DYNAMIC B-tree or R-tree record.
+@param block B-tree or R-tree page
+@param prev_rec byte offset of the predecessor of the record to delete,
+ starting from PAGE_NEW_INFIMUM
+@param hdr_size record header size, excluding REC_N_NEW_EXTRA_BYTES
+@param data_size data payload size, in bytes */
+inline void mtr_t::page_delete(const buf_block_t &block, ulint prev_rec,
+ size_t hdr_size, size_t data_size)
+{
+ ut_ad(!block.zip_size());
+ set_modified();
+ ut_ad(hdr_size < MIN_3BYTE);
+ ut_ad(prev_rec < block.physical_size());
+ ut_ad(data_size < block.physical_size());
+ if (m_log_mode != MTR_LOG_ALL)
+ return;
+ size_t len= prev_rec < MIN_2BYTE ? 2 : prev_rec < MIN_3BYTE ? 3 : 4;
+ len+= hdr_size < MIN_2BYTE ? 1 : 2;
+ len+= data_size < MIN_2BYTE ? 1 : data_size < MIN_3BYTE ? 2 : 3;
+ byte *l= log_write<EXTENDED>(block.page.id, &block.page, len, true);
+ ut_d(byte *end= l + len);
+ *l++= DELETE_ROW_FORMAT_DYNAMIC;
+ l= mlog_encode_varint(l, prev_rec);
+ l= mlog_encode_varint(l, hdr_size);
+ l= mlog_encode_varint(l, data_size);
+ ut_ad(end == l);
+ m_log.close(l);
+ m_last_offset= FIL_PAGE_TYPE;
+}
+
/** Write log for initializing an undo log page.
@param block undo page */
inline void mtr_t::undo_create(const buf_block_t &block)
diff --git a/storage/innobase/include/mtr0mtr.h b/storage/innobase/include/mtr0mtr.h
index 93f8d79b4f9..9461765abf4 100644
--- a/storage/innobase/include/mtr0mtr.h
+++ b/storage/innobase/include/mtr0mtr.h
@@ -491,9 +491,23 @@ struct mtr_t {
@param id page identifier */
inline void free(const page_id_t id);
/** Write log for partly initializing a B-tree or R-tree page.
- @param block B-tree page
+ @param block B-tree or R-tree page
@param comp false=ROW_FORMAT=REDUNDANT, true=COMPACT or DYNAMIC */
inline void page_create(const buf_block_t &block, bool comp);
+ /** Write log for deleting a B-tree or R-tree record in ROW_FORMAT=REDUNDANT.
+ @param block B-tree or R-tree page
+ @param prev_rec byte offset of the predecessor of the record to delete,
+ starting from PAGE_OLD_INFIMUM */
+ inline void page_delete(const buf_block_t &block, ulint prev_rec);
+ /** Write log for deleting a COMPACT or DYNAMIC B-tree or R-tree record.
+ @param block B-tree or R-tree page
+ @param prev_rec byte offset of the predecessor of the record to delete,
+ starting from PAGE_NEW_INFIMUM
+ @param hdr_size record header size, excluding REC_N_NEW_EXTRA_BYTES
+ @param data_size data payload size, in bytes */
+ inline void page_delete(const buf_block_t &block, ulint prev_rec,
+ size_t hdr_size, size_t data_size);
+
/** Write log for initializing an undo log page.
@param block undo page */
inline void undo_create(const buf_block_t &block);
diff --git a/storage/innobase/include/mtr0types.h b/storage/innobase/include/mtr0types.h
index 1062e96f9d9..bfa30cf30b1 100644
--- a/storage/innobase/include/mtr0types.h
+++ b/storage/innobase/include/mtr0types.h
@@ -262,7 +262,18 @@ enum mrec_ext_t
/** Append a record to an undo log page.
This is equivalent to the old MLOG_UNDO_INSERT record.
The current byte offset will be reset to FIL_PAGE_TYPE. */
- UNDO_APPEND= 3
+ UNDO_APPEND= 3,
+ /** Delete a record on a ROW_FORMAT=REDUNDANT page.
+ We point to the predecessor of the record to be deleted.
+ The current byte offset will be reset to FIL_PAGE_TYPE.
+ This is similar to the old MLOG_REC_DELETE record. */
+ DELETE_ROW_FORMAT_REDUNDANT= 8,
+ /** Delete a record on a ROW_FORMAT=COMPACT or DYNAMIC page.
+ We point to the predecessor of the record to be deleted
+ and include the total size of the record being deleted.
+ The current byte offset will be reset to FIL_PAGE_TYPE.
+ This is similar to the old MLOG_COMP_REC_DELETE record. */
+ DELETE_ROW_FORMAT_DYNAMIC= 9
};
diff --git a/storage/innobase/include/page0cur.h b/storage/innobase/include/page0cur.h
index 36a401cf0db..8387a409cde 100644
--- a/storage/innobase/include/page0cur.h
+++ b/storage/innobase/include/page0cur.h
@@ -201,6 +201,21 @@ page_cur_delete_rec(
mtr_t* mtr) /*!< in/out: mini-transaction */
MY_ATTRIBUTE((nonnull));
+/** Apply a DELETE_ROW_FORMAT_REDUNDANT record that was written by
+page_cur_delete_rec() for a ROW_FORMAT=REDUNDANT page.
+@param block B-tree or R-tree page in ROW_FORMAT=REDUNDANT
+@param prev byte offset of the predecessor, relative to PAGE_OLD_INFIMUM */
+void page_apply_delete_redundant(const buf_block_t &block, ulint prev);
+
+/** Apply a DELETE_ROW_FORMAT_DYNAMIC record that was written by
+page_cur_delete_rec() for a ROW_FORMAT=COMPACT or DYNAMIC page.
+@param block B-tree or R-tree page in ROW_FORMAT=COMPACT or DYNAMIC
+@param prev byte offset of the predecessor, relative to PAGE_NEW_INFIMUM
+@param hdr_size record header size, excluding REC_N_NEW_EXTRA_BYTES
+@param data_size data payload size, in bytes */
+void page_apply_delete_dynamic(const buf_block_t &block, ulint prev,
+ size_t hdr_size, size_t data_size);
+
/** Search the right position for a page cursor.
@param[in] block buffer block
@param[in] index index tree
diff --git a/storage/innobase/include/page0page.h b/storage/innobase/include/page0page.h
index c3b80e3e196..ddd5d83892e 100644
--- a/storage/innobase/include/page0page.h
+++ b/storage/innobase/include/page0page.h
@@ -410,7 +410,7 @@ inline trx_id_t page_get_max_trx_id(const page_t *page)
Set the number of owned records.
@tparam compressed whether to update any ROW_FORMAT=COMPRESSED page as well
@param[in,out] block index page
-@param[in,out] rec ROW_FORMAT=REDUNDANT record
+@param[in,out] rec record in block.frame
@param[in] n_owned number of records skipped in the sparse page directory
@param[in] comp whether ROW_FORMAT is one of COMPACT,DYNAMIC,COMPRESSED
@param[in,out] mtr mini-transaction */
@@ -643,7 +643,7 @@ page_rec_check(
@return pointer to record */
inline rec_t *page_dir_slot_get_rec(page_dir_slot_t *slot)
{
- return page_align(slot) + mach_read_from_2(slot);
+ return page_align(slot) + mach_read_from_2(my_assume_aligned<2>(slot));
}
inline const rec_t *page_dir_slot_get_rec(const page_dir_slot_t *slot)
{
diff --git a/storage/innobase/log/log0recv.cc b/storage/innobase/log/log0recv.cc
index 0912b169ec0..176b8c1d5d1 100644
--- a/storage/innobase/log/log0recv.cc
+++ b/storage/innobase/log/log0recv.cc
@@ -45,6 +45,7 @@ Created 9/20/1997 Heikki Tuuri
#include "mtr0mtr.h"
#include "mtr0log.h"
#include "page0page.h"
+#include "page0cur.h"
#include "trx0undo.h"
#include "ibuf0ibuf.h"
#include "trx0undo.h"
@@ -282,14 +283,14 @@ public:
goto next;
case EXTENDED:
if (UNIV_UNLIKELY(block.page.id.page_no() < 3 ||
- block.page.zip.ssize) &&
- !srv_force_recovery)
+ block.page.zip.ssize))
goto record_corrupted;
static_assert(INIT_ROW_FORMAT_REDUNDANT == 0, "compatibility");
static_assert(INIT_ROW_FORMAT_DYNAMIC == 1, "compatibility");
if (UNIV_UNLIKELY(!rlen))
goto record_corrupted;
switch (*l) {
+ uint8_t ll;
default:
goto record_corrupted;
case INIT_ROW_FORMAT_REDUNDANT:
@@ -308,6 +309,39 @@ public:
goto record_corrupted;
undo_append(block, ++l, --rlen);
break;
+ case DELETE_ROW_FORMAT_REDUNDANT:
+ if (UNIV_UNLIKELY(rlen < 2 || rlen > 4))
+ goto record_corrupted;
+ rlen--;
+ ll= mlog_decode_varint_length(*++l);
+ if (UNIV_UNLIKELY(ll != rlen))
+ goto record_corrupted;
+ page_apply_delete_redundant(block, mlog_decode_varint(l));
+ break;
+ case DELETE_ROW_FORMAT_DYNAMIC:
+ if (UNIV_UNLIKELY(rlen < 2))
+ goto record_corrupted;
+ rlen--;
+ ll= mlog_decode_varint_length(*++l);
+ if (UNIV_UNLIKELY(ll > 3 || ll >= rlen))
+ goto record_corrupted;
+ size_t prev_rec= mlog_decode_varint(l);
+ ut_ad(prev_rec != MLOG_DECODE_ERROR);
+ rlen-= ll;
+ l+= ll;
+ ll= mlog_decode_varint_length(*l);
+ if (UNIV_UNLIKELY(ll > 2 || ll >= rlen))
+ goto record_corrupted;
+ size_t hdr_size= mlog_decode_varint(l);
+ ut_ad(hdr_size != MLOG_DECODE_ERROR);
+ rlen-= ll;
+ l+= ll;
+ ll= mlog_decode_varint_length(*l);
+ if (UNIV_UNLIKELY(ll > 3 || ll != rlen))
+ goto record_corrupted;
+ page_apply_delete_dynamic(block, prev_rec, hdr_size,
+ mlog_decode_varint(l));
+ break;
}
last_offset= FIL_PAGE_TYPE;
goto next_after_applying;
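
As a companion to the parser above, a standalone sketch (again, not the server
code) of validating and decoding a DELETE_ROW_FORMAT_DYNAMIC payload.
decode_varint() and decode_varint_length() are hypothetical stand-ins for
mlog_decode_varint() and mlog_decode_varint_length(), implementing only the
assumed length rule from the earlier sketch, not necessarily the exact
on-disk bit layout.

// Sketch: mirror the rlen bookkeeping of the DELETE_ROW_FORMAT_DYNAMIC
// branch above, using a hypothetical varint codec.
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <optional>

// Stand-in codec: high bits of the first byte encode the length.
static std::size_t decode_varint_length(uint8_t b)
{ return b < 0x80 ? 1 : b < 0xc0 ? 2 : 3; }

static std::size_t decode_varint(const uint8_t *p)
{
  switch (decode_varint_length(*p)) {
  case 1: return p[0];
  case 2: return std::size_t((p[0] & 0x3f) << 8 | p[1]) + 0x80;
  default: return std::size_t((p[0] & 0x1f) << 16 | p[1] << 8 | p[2]) + 0x4080;
  }
}

struct delete_dynamic { std::size_t prev_rec, hdr_size, data_size; };

// l points just past the subtype byte; rlen is the remaining length.
// Returns nothing for a malformed record, like the record_corrupted paths.
static std::optional<delete_dynamic> parse(const uint8_t *l, std::size_t rlen)
{
  std::size_t ll= decode_varint_length(*l);
  if (ll > 3 || ll >= rlen) return std::nullopt; // more fields must follow
  delete_dynamic d;
  d.prev_rec= decode_varint(l); l+= ll; rlen-= ll;
  ll= decode_varint_length(*l);
  if (ll > 2 || ll >= rlen) return std::nullopt; // hdr_size < MIN_3BYTE
  d.hdr_size= decode_varint(l); l+= ll; rlen-= ll;
  ll= decode_varint_length(*l);
  if (ll > 3 || ll != rlen) return std::nullopt; // data_size ends the record
  d.data_size= decode_varint(l);
  return d;
}

int main()
{
  // prev_rec=200 (2-byte varint 0x80 0x48), hdr_size=5, data_size=120
  const uint8_t payload[]= { 0x80, 0x48, 0x05, 0x78 };
  if (auto d= parse(payload, sizeof payload))
    std::printf("prev=%zu hdr=%zu data=%zu\n",
                d->prev_rec, d->hdr_size, d->data_size);
  return 0;
}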
diff --git a/storage/innobase/page/page0cur.cc b/storage/innobase/page/page0cur.cc
index d58c63a6c86..45cf5886302 100644
--- a/storage/innobase/page/page0cur.cc
+++ b/storage/innobase/page/page0cur.cc
@@ -785,17 +785,15 @@ page_cur_open_on_rnd_user_rec(
}
/**
-Set the owned records field of the record pointed to by a directory slot.
-@param[in,out] block file page
-@param[in] slot sparse directory slot
-@param[in,out] n number of records owned by the directory slot
-@param[in,out] mtr mini-transaction */
-static void page_dir_slot_set_n_owned(buf_block_t *block,
- const page_dir_slot_t *slot,
- ulint n, mtr_t *mtr)
+Set the number of owned records.
+@param[in,out] rec record in block.frame
+@param[in] n_owned number of records skipped in the sparse page directory
+@param[in] comp whether ROW_FORMAT is COMPACT or DYNAMIC */
+static void page_rec_set_n_owned(rec_t *rec, ulint n_owned, bool comp)
{
- rec_t *rec= const_cast<rec_t*>(page_dir_slot_get_rec(slot));
- page_rec_set_n_owned<true>(block, rec, n, page_rec_is_comp(rec), mtr);
+ rec-= comp ? REC_NEW_N_OWNED : REC_OLD_N_OWNED;
+ *rec= static_cast<byte>((*rec & ~REC_N_OWNED_MASK) |
+ (n_owned << REC_N_OWNED_SHIFT));
}
/**
@@ -874,12 +872,13 @@ static void page_dir_split_slot(buf_block_t *block, ulint s, mtr_t* mtr)
Try to balance an underfilled directory slot with an adjacent one,
so that there are at least the minimum number of records owned by the slot;
this may result in merging the two slots.
-@param[in,out] block index page
+@param[in,out] block ROW_FORMAT=COMPRESSED page
@param[in] s the slot to be balanced
@param[in,out] mtr mini-transaction */
-static void page_dir_balance_slot(buf_block_t *block, ulint s, mtr_t *mtr)
+static void page_zip_dir_balance_slot(buf_block_t *block, ulint s, mtr_t *mtr)
{
- ut_ad(!block->page.zip.data || page_is_comp(block->frame));
+ ut_ad(block->page.zip.data);
+ ut_ad(page_is_comp(block->frame));
ut_ad(s > 0);
const ulint n_slots = page_dir_get_n_slots(block->frame);
@@ -892,21 +891,23 @@ static void page_dir_balance_slot(buf_block_t *block, ulint s, mtr_t *mtr)
ut_ad(s < n_slots);
page_dir_slot_t* slot = page_dir_get_nth_slot(block->frame, s);
- page_dir_slot_t* up_slot = slot - PAGE_DIR_SLOT_SIZE;
- const ulint up_n_owned = page_dir_slot_get_n_owned(up_slot);
+ rec_t* const up_rec = const_cast<rec_t*>
+ (page_dir_slot_get_rec(slot - PAGE_DIR_SLOT_SIZE));
+ rec_t* const slot_rec = const_cast<rec_t*>
+ (page_dir_slot_get_rec(slot));
+ const ulint up_n_owned = rec_get_n_owned_new(up_rec);
- ut_ad(page_dir_slot_get_n_owned(slot)
+ ut_ad(rec_get_n_owned_new(page_dir_slot_get_rec(slot))
== PAGE_DIR_SLOT_MIN_N_OWNED - 1);
if (up_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) {
compile_time_assert(2 * PAGE_DIR_SLOT_MIN_N_OWNED - 1
<= PAGE_DIR_SLOT_MAX_N_OWNED);
/* Merge the slots. */
- ulint n_owned = page_dir_slot_get_n_owned(slot);
- page_dir_slot_set_n_owned(block, slot, 0, mtr);
- page_dir_slot_set_n_owned(block, up_slot, n_owned
- + page_dir_slot_get_n_owned(up_slot),
- mtr);
+ page_rec_set_n_owned<true>(block, slot_rec, 0, true, mtr);
+ page_rec_set_n_owned<true>(block, up_rec, up_n_owned
+ + (PAGE_DIR_SLOT_MIN_N_OWNED - 1),
+ true, mtr);
/* Shift the slots */
page_dir_slot_t* last_slot = page_dir_get_nth_slot(
block->frame, n_slots - 1);
@@ -916,48 +917,92 @@ static void page_dir_balance_slot(buf_block_t *block, ulint s, mtr_t *mtr)
byte *n_slots_p= my_assume_aligned<2>
(n_slots_f + block->frame);
mtr->write<2>(*block, n_slots_p, n_slots - 1);
+ memcpy_aligned<2>(n_slots_f + block->page.zip.data,
+ n_slots_p, 2);
+ memset_aligned<2>(last_slot, 0, 2);
+ return;
+ }
- if (UNIV_LIKELY_NULL(block->page.zip.data)) {
- memset_aligned<2>(last_slot, 0, 2);
- memcpy_aligned<2>(n_slots_f + block->page.zip.data,
- n_slots_p, 2);
- } else {
- mtr->memmove(*block, PAGE_DIR_SLOT_SIZE
- + page_offset(last_slot),
- page_offset(last_slot), slot - last_slot);
- mtr->write<2>(*block, last_slot, 0U);
- }
+ /* Transfer one record to the underfilled slot */
+ page_rec_set_n_owned<true>(block, slot_rec, 0, true, mtr);
+ rec_t* new_rec = rec_get_next_ptr(slot_rec, TRUE);
+ page_rec_set_n_owned<true>(block, new_rec,
+ PAGE_DIR_SLOT_MIN_N_OWNED,
+ true, mtr);
+ mach_write_to_2(slot, page_offset(new_rec));
+ page_rec_set_n_owned(up_rec, up_n_owned - 1, true);
+}
+
+/**
+Try to balance an underfilled directory slot with an adjacent one,
+so that there are at least the minimum number of records owned by the slot;
+this may result in merging the two slots.
+@param[in,out] block index page
+@param[in] s the slot to be balanced */
+static void page_dir_balance_slot(const buf_block_t &block, ulint s)
+{
+ const bool comp= page_is_comp(block.frame);
+ ut_ad(!block.page.zip.data);
+ ut_ad(s > 0);
+
+ const ulint n_slots = page_dir_get_n_slots(block.frame);
+
+ if (UNIV_UNLIKELY(s + 1 == n_slots)) {
+ /* The last directory slot cannot be balanced. */
+ return;
+ }
+
+ ut_ad(s < n_slots);
+
+ page_dir_slot_t* slot = page_dir_get_nth_slot(block.frame, s);
+ rec_t* const up_rec = const_cast<rec_t*>
+ (page_dir_slot_get_rec(slot - PAGE_DIR_SLOT_SIZE));
+ rec_t* const slot_rec = const_cast<rec_t*>
+ (page_dir_slot_get_rec(slot));
+ const ulint up_n_owned = comp
+ ? rec_get_n_owned_new(up_rec)
+ : rec_get_n_owned_old(up_rec);
+
+ ut_ad(page_dir_slot_get_n_owned(slot)
+ == PAGE_DIR_SLOT_MIN_N_OWNED - 1);
+ if (up_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) {
+ compile_time_assert(2 * PAGE_DIR_SLOT_MIN_N_OWNED - 1
+ <= PAGE_DIR_SLOT_MAX_N_OWNED);
+ /* Merge the slots. */
+ page_rec_set_n_owned(slot_rec, 0, comp);
+ page_rec_set_n_owned(up_rec, up_n_owned
+ + (PAGE_DIR_SLOT_MIN_N_OWNED - 1), comp);
+ /* Shift the slots */
+ page_dir_slot_t* last_slot = page_dir_get_nth_slot(
+ block.frame, n_slots - 1);
+ memmove_aligned<2>(last_slot + PAGE_DIR_SLOT_SIZE, last_slot,
+ slot - last_slot);
+ memset_aligned<2>(last_slot, 0, 2);
+ constexpr uint16_t n_slots_f = PAGE_N_DIR_SLOTS + PAGE_HEADER;
+ byte *n_slots_p= my_assume_aligned<2>
+ (n_slots_f + block.frame);
+ mach_write_to_2(n_slots_p, n_slots - 1);
return;
}
/* Transfer one record to the underfilled slot */
- rec_t* old_rec = const_cast<rec_t*>(page_dir_slot_get_rec(slot));
rec_t* new_rec;
- if (page_is_comp(block->frame)) {
- new_rec = rec_get_next_ptr(old_rec, TRUE);
-
- page_rec_set_n_owned<true>(block, old_rec, 0, true, mtr);
- page_rec_set_n_owned<true>(block, new_rec,
- PAGE_DIR_SLOT_MIN_N_OWNED,
- true, mtr);
- if (UNIV_LIKELY_NULL(block->page.zip.data)) {
- mach_write_to_2(slot, page_offset(new_rec));
- goto func_exit;
- }
+ if (comp) {
+ page_rec_set_n_owned(slot_rec, 0, true);
+ new_rec = rec_get_next_ptr(slot_rec, TRUE);
+ page_rec_set_n_owned(new_rec, PAGE_DIR_SLOT_MIN_N_OWNED, true);
+ page_rec_set_n_owned(up_rec, up_n_owned - 1, true);
} else {
- new_rec = rec_get_next_ptr(old_rec, FALSE);
-
- page_rec_set_n_owned<false>(block, old_rec, 0, false, mtr);
- page_rec_set_n_owned<false>(block, new_rec,
- PAGE_DIR_SLOT_MIN_N_OWNED,
- false, mtr);
+ page_rec_set_n_owned(slot_rec, 0, false);
+ new_rec = rec_get_next_ptr(slot_rec, FALSE);
+ page_rec_set_n_owned(new_rec, PAGE_DIR_SLOT_MIN_N_OWNED,
+ false);
+ page_rec_set_n_owned(up_rec, up_n_owned - 1, false);
}
- mtr->write<2>(*block, slot, page_offset(new_rec));
-func_exit:
- page_dir_slot_set_n_owned(block, up_slot, up_n_owned - 1, mtr);
+ mach_write_to_2(slot, page_offset(new_rec));
}
/** Allocate space for inserting an index record.
@@ -1766,111 +1811,77 @@ inc_dir:
return insert_rec;
}
-/** Prepend a record to the PAGE_FREE list.
-@param[in,out] block index page
-@param[in,out] rec record being deleted
-@param[in] index the index that the page belongs to
-@param[in] offsets rec_get_offsets(rec, index)
-@param[in,out] mtr mini-transaction */
-static void page_mem_free(buf_block_t *block, rec_t *rec,
- const dict_index_t *index, const offset_t *offsets,
- mtr_t *mtr)
+/** Prepend a record to the PAGE_FREE list, or shrink PAGE_HEAP_TOP.
+@param[in,out] block index page
+@param[in,out] rec record being deleted
+@param[in] data_size record payload size, in bytes
+@param[in] extra_size record header size, in bytes */
+static void page_mem_free(const buf_block_t &block, rec_t *rec,
+ size_t data_size, size_t extra_size)
{
- ut_ad(rec_offs_validate(rec, index, offsets));
- ut_ad(page_align(rec) == block->frame);
- const rec_t *free= page_header_get_ptr(block->frame, PAGE_FREE);
-
- if (UNIV_LIKELY_NULL(block->page.zip.data))
- {
- page_header_reset_last_insert(block, mtr);
- page_zip_dir_delete(block, rec, index, offsets, free, mtr);
- return;
- }
+ ut_ad(page_align(rec) == block.frame);
+ ut_ad(!block.page.zip.data);
+ const rec_t *free= page_header_get_ptr(block.frame, PAGE_FREE);
- const uint16_t n_heap= page_header_get_field(block->frame, PAGE_N_HEAP) - 1;
- ut_ad(page_get_n_recs(block->frame) < (n_heap & 0x7fff));
- alignas(4) byte page_header[6];
- const bool deleting_last= n_heap == ((n_heap & 0x8000)
- ? (rec_get_heap_no_new(rec) | 0x8000)
- : rec_get_heap_no_old(rec));
+ const uint16_t n_heap= page_header_get_field(block.frame, PAGE_N_HEAP) - 1;
+ ut_ad(page_get_n_recs(block.frame) < (n_heap & 0x7fff));
+ const bool deleting_top= n_heap == ((n_heap & 0x8000)
+ ? (rec_get_heap_no_new(rec) | 0x8000)
+ : rec_get_heap_no_old(rec));
- if (deleting_last)
+ if (deleting_top)
{
- const uint16_t heap_top= page_header_get_offs(block->frame, PAGE_HEAP_TOP);
- const size_t extra_savings= heap_top -
- page_offset(rec_get_end(rec, offsets));
+ byte *page_heap_top= my_assume_aligned<2>(PAGE_HEAP_TOP + PAGE_HEADER +
+ block.frame);
+ const uint16_t heap_top= mach_read_from_2(page_heap_top);
+ const size_t extra_savings= heap_top - page_offset(rec + data_size);
ut_ad(extra_savings < heap_top);
/* When deleting the last record, do not add it to the PAGE_FREE list.
Instead, decrement PAGE_HEAP_TOP and PAGE_N_HEAP. */
- mach_write_to_2(page_header, page_offset(rec_get_start(rec, offsets)));
- mach_write_to_2(my_assume_aligned<2>(page_header + 2), n_heap);
+ mach_write_to_2(page_heap_top, page_offset(rec - extra_size));
+ mach_write_to_2(my_assume_aligned<2>(page_heap_top + 2), n_heap);
static_assert(PAGE_N_HEAP == PAGE_HEAP_TOP + 2, "compatibility");
- mtr->memcpy(*block, my_assume_aligned<4>(PAGE_HEAP_TOP + PAGE_HEADER +
- block->frame), page_header, 4);
if (extra_savings)
{
- uint16_t garbage= page_header_get_field(block->frame, PAGE_GARBAGE);
- mach_write_to_2(page_header, garbage - extra_savings);
- size_t len= 2;
- if (page_header_get_field(block->frame, PAGE_LAST_INSERT))
- {
- memset_aligned<2>(page_header + 2, 0, 2);
- len= 4;
- }
- mtr->memcpy(*block, my_assume_aligned<2>(PAGE_GARBAGE + PAGE_HEADER +
- block->frame),
- page_header, len);
+ byte *page_garbage= my_assume_aligned<2>(PAGE_GARBAGE + PAGE_HEADER +
+ block.frame);
+ uint16_t garbage= mach_read_from_2(page_garbage);
+ ut_ad(garbage >= extra_savings);
+ mach_write_to_2(page_garbage, garbage - extra_savings);
}
- else
- mtr->write<2,mtr_t::OPT>(*block, my_assume_aligned<2>
- (PAGE_LAST_INSERT + PAGE_HEADER + block->frame),
- 0U);
}
else
{
- mach_write_to_2(page_header, page_offset(rec));
- mach_write_to_2(my_assume_aligned<2>(page_header + 2),
- rec_offs_size(offsets) +
- page_header_get_field(block->frame, PAGE_GARBAGE));
- static_assert(PAGE_FREE + 2 == PAGE_GARBAGE, "compatibility");
- static_assert(PAGE_FREE + 4 == PAGE_LAST_INSERT, "compatibility");
- size_t size;
- if (page_header_get_field(block->frame, PAGE_LAST_INSERT))
- {
- memset_aligned<2>(page_header + 4, 0, 2);
- size= 6;
- }
- else
- size= 4;
- mtr->memcpy(*block, my_assume_aligned<4>(PAGE_FREE + PAGE_HEADER +
- block->frame), page_header, size);
+ byte *page_free= my_assume_aligned<2>(PAGE_FREE + PAGE_HEADER +
+ block.frame);
+ byte *page_garbage= my_assume_aligned<2>(PAGE_GARBAGE + PAGE_HEADER +
+ block.frame);
+ mach_write_to_2(page_free, page_offset(rec));
+ mach_write_to_2(page_garbage, mach_read_from_2(page_garbage) +
+ extra_size + data_size);
}
- mtr->write<2>(*block, PAGE_N_RECS + PAGE_HEADER + block->frame,
- ulint(page_get_n_recs(block->frame)) - 1);
+ memset_aligned<2>(PAGE_LAST_INSERT + PAGE_HEADER + block.frame, 0, 2);
+ byte *page_n_recs= my_assume_aligned<2>(PAGE_N_RECS + PAGE_HEADER +
+ block.frame);
+ mach_write_to_2(page_n_recs, mach_read_from_2(page_n_recs) - 1);
+
+ const byte* const end= rec + data_size;
- if (!deleting_last)
+ if (!deleting_top)
{
uint16_t next= free
? ((n_heap & 0x8000)
? static_cast<uint16_t>(free - rec)
- : static_cast<uint16_t>(page_offset(free)))
+ : static_cast<uint16_t>(free - block.frame))
: 0;
- mtr->write<2>(*block, rec - REC_NEXT, next);
+ mach_write_to_2(rec - REC_NEXT, next);
}
+ else
+ rec-= extra_size;
- if (srv_immediate_scrub_data_uncompressed)
- {
- size_t size= rec_offs_data_size(offsets);
- if (deleting_last)
- {
- const size_t extra_size= rec_offs_extra_size(offsets);
- rec-= extra_size;
- size+= extra_size;
- }
- mtr->memset(block, page_offset(rec), size, 0);
- }
+ memset(rec, 0, end - rec);
}
/***********************************************************//**
@@ -1886,7 +1897,6 @@ page_cur_delete_rec(
mtr_t* mtr) /*!< in/out: mini-transaction */
{
page_dir_slot_t* cur_dir_slot;
- page_dir_slot_t* prev_slot;
rec_t* current_rec;
rec_t* prev_rec = NULL;
rec_t* next_rec;
@@ -1946,10 +1956,8 @@ page_cur_delete_rec(
/* Find the next and the previous record. Note that the cursor is
left at the next record. */
- ut_ad(cur_slot_no > 0);
- prev_slot = page_dir_get_nth_slot(block->frame, cur_slot_no - 1);
-
- rec = const_cast<rec_t*>(page_dir_slot_get_rec(prev_slot));
+ rec = const_cast<rec_t*>
+ (page_dir_slot_get_rec(cur_dir_slot + PAGE_DIR_SLOT_SIZE));
/* rec now points to the record of the previous directory slot. Look
for the immediate predecessor of current_rec in a loop. */
@@ -1989,47 +1997,243 @@ page_cur_delete_rec(
mach_write_to_2(prev_rec - REC_NEXT, static_cast<uint16_t>
(next_rec - prev_rec));
- mach_write_to_1(slot_rec - REC_NEW_N_OWNED,
- (slot_rec[-REC_NEW_N_OWNED]
- & ~REC_N_OWNED_MASK)
- | (cur_n_owned - 1) << REC_N_OWNED_SHIFT);
- } else {
- if (current_rec == slot_rec) {
- slot_rec = prev_rec;
- mtr->write<2>(*block, cur_dir_slot,
- page_offset(slot_rec));
+ slot_rec[-REC_NEW_N_OWNED] = static_cast<byte>(
+ (slot_rec[-REC_NEW_N_OWNED] & ~REC_N_OWNED_MASK)
+ | (cur_n_owned - 1) << REC_N_OWNED_SHIFT);
+
+ page_header_reset_last_insert(block, mtr);
+ page_zip_dir_delete(block, rec, index, offsets,
+ page_header_get_ptr(block->frame,
+ PAGE_FREE),
+ mtr);
+ if (cur_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) {
+ page_zip_dir_balance_slot(block, cur_slot_no, mtr);
}
+ return;
+ }
- if (page_is_comp(block->frame)) {
- mtr->write<2>(*block, prev_rec - REC_NEXT,
- static_cast<uint16_t>
- (next_rec - prev_rec));
- mtr->write<1>(*block, slot_rec - REC_NEW_N_OWNED,
- (slot_rec[-REC_NEW_N_OWNED]
- & ~REC_N_OWNED_MASK)
- | (cur_n_owned - 1)
- << REC_N_OWNED_SHIFT);
- } else {
- mtr->write<2>(*block, prev_rec - REC_NEXT,
- page_offset(next_rec));
- mtr->write<1>(*block, slot_rec - REC_OLD_N_OWNED,
- (slot_rec[-REC_OLD_N_OWNED]
- & ~REC_N_OWNED_MASK)
- | (cur_n_owned - 1)
- << REC_N_OWNED_SHIFT);
- }
+ if (current_rec == slot_rec) {
+ slot_rec = prev_rec;
+ mach_write_to_2(cur_dir_slot, page_offset(slot_rec));
+ }
+
+ const size_t data_size = rec_offs_data_size(offsets);
+ const size_t extra_size = rec_offs_extra_size(offsets);
+
+ if (page_is_comp(block->frame)) {
+ mtr->page_delete(*block, page_offset(prev_rec)
+ - PAGE_NEW_INFIMUM,
+ extra_size - REC_N_NEW_EXTRA_BYTES,
+ data_size);
+ mach_write_to_2(prev_rec - REC_NEXT, static_cast<uint16_t>
+ (next_rec - prev_rec));
+ slot_rec[-REC_NEW_N_OWNED] = static_cast<byte>(
+ (slot_rec[-REC_NEW_N_OWNED] & ~REC_N_OWNED_MASK)
+ | (cur_n_owned - 1) << REC_N_OWNED_SHIFT);
+ } else {
+ mtr->page_delete(*block, page_offset(prev_rec)
+ - PAGE_OLD_INFIMUM);
+ memcpy(prev_rec - REC_NEXT, current_rec - REC_NEXT, 2);
+ slot_rec[-REC_OLD_N_OWNED] = static_cast<byte>(
+ (slot_rec[-REC_OLD_N_OWNED] & ~REC_N_OWNED_MASK)
+ | (cur_n_owned - 1) << REC_N_OWNED_SHIFT);
}
- /* Free the memory occupied by the record */
- page_mem_free(block, current_rec, index, offsets, mtr);
+ page_mem_free(*block, current_rec, data_size, extra_size);
/* Now we have decremented the number of owned records of the slot.
If the number drops below PAGE_DIR_SLOT_MIN_N_OWNED, we balance the
slots. */
if (cur_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) {
- page_dir_balance_slot(block, cur_slot_no, mtr);
+ page_dir_balance_slot(*block, cur_slot_no);
}
+
+ ut_ad(page_is_comp(block->frame)
+ ? page_simple_validate_new(block->frame)
+ : page_simple_validate_old(block->frame));
+}
+
+/** Apply a DELETE_ROW_FORMAT_REDUNDANT record that was written by
+page_cur_delete_rec() for a ROW_FORMAT=REDUNDANT page.
+@param block B-tree or R-tree page in ROW_FORMAT=REDUNDANT
+@param prev byte offset of the predecessor, relative to PAGE_OLD_INFIMUM */
+void page_apply_delete_redundant(const buf_block_t &block, ulint prev)
+{
+ const uint16_t n_slots= page_dir_get_n_slots(block.frame);
+ ulint n_recs= page_get_n_recs(block.frame);
+
+ if (UNIV_UNLIKELY(!n_recs || n_slots < 2 ||
+ !fil_page_index_page_check(block.frame) ||
+ page_get_page_no(block.frame) != block.page.id.page_no() ||
+ mach_read_from_2(my_assume_aligned<2>
+ (PAGE_OLD_SUPREMUM - REC_NEXT +
+ block.frame)) ||
+ page_is_comp(block.frame)))
+ {
+corrupted:
+ ib::error() << "Not applying DELETE_ROW_FORMAT_REDUNDANT"
+ " due to corruption on " << block.page.id;
+ return;
+ }
+
+ byte *slot= page_dir_get_nth_slot(block.frame, n_slots - 1);
+ rec_t *prev_rec= block.frame + PAGE_OLD_INFIMUM + prev;
+ if (UNIV_UNLIKELY(prev_rec > slot))
+ goto corrupted;
+ uint16_t n= mach_read_from_2(prev_rec - REC_NEXT);
+ rec_t *rec= block.frame + n;
+ if (UNIV_UNLIKELY(n < PAGE_OLD_SUPREMUM_END + REC_N_OLD_EXTRA_BYTES ||
+ slot < rec))
+ goto corrupted;
+ const ulint extra_size= REC_N_OLD_EXTRA_BYTES + rec_get_n_fields_old(rec) *
+ (rec_get_1byte_offs_flag(rec) ? 1 : 2);
+ const ulint data_size= rec_get_data_size_old(rec);
+ if (UNIV_UNLIKELY(n < PAGE_OLD_SUPREMUM_END + extra_size ||
+ slot < rec + data_size))
+ goto corrupted;
+
+ n= mach_read_from_2(rec - REC_NEXT);
+ rec_t *next= block.frame + n;
+ if (n == PAGE_OLD_SUPREMUM);
+ else if (UNIV_UNLIKELY(n < PAGE_OLD_SUPREMUM_END + REC_N_OLD_EXTRA_BYTES ||
+ slot < next))
+ goto corrupted;
+
+ rec_t *s= rec;
+ ulint slot_owned;
+ for (ulint i= n_recs; !(slot_owned= rec_get_n_owned_old(s)); )
+ {
+ n= mach_read_from_2(s - REC_NEXT);
+ s= block.frame + n;
+ if (n == PAGE_OLD_SUPREMUM);
+ else if (UNIV_UNLIKELY(n < PAGE_OLD_SUPREMUM_END + REC_N_OLD_EXTRA_BYTES ||
+ slot < s))
+ goto corrupted;
+ if (UNIV_UNLIKELY(!i--)) /* Corrupted (cyclic?) next-record list */
+ goto corrupted;
+ }
+ slot_owned--;
+
+ /* The first slot is always pointing to the infimum record.
+ Find the directory slot pointing to s. */
+ const byte * const first_slot= block.frame + srv_page_size - (PAGE_DIR + 2);
+ alignas(2) byte slot_offs[2];
+ mach_write_to_2(slot_offs, s - block.frame);
+ static_assert(PAGE_DIR_SLOT_SIZE == 2, "compatibility");
+
+ while (memcmp_aligned<2>(slot, slot_offs, 2))
+ if ((slot+= 2) == first_slot)
+ goto corrupted;
+
+ if (rec == s)
+ {
+ s= prev_rec;
+ mach_write_to_2(slot, s - block.frame);
+ }
+
+ memcpy(prev_rec - REC_NEXT, rec - REC_NEXT, 2);
+ s-= REC_OLD_N_OWNED;
+ *s= static_cast<byte>((*s & ~REC_N_OWNED_MASK) |
+ slot_owned << REC_N_OWNED_SHIFT);
+ page_mem_free(block, rec, data_size, extra_size);
+
+ if (slot_owned < PAGE_DIR_SLOT_MIN_N_OWNED)
+ page_dir_balance_slot(block, (first_slot - slot) / 2);
+
+ ut_ad(page_simple_validate_old(block.frame));
+}
+
+/** Apply a DELETE_ROW_FORMAT_DYNAMIC record that was written by
+page_cur_delete_rec() for a ROW_FORMAT=COMPACT or DYNAMIC page.
+@param block B-tree or R-tree page in ROW_FORMAT=COMPACT or DYNAMIC
+@param prev byte offset of the predecessor, relative to PAGE_NEW_INFIMUM
+@param hdr_size record header size, excluding REC_N_NEW_EXTRA_BYTES
+@param data_size data payload size, in bytes */
+void page_apply_delete_dynamic(const buf_block_t &block, ulint prev,
+ size_t hdr_size, size_t data_size)
+{
+ const uint16_t n_slots= page_dir_get_n_slots(block.frame);
+ ulint n_recs= page_get_n_recs(block.frame);
+
+ if (UNIV_UNLIKELY(!n_recs || n_slots < 2 ||
+ !fil_page_index_page_check(block.frame) ||
+ page_get_page_no(block.frame) != block.page.id.page_no() ||
+ mach_read_from_2(my_assume_aligned<2>
+ (PAGE_NEW_SUPREMUM - REC_NEXT +
+ block.frame)) ||
+ !page_is_comp(block.frame)))
+ {
+corrupted:
+ ib::error() << "Not applying DELETE_ROW_FORMAT_DYNAMIC"
+ " due to corruption on " << block.page.id;
+ return;
+ }
+
+ byte *slot= page_dir_get_nth_slot(block.frame, n_slots - 1);
+ uint16_t n= static_cast<uint16_t>(PAGE_NEW_INFIMUM + prev);
+ rec_t *prev_rec= block.frame + n;
+ if (UNIV_UNLIKELY(prev_rec > slot))
+ goto corrupted;
+ n+= mach_read_from_2(prev_rec - REC_NEXT);
+ rec_t *rec= block.frame + n;
+ if (UNIV_UNLIKELY(n < PAGE_NEW_SUPREMUM_END + REC_N_NEW_EXTRA_BYTES ||
+ slot < rec))
+ goto corrupted;
+ const ulint extra_size= REC_N_NEW_EXTRA_BYTES + hdr_size;
+ if (UNIV_UNLIKELY(n < PAGE_NEW_SUPREMUM_END + extra_size ||
+ slot < rec + data_size))
+ goto corrupted;
+ n+= mach_read_from_2(rec - REC_NEXT);
+ rec_t *next= block.frame + n;
+ if (n == PAGE_NEW_SUPREMUM);
+ else if (UNIV_UNLIKELY(n < PAGE_NEW_SUPREMUM_END + REC_N_NEW_EXTRA_BYTES ||
+ slot < next))
+ goto corrupted;
+
+ rec_t *s= rec;
+ n= static_cast<uint16_t>(rec - block.frame);
+ ulint slot_owned;
+ for (ulint i= n_recs; !(slot_owned= rec_get_n_owned_new(s)); )
+ {
+ n+= mach_read_from_2(s - REC_NEXT);
+ s= block.frame + n;
+ if (n == PAGE_NEW_SUPREMUM);
+ else if (UNIV_UNLIKELY(n < PAGE_NEW_SUPREMUM_END + REC_N_NEW_EXTRA_BYTES ||
+ slot < s))
+ goto corrupted;
+ if (UNIV_UNLIKELY(!i--)) /* Corrupted (cyclic?) next-record list */
+ goto corrupted;
+ }
+ slot_owned--;
+
+ /* The first slot is always pointing to the infimum record.
+ Find the directory slot pointing to s. */
+ const byte * const first_slot= block.frame + srv_page_size - (PAGE_DIR + 2);
+ alignas(2) byte slot_offs[2];
+ mach_write_to_2(slot_offs, s - block.frame);
+ static_assert(PAGE_DIR_SLOT_SIZE == 2, "compatibility");
+
+ while (memcmp_aligned<2>(slot, slot_offs, 2))
+ if ((slot+= 2) == first_slot)
+ goto corrupted;
+
+ if (rec == s)
+ {
+ s= prev_rec;
+ mach_write_to_2(slot, s - block.frame);
+ }
+
+ mach_write_to_2(prev_rec - REC_NEXT, static_cast<uint16_t>(next - prev_rec));
+ s-= REC_NEW_N_OWNED;
+ *s= static_cast<byte>((*s & ~REC_N_OWNED_MASK) |
+ slot_owned << REC_N_OWNED_SHIFT);
+ page_mem_free(block, rec, data_size, extra_size);
+
+ if (slot_owned < PAGE_DIR_SLOT_MIN_N_OWNED)
+ page_dir_balance_slot(block, (first_slot - slot) / 2);
+
+ ut_ad(page_simple_validate_new(block.frame));
}
#ifdef UNIV_COMPILE_TEST_FUNCS