diff options
-rw-r--r-- | storage/innobase/page/page0page.cc | 382 |
1 files changed, 189 insertions, 193 deletions
diff --git a/storage/innobase/page/page0page.cc b/storage/innobase/page/page0page.cc index bc2a4ad0c34..8947a449ab4 100644 --- a/storage/innobase/page/page0page.cc +++ b/storage/innobase/page/page0page.cc @@ -842,206 +842,202 @@ page_delete_rec_list_end( delete, or ULINT_UNDEFINED if not known */ mtr_t* mtr) /*!< in: mtr */ { - ulint slot_index; - rec_t* last_rec; - rec_t* prev_rec; - ulint n_owned; - page_zip_des_t* page_zip = buf_block_get_page_zip(block); - mem_heap_t* heap = NULL; - offset_t offsets_[REC_OFFS_NORMAL_SIZE]; - offset_t* offsets = offsets_; - rec_offs_init(offsets_); - - ut_ad(size == ULINT_UNDEFINED || size < srv_page_size); - ut_ad(!page_zip || page_rec_is_comp(rec)); - ut_ad(page_align(rec) == block->frame); + ut_ad(size == ULINT_UNDEFINED || size < srv_page_size); + ut_ad(page_align(rec) == block->frame); + ut_ad(index->table->not_redundant() == !!page_is_comp(block->frame)); #ifdef UNIV_ZIP_DEBUG - ut_a(!page_zip || page_zip_validate(page_zip, block->frame, index)); + ut_a(!block->page.zip.data || + page_zip_validate(block->page.zip, block->frame, index)); #endif /* UNIV_ZIP_DEBUG */ - if (page_rec_is_supremum(rec)) { - ut_ad(n_recs == 0 || n_recs == ULINT_UNDEFINED); - /* Nothing to do, there are no records bigger than the - page supremum. */ - return; - } - - if (page_rec_is_infimum(rec) - || n_recs == page_get_n_recs(block->frame)) { -delete_all: - /* We are deleting all records. */ - page_create_empty(block, index, mtr); - return; - } else if (page_is_comp(block->frame)) { - if (page_rec_get_next_low(block->frame + PAGE_NEW_INFIMUM, 1) - == rec) { - /* We are deleting everything from the first - user record onwards. */ - goto delete_all; - } - } else { - if (page_rec_get_next_low(block->frame + PAGE_OLD_INFIMUM, 0) - == rec) { - /* We are deleting everything from the first - user record onwards. */ - goto delete_all; - } - } - - /* The page gets invalid for optimistic searches: increment the - frame modify clock */ - - buf_block_modify_clock_inc(block); - - const bool is_leaf = page_is_leaf(block->frame); - mtr->write<2,mtr_t::OPT>(*block, my_assume_aligned<2> - (PAGE_LAST_INSERT + PAGE_HEADER - + block->frame), 0U); - - if (UNIV_LIKELY_NULL(page_zip)) { - ut_ad(page_is_comp(block->frame)); - - memset_aligned<2>(PAGE_LAST_INSERT + PAGE_HEADER - + page_zip->data, 0, 2); + if (page_rec_is_supremum(rec)) + { + ut_ad(n_recs == 0 || n_recs == ULINT_UNDEFINED); + /* Nothing to do, there are no records bigger than the page supremum. */ + return; + } + + if (page_rec_is_infimum(rec) || n_recs == page_get_n_recs(block->frame) || + rec == (page_is_comp(block->frame) + ? page_rec_get_next_low(block->frame + PAGE_NEW_INFIMUM, 1) + : page_rec_get_next_low(block->frame + PAGE_OLD_INFIMUM, 0))) + { + /* We are deleting all records. */ + page_create_empty(block, index, mtr); + return; + } + +#if 0 // FIXME: consider deleting the last record as a special case + if (page_rec_is_last(rec)) + { + page_cur_t cursor= { index, rec, offsets, block }; + page_cur_delete_rec(&cursor, index, offsets, mtr); + return; + } +#endif + + /* The page gets invalid for optimistic searches */ + buf_block_modify_clock_inc(block); - do { - page_cur_t cur; - page_cur_position(rec, block, &cur); + const bool is_leaf= page_is_leaf(block->frame); + mem_heap_t *heap= nullptr; + offset_t offsets_[REC_OFFS_NORMAL_SIZE]; + offset_t *offsets= offsets_; + rec_offs_init(offsets_); - offsets = rec_get_offsets(rec, index, offsets, is_leaf, - ULINT_UNDEFINED, &heap); - rec = rec_get_next_ptr(rec, TRUE); +#if 1 // FIXME: remove this, and write minimal amount of log! */ + if (UNIV_LIKELY_NULL(block->page.zip.data)) + { + ut_ad(page_is_comp(block->frame)); + do + { + page_cur_t cur; + page_cur_position(rec, block, &cur); + offsets= rec_get_offsets(rec, index, offsets, is_leaf, + ULINT_UNDEFINED, &heap); + rec= rec_get_next_ptr(rec, TRUE); #ifdef UNIV_ZIP_DEBUG - ut_a(page_zip_validate(page_zip, block->frame, index)); + ut_a(page_zip_validate(block->page.zip.data, block->frame, index)); #endif /* UNIV_ZIP_DEBUG */ - page_cur_delete_rec(&cur, index, offsets, mtr); - } while (page_offset(rec) != PAGE_NEW_SUPREMUM); - - if (UNIV_LIKELY_NULL(heap)) { - mem_heap_free(heap); - } - - return; - } - - prev_rec = page_rec_get_prev(rec); - - last_rec = page_rec_get_prev(page_get_supremum_rec(block->frame)); - - const bool scrub = srv_immediate_scrub_data_uncompressed; - if (scrub || size == ULINT_UNDEFINED || n_recs == ULINT_UNDEFINED) { - rec_t* rec2 = rec; - /* Calculate the sum of sizes and the number of records */ - size = 0; - n_recs = 0; - - do { - ulint s; - offsets = rec_get_offsets(rec2, index, offsets, - is_leaf, - ULINT_UNDEFINED, &heap); - s = rec_offs_size(offsets); - ut_ad(ulint(rec2 - block->frame) + s - - rec_offs_extra_size(offsets) - < srv_page_size); - ut_ad(size + s < srv_page_size); - size += s; - n_recs++; - - if (scrub) { - /* scrub record */ - mtr->memset(block, page_offset(rec2), - rec_offs_data_size(offsets), 0); - } - - rec2 = page_rec_get_next(rec2); - } while (!page_rec_is_supremum(rec2)); - - if (UNIV_LIKELY_NULL(heap)) { - mem_heap_free(heap); - } - } - - ut_ad(size < srv_page_size); - - { - rec_t* rec2 = rec; - ulint count = 0; - - if (page_is_comp(block->frame)) { - while (!rec_get_n_owned_new(rec2)) { - count++; - rec2 = rec_get_next_ptr(rec2, TRUE); - } - - ut_ad(rec_get_n_owned_new(rec2) > count); - - n_owned = rec_get_n_owned_new(rec2) - count; - } else { - while (!rec_get_n_owned_old(rec2)) { - count++; - rec2 = rec_get_next_ptr(rec2, FALSE); - } - - ut_ad(rec_get_n_owned_old(rec2) > count); - - n_owned = rec_get_n_owned_old(rec2) - count; - } - - slot_index = page_dir_find_owner_slot(rec2); - ut_ad(slot_index > 0); - } - - mtr->write<2,mtr_t::OPT>(*block, PAGE_N_DIR_SLOTS + PAGE_HEADER - + block->frame, slot_index + 1); - - /* Catenate the deleted chain segment to the page free list */ - alignas(4) byte page_header[4]; - byte* page_free = my_assume_aligned<4>(PAGE_HEADER + PAGE_FREE - + block->frame); - const uint16_t free = page_header_get_field(block->frame, PAGE_FREE); - static_assert(PAGE_FREE + 2 == PAGE_GARBAGE, "compatibility"); - - mach_write_to_2(page_header, page_offset(rec)); - mach_write_to_2(my_assume_aligned<2>(page_header + 2), - mach_read_from_2(my_assume_aligned<2>(page_free + 2)) - + size); - mtr->memcpy(*block, page_free, page_header, 4); - - byte* page_n_recs = my_assume_aligned<2>(PAGE_N_RECS + PAGE_HEADER - + block->frame); - mtr->write<2>(*block, page_n_recs, - ulint{mach_read_from_2(page_n_recs)} - n_recs); - - /* Update the page directory; there is no need to balance the number - of the records owned by the supremum record, as it is allowed to be - less than PAGE_DIR_SLOT_MIN_N_OWNED */ - page_dir_slot_t*slot = page_dir_get_nth_slot(block->frame, slot_index); - - if (page_is_comp(block->frame)) { - mtr->write<2,mtr_t::OPT>(*block, slot, PAGE_NEW_SUPREMUM); - byte* owned = PAGE_NEW_SUPREMUM - REC_NEW_N_OWNED - + block->frame; - byte new_owned = (*owned & ~REC_N_OWNED_MASK) - | static_cast<byte>(n_owned << REC_N_OWNED_SHIFT); - mtr->write<1,mtr_t::OPT>(*block, owned, new_owned); - mtr->write<2>(*block, prev_rec - REC_NEXT, - static_cast<uint16_t> - (PAGE_NEW_SUPREMUM - page_offset(prev_rec))); - mtr->write<2>(*block, last_rec - REC_NEXT, free - ? static_cast<uint16_t> - (free - page_offset(last_rec)) - : 0U); - } else { - mtr->write<2,mtr_t::OPT>(*block, slot, PAGE_OLD_SUPREMUM); - byte* owned = PAGE_OLD_SUPREMUM - REC_OLD_N_OWNED - + block->frame; - byte new_owned = (*owned & ~REC_N_OWNED_MASK) - | static_cast<byte>(n_owned << REC_N_OWNED_SHIFT); - mtr->write<1,mtr_t::OPT>(*block, owned, new_owned); - mtr->write<2>(*block, prev_rec - REC_NEXT, PAGE_OLD_SUPREMUM); - mtr->write<2>(*block, last_rec - REC_NEXT, free); - } + page_cur_delete_rec(&cur, index, offsets, mtr); + } + while (page_offset(rec) != PAGE_NEW_SUPREMUM); + + if (UNIV_LIKELY_NULL(heap)) + mem_heap_free(heap); + return; + } +#endif + + byte *prev_rec= page_rec_get_prev(rec); + byte *last_rec= page_rec_get_prev(page_get_supremum_rec(block->frame)); + + // FIXME: consider a special case of shrinking PAGE_HEAP_TOP + + const bool scrub= srv_immediate_scrub_data_uncompressed; + if (scrub || size == ULINT_UNDEFINED || n_recs == ULINT_UNDEFINED) + { + rec_t *rec2= rec; + /* Calculate the sum of sizes and the number of records */ + size= 0; + n_recs= 0; + + do + { + offsets = rec_get_offsets(rec2, index, offsets, is_leaf, + ULINT_UNDEFINED, &heap); + ulint s= rec_offs_size(offsets); + ut_ad(ulint(rec2 - block->frame) + s - rec_offs_extra_size(offsets) < + srv_page_size); + ut_ad(size + s < srv_page_size); + size+= s; + n_recs++; + + if (scrub) + mtr->memset(block, page_offset(rec2), rec_offs_data_size(offsets), 0); + + rec2 = page_rec_get_next(rec2); + } + while (!page_rec_is_supremum(rec2)); + + if (UNIV_LIKELY_NULL(heap)) + mem_heap_free(heap); + } + + ut_ad(size < srv_page_size); + + ulint slot_index, n_owned; + { + const rec_t *owner_rec= rec; + ulint count= 0; + + if (page_is_comp(block->frame)) + while (!(n_owned= rec_get_n_owned_new(owner_rec))) + { + count++; + owner_rec= rec_get_next_ptr_const(owner_rec, TRUE); + } + else + while (!(n_owned= rec_get_n_owned_old(owner_rec))) + { + count++; + owner_rec= rec_get_next_ptr_const(owner_rec, FALSE); + } + + ut_ad(n_owned > count); + n_owned-= count; + slot_index= page_dir_find_owner_slot(owner_rec); + ut_ad(slot_index > 0); + } + + mtr->write<2,mtr_t::OPT>(*block, my_assume_aligned<2> + (PAGE_N_DIR_SLOTS + PAGE_HEADER + block->frame), + slot_index + 1); + mtr->write<2,mtr_t::OPT>(*block, my_assume_aligned<2> + (PAGE_LAST_INSERT + PAGE_HEADER + block->frame), + 0U); + /* Catenate the deleted chain segment to the page free list */ + alignas(4) byte page_header[4]; + byte *page_free= my_assume_aligned<4>(PAGE_HEADER + PAGE_FREE + + block->frame); + const uint16_t free= page_header_get_field(block->frame, PAGE_FREE); + static_assert(PAGE_FREE + 2 == PAGE_GARBAGE, "compatibility"); + + mach_write_to_2(page_header, page_offset(rec)); + mach_write_to_2(my_assume_aligned<2>(page_header + 2), + mach_read_from_2(my_assume_aligned<2>(page_free + 2)) + + size); + mtr->memcpy(*block, page_free, page_header, 4); + + byte *page_n_recs= my_assume_aligned<2>(PAGE_N_RECS + PAGE_HEADER + + block->frame); + mtr->write<2>(*block, page_n_recs, + ulint{mach_read_from_2(page_n_recs)} - n_recs); + + /* Update the page directory; there is no need to balance the number + of the records owned by the supremum record, as it is allowed to be + less than PAGE_DIR_SLOT_MIN_N_OWNED */ + page_dir_slot_t *slot= page_dir_get_nth_slot(block->frame, slot_index); + + if (page_is_comp(block->frame)) + { + mtr->write<2,mtr_t::OPT>(*block, slot, PAGE_NEW_SUPREMUM); + byte *owned= PAGE_NEW_SUPREMUM - REC_NEW_N_OWNED + block->frame; + byte new_owned= (*owned & ~REC_N_OWNED_MASK) | + static_cast<byte>(n_owned << REC_N_OWNED_SHIFT); +#if 0 // FIXME: implement minimal logging for ROW_FORMAT=COMPRESSED + if (UNIV_LIKELY_NULL(block->page.zip.data)) + { + *owned= new_owned; + memcpy_aligned<2>(PAGE_N_DIR_SLOTS + PAGE_HEADER + block->page.zip.data, + PAGE_N_DIR_SLOTS + PAGE_HEADER + block->frame, + PAGE_N_RECS + 2 - PAGE_N_DIR_SLOTS); + // TODO: the equivalent of page_zip_dir_delete() for all records + mach_write_to_2(prev_rec - REC_NEXT, static_cast<uint16_t> + (PAGE_NEW_SUPREMUM - page_offset(prev_rec))); + mach_write_to_2(last_rec - REC_NEXT, free + ? static_cast<uint16_t>(free - page_offset(last_rec)) + : 0U); + return; + } +#endif + mtr->write<1,mtr_t::OPT>(*block, owned, new_owned); + mtr->write<2>(*block, prev_rec - REC_NEXT, static_cast<uint16_t> + (PAGE_NEW_SUPREMUM - page_offset(prev_rec))); + mtr->write<2>(*block, last_rec - REC_NEXT, free + ? static_cast<uint16_t>(free - page_offset(last_rec)) + : 0U); + } + else + { + mtr->write<2,mtr_t::OPT>(*block, slot, PAGE_OLD_SUPREMUM); + byte *owned= PAGE_OLD_SUPREMUM - REC_OLD_N_OWNED + block->frame; + byte new_owned= (*owned & ~REC_N_OWNED_MASK) | + static_cast<byte>(n_owned << REC_N_OWNED_SHIFT); + mtr->write<1,mtr_t::OPT>(*block, owned, new_owned); + mtr->write<2>(*block, prev_rec - REC_NEXT, PAGE_OLD_SUPREMUM); + mtr->write<2>(*block, last_rec - REC_NEXT, free); + } } /*************************************************************//** |