summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarko Mäkelä <marko.makela@mariadb.com>2020-02-17 13:30:01 +0200
committerMarko Mäkelä <marko.makela@mariadb.com>2020-02-17 15:32:24 +0200
commit22f649a67a26d98ed46b57ebe42799f4717e27bf (patch)
tree44fd2a2d40770dd9a28d014873a141db00452341
parent09feb176e94f1cbdb213e8824abc0baa903921a0 (diff)
downloadmariadb-git-22f649a67a26d98ed46b57ebe42799f4717e27bf.tar.gz
MDEV-12353: Reformat page_delete_rec_list_end()
We add FIXME comments and some sketch code for the following cases: It is possible to write considerably less log for ROW_FORMAT=COMPRESSED pages. For now, we will delete the records one by one. It is also possible to treat 'deleting the last records' as a special case that would involve shrinking PAGE_HEAP_TOP. That should reduce the need to reorganize pages.
-rw-r--r--storage/innobase/page/page0page.cc382
1 files changed, 189 insertions, 193 deletions
diff --git a/storage/innobase/page/page0page.cc b/storage/innobase/page/page0page.cc
index bc2a4ad0c34..8947a449ab4 100644
--- a/storage/innobase/page/page0page.cc
+++ b/storage/innobase/page/page0page.cc
@@ -842,206 +842,202 @@ page_delete_rec_list_end(
delete, or ULINT_UNDEFINED if not known */
mtr_t* mtr) /*!< in: mtr */
{
- ulint slot_index;
- rec_t* last_rec;
- rec_t* prev_rec;
- ulint n_owned;
- page_zip_des_t* page_zip = buf_block_get_page_zip(block);
- mem_heap_t* heap = NULL;
- offset_t offsets_[REC_OFFS_NORMAL_SIZE];
- offset_t* offsets = offsets_;
- rec_offs_init(offsets_);
-
- ut_ad(size == ULINT_UNDEFINED || size < srv_page_size);
- ut_ad(!page_zip || page_rec_is_comp(rec));
- ut_ad(page_align(rec) == block->frame);
+ ut_ad(size == ULINT_UNDEFINED || size < srv_page_size);
+ ut_ad(page_align(rec) == block->frame);
+ ut_ad(index->table->not_redundant() == !!page_is_comp(block->frame));
#ifdef UNIV_ZIP_DEBUG
- ut_a(!page_zip || page_zip_validate(page_zip, block->frame, index));
+ ut_a(!block->page.zip.data ||
+ page_zip_validate(block->page.zip, block->frame, index));
#endif /* UNIV_ZIP_DEBUG */
- if (page_rec_is_supremum(rec)) {
- ut_ad(n_recs == 0 || n_recs == ULINT_UNDEFINED);
- /* Nothing to do, there are no records bigger than the
- page supremum. */
- return;
- }
-
- if (page_rec_is_infimum(rec)
- || n_recs == page_get_n_recs(block->frame)) {
-delete_all:
- /* We are deleting all records. */
- page_create_empty(block, index, mtr);
- return;
- } else if (page_is_comp(block->frame)) {
- if (page_rec_get_next_low(block->frame + PAGE_NEW_INFIMUM, 1)
- == rec) {
- /* We are deleting everything from the first
- user record onwards. */
- goto delete_all;
- }
- } else {
- if (page_rec_get_next_low(block->frame + PAGE_OLD_INFIMUM, 0)
- == rec) {
- /* We are deleting everything from the first
- user record onwards. */
- goto delete_all;
- }
- }
-
- /* The page gets invalid for optimistic searches: increment the
- frame modify clock */
-
- buf_block_modify_clock_inc(block);
-
- const bool is_leaf = page_is_leaf(block->frame);
- mtr->write<2,mtr_t::OPT>(*block, my_assume_aligned<2>
- (PAGE_LAST_INSERT + PAGE_HEADER
- + block->frame), 0U);
-
- if (UNIV_LIKELY_NULL(page_zip)) {
- ut_ad(page_is_comp(block->frame));
-
- memset_aligned<2>(PAGE_LAST_INSERT + PAGE_HEADER
- + page_zip->data, 0, 2);
+ if (page_rec_is_supremum(rec))
+ {
+ ut_ad(n_recs == 0 || n_recs == ULINT_UNDEFINED);
+ /* Nothing to do, there are no records bigger than the page supremum. */
+ return;
+ }
+
+ if (page_rec_is_infimum(rec) || n_recs == page_get_n_recs(block->frame) ||
+ rec == (page_is_comp(block->frame)
+ ? page_rec_get_next_low(block->frame + PAGE_NEW_INFIMUM, 1)
+ : page_rec_get_next_low(block->frame + PAGE_OLD_INFIMUM, 0)))
+ {
+ /* We are deleting all records. */
+ page_create_empty(block, index, mtr);
+ return;
+ }
+
+#if 0 // FIXME: consider deleting the last record as a special case
+ if (page_rec_is_last(rec))
+ {
+ page_cur_t cursor= { index, rec, offsets, block };
+ page_cur_delete_rec(&cursor, index, offsets, mtr);
+ return;
+ }
+#endif
+
+ /* The page gets invalid for optimistic searches */
+ buf_block_modify_clock_inc(block);
- do {
- page_cur_t cur;
- page_cur_position(rec, block, &cur);
+ const bool is_leaf= page_is_leaf(block->frame);
+ mem_heap_t *heap= nullptr;
+ offset_t offsets_[REC_OFFS_NORMAL_SIZE];
+ offset_t *offsets= offsets_;
+ rec_offs_init(offsets_);
- offsets = rec_get_offsets(rec, index, offsets, is_leaf,
- ULINT_UNDEFINED, &heap);
- rec = rec_get_next_ptr(rec, TRUE);
+#if 1 // FIXME: remove this, and write minimal amount of log! */
+ if (UNIV_LIKELY_NULL(block->page.zip.data))
+ {
+ ut_ad(page_is_comp(block->frame));
+ do
+ {
+ page_cur_t cur;
+ page_cur_position(rec, block, &cur);
+ offsets= rec_get_offsets(rec, index, offsets, is_leaf,
+ ULINT_UNDEFINED, &heap);
+ rec= rec_get_next_ptr(rec, TRUE);
#ifdef UNIV_ZIP_DEBUG
- ut_a(page_zip_validate(page_zip, block->frame, index));
+ ut_a(page_zip_validate(block->page.zip.data, block->frame, index));
#endif /* UNIV_ZIP_DEBUG */
- page_cur_delete_rec(&cur, index, offsets, mtr);
- } while (page_offset(rec) != PAGE_NEW_SUPREMUM);
-
- if (UNIV_LIKELY_NULL(heap)) {
- mem_heap_free(heap);
- }
-
- return;
- }
-
- prev_rec = page_rec_get_prev(rec);
-
- last_rec = page_rec_get_prev(page_get_supremum_rec(block->frame));
-
- const bool scrub = srv_immediate_scrub_data_uncompressed;
- if (scrub || size == ULINT_UNDEFINED || n_recs == ULINT_UNDEFINED) {
- rec_t* rec2 = rec;
- /* Calculate the sum of sizes and the number of records */
- size = 0;
- n_recs = 0;
-
- do {
- ulint s;
- offsets = rec_get_offsets(rec2, index, offsets,
- is_leaf,
- ULINT_UNDEFINED, &heap);
- s = rec_offs_size(offsets);
- ut_ad(ulint(rec2 - block->frame) + s
- - rec_offs_extra_size(offsets)
- < srv_page_size);
- ut_ad(size + s < srv_page_size);
- size += s;
- n_recs++;
-
- if (scrub) {
- /* scrub record */
- mtr->memset(block, page_offset(rec2),
- rec_offs_data_size(offsets), 0);
- }
-
- rec2 = page_rec_get_next(rec2);
- } while (!page_rec_is_supremum(rec2));
-
- if (UNIV_LIKELY_NULL(heap)) {
- mem_heap_free(heap);
- }
- }
-
- ut_ad(size < srv_page_size);
-
- {
- rec_t* rec2 = rec;
- ulint count = 0;
-
- if (page_is_comp(block->frame)) {
- while (!rec_get_n_owned_new(rec2)) {
- count++;
- rec2 = rec_get_next_ptr(rec2, TRUE);
- }
-
- ut_ad(rec_get_n_owned_new(rec2) > count);
-
- n_owned = rec_get_n_owned_new(rec2) - count;
- } else {
- while (!rec_get_n_owned_old(rec2)) {
- count++;
- rec2 = rec_get_next_ptr(rec2, FALSE);
- }
-
- ut_ad(rec_get_n_owned_old(rec2) > count);
-
- n_owned = rec_get_n_owned_old(rec2) - count;
- }
-
- slot_index = page_dir_find_owner_slot(rec2);
- ut_ad(slot_index > 0);
- }
-
- mtr->write<2,mtr_t::OPT>(*block, PAGE_N_DIR_SLOTS + PAGE_HEADER
- + block->frame, slot_index + 1);
-
- /* Catenate the deleted chain segment to the page free list */
- alignas(4) byte page_header[4];
- byte* page_free = my_assume_aligned<4>(PAGE_HEADER + PAGE_FREE
- + block->frame);
- const uint16_t free = page_header_get_field(block->frame, PAGE_FREE);
- static_assert(PAGE_FREE + 2 == PAGE_GARBAGE, "compatibility");
-
- mach_write_to_2(page_header, page_offset(rec));
- mach_write_to_2(my_assume_aligned<2>(page_header + 2),
- mach_read_from_2(my_assume_aligned<2>(page_free + 2))
- + size);
- mtr->memcpy(*block, page_free, page_header, 4);
-
- byte* page_n_recs = my_assume_aligned<2>(PAGE_N_RECS + PAGE_HEADER
- + block->frame);
- mtr->write<2>(*block, page_n_recs,
- ulint{mach_read_from_2(page_n_recs)} - n_recs);
-
- /* Update the page directory; there is no need to balance the number
- of the records owned by the supremum record, as it is allowed to be
- less than PAGE_DIR_SLOT_MIN_N_OWNED */
- page_dir_slot_t*slot = page_dir_get_nth_slot(block->frame, slot_index);
-
- if (page_is_comp(block->frame)) {
- mtr->write<2,mtr_t::OPT>(*block, slot, PAGE_NEW_SUPREMUM);
- byte* owned = PAGE_NEW_SUPREMUM - REC_NEW_N_OWNED
- + block->frame;
- byte new_owned = (*owned & ~REC_N_OWNED_MASK)
- | static_cast<byte>(n_owned << REC_N_OWNED_SHIFT);
- mtr->write<1,mtr_t::OPT>(*block, owned, new_owned);
- mtr->write<2>(*block, prev_rec - REC_NEXT,
- static_cast<uint16_t>
- (PAGE_NEW_SUPREMUM - page_offset(prev_rec)));
- mtr->write<2>(*block, last_rec - REC_NEXT, free
- ? static_cast<uint16_t>
- (free - page_offset(last_rec))
- : 0U);
- } else {
- mtr->write<2,mtr_t::OPT>(*block, slot, PAGE_OLD_SUPREMUM);
- byte* owned = PAGE_OLD_SUPREMUM - REC_OLD_N_OWNED
- + block->frame;
- byte new_owned = (*owned & ~REC_N_OWNED_MASK)
- | static_cast<byte>(n_owned << REC_N_OWNED_SHIFT);
- mtr->write<1,mtr_t::OPT>(*block, owned, new_owned);
- mtr->write<2>(*block, prev_rec - REC_NEXT, PAGE_OLD_SUPREMUM);
- mtr->write<2>(*block, last_rec - REC_NEXT, free);
- }
+ page_cur_delete_rec(&cur, index, offsets, mtr);
+ }
+ while (page_offset(rec) != PAGE_NEW_SUPREMUM);
+
+ if (UNIV_LIKELY_NULL(heap))
+ mem_heap_free(heap);
+ return;
+ }
+#endif
+
+ byte *prev_rec= page_rec_get_prev(rec);
+ byte *last_rec= page_rec_get_prev(page_get_supremum_rec(block->frame));
+
+ // FIXME: consider a special case of shrinking PAGE_HEAP_TOP
+
+ const bool scrub= srv_immediate_scrub_data_uncompressed;
+ if (scrub || size == ULINT_UNDEFINED || n_recs == ULINT_UNDEFINED)
+ {
+ rec_t *rec2= rec;
+ /* Calculate the sum of sizes and the number of records */
+ size= 0;
+ n_recs= 0;
+
+ do
+ {
+ offsets = rec_get_offsets(rec2, index, offsets, is_leaf,
+ ULINT_UNDEFINED, &heap);
+ ulint s= rec_offs_size(offsets);
+ ut_ad(ulint(rec2 - block->frame) + s - rec_offs_extra_size(offsets) <
+ srv_page_size);
+ ut_ad(size + s < srv_page_size);
+ size+= s;
+ n_recs++;
+
+ if (scrub)
+ mtr->memset(block, page_offset(rec2), rec_offs_data_size(offsets), 0);
+
+ rec2 = page_rec_get_next(rec2);
+ }
+ while (!page_rec_is_supremum(rec2));
+
+ if (UNIV_LIKELY_NULL(heap))
+ mem_heap_free(heap);
+ }
+
+ ut_ad(size < srv_page_size);
+
+ ulint slot_index, n_owned;
+ {
+ const rec_t *owner_rec= rec;
+ ulint count= 0;
+
+ if (page_is_comp(block->frame))
+ while (!(n_owned= rec_get_n_owned_new(owner_rec)))
+ {
+ count++;
+ owner_rec= rec_get_next_ptr_const(owner_rec, TRUE);
+ }
+ else
+ while (!(n_owned= rec_get_n_owned_old(owner_rec)))
+ {
+ count++;
+ owner_rec= rec_get_next_ptr_const(owner_rec, FALSE);
+ }
+
+ ut_ad(n_owned > count);
+ n_owned-= count;
+ slot_index= page_dir_find_owner_slot(owner_rec);
+ ut_ad(slot_index > 0);
+ }
+
+ mtr->write<2,mtr_t::OPT>(*block, my_assume_aligned<2>
+ (PAGE_N_DIR_SLOTS + PAGE_HEADER + block->frame),
+ slot_index + 1);
+ mtr->write<2,mtr_t::OPT>(*block, my_assume_aligned<2>
+ (PAGE_LAST_INSERT + PAGE_HEADER + block->frame),
+ 0U);
+ /* Catenate the deleted chain segment to the page free list */
+ alignas(4) byte page_header[4];
+ byte *page_free= my_assume_aligned<4>(PAGE_HEADER + PAGE_FREE +
+ block->frame);
+ const uint16_t free= page_header_get_field(block->frame, PAGE_FREE);
+ static_assert(PAGE_FREE + 2 == PAGE_GARBAGE, "compatibility");
+
+ mach_write_to_2(page_header, page_offset(rec));
+ mach_write_to_2(my_assume_aligned<2>(page_header + 2),
+ mach_read_from_2(my_assume_aligned<2>(page_free + 2)) +
+ size);
+ mtr->memcpy(*block, page_free, page_header, 4);
+
+ byte *page_n_recs= my_assume_aligned<2>(PAGE_N_RECS + PAGE_HEADER +
+ block->frame);
+ mtr->write<2>(*block, page_n_recs,
+ ulint{mach_read_from_2(page_n_recs)} - n_recs);
+
+ /* Update the page directory; there is no need to balance the number
+ of the records owned by the supremum record, as it is allowed to be
+ less than PAGE_DIR_SLOT_MIN_N_OWNED */
+ page_dir_slot_t *slot= page_dir_get_nth_slot(block->frame, slot_index);
+
+ if (page_is_comp(block->frame))
+ {
+ mtr->write<2,mtr_t::OPT>(*block, slot, PAGE_NEW_SUPREMUM);
+ byte *owned= PAGE_NEW_SUPREMUM - REC_NEW_N_OWNED + block->frame;
+ byte new_owned= (*owned & ~REC_N_OWNED_MASK) |
+ static_cast<byte>(n_owned << REC_N_OWNED_SHIFT);
+#if 0 // FIXME: implement minimal logging for ROW_FORMAT=COMPRESSED
+ if (UNIV_LIKELY_NULL(block->page.zip.data))
+ {
+ *owned= new_owned;
+ memcpy_aligned<2>(PAGE_N_DIR_SLOTS + PAGE_HEADER + block->page.zip.data,
+ PAGE_N_DIR_SLOTS + PAGE_HEADER + block->frame,
+ PAGE_N_RECS + 2 - PAGE_N_DIR_SLOTS);
+ // TODO: the equivalent of page_zip_dir_delete() for all records
+ mach_write_to_2(prev_rec - REC_NEXT, static_cast<uint16_t>
+ (PAGE_NEW_SUPREMUM - page_offset(prev_rec)));
+ mach_write_to_2(last_rec - REC_NEXT, free
+ ? static_cast<uint16_t>(free - page_offset(last_rec))
+ : 0U);
+ return;
+ }
+#endif
+ mtr->write<1,mtr_t::OPT>(*block, owned, new_owned);
+ mtr->write<2>(*block, prev_rec - REC_NEXT, static_cast<uint16_t>
+ (PAGE_NEW_SUPREMUM - page_offset(prev_rec)));
+ mtr->write<2>(*block, last_rec - REC_NEXT, free
+ ? static_cast<uint16_t>(free - page_offset(last_rec))
+ : 0U);
+ }
+ else
+ {
+ mtr->write<2,mtr_t::OPT>(*block, slot, PAGE_OLD_SUPREMUM);
+ byte *owned= PAGE_OLD_SUPREMUM - REC_OLD_N_OWNED + block->frame;
+ byte new_owned= (*owned & ~REC_N_OWNED_MASK) |
+ static_cast<byte>(n_owned << REC_N_OWNED_SHIFT);
+ mtr->write<1,mtr_t::OPT>(*block, owned, new_owned);
+ mtr->write<2>(*block, prev_rec - REC_NEXT, PAGE_OLD_SUPREMUM);
+ mtr->write<2>(*block, last_rec - REC_NEXT, free);
+ }
}
/*************************************************************//**