diff options
Diffstat (limited to 'storage/innobase/btr/btr0bulk.cc')
-rw-r--r-- | storage/innobase/btr/btr0bulk.cc | 142 |
1 files changed, 83 insertions, 59 deletions
diff --git a/storage/innobase/btr/btr0bulk.cc b/storage/innobase/btr/btr0bulk.cc index c64a64466d8..8be3d6d4b01 100644 --- a/storage/innobase/btr/btr0bulk.cc +++ b/storage/innobase/btr/btr0bulk.cc @@ -92,6 +92,8 @@ PageBulk::init() new_page_zip = buf_block_get_page_zip(new_block); new_page_no = page_get_page_no(new_page); + byte* index_id = PAGE_HEADER + PAGE_INDEX_ID + new_page; + if (new_page_zip) { page_create_zip(new_block, m_index, m_level, 0, &m_mtr); @@ -100,11 +102,9 @@ PageBulk::init() page_zip_write_header(new_page_zip, FIL_PAGE_PREV + new_page, 8, &m_mtr); - mach_write_to_8(PAGE_HEADER + PAGE_INDEX_ID + new_page, - m_index->id); - page_zip_write_header(new_page_zip, - PAGE_HEADER + PAGE_INDEX_ID - + new_page, 8, &m_mtr); + mach_write_to_8(index_id, m_index->id); + page_zip_write_header(new_page_zip, index_id, + 8, &m_mtr); } else { ut_ad(!dict_index_is_spatial(m_index)); page_create(new_block, &m_mtr, @@ -114,10 +114,10 @@ PageBulk::init() == FIL_PAGE_PREV + 4); compile_time_assert(FIL_NULL == 0xffffffff); mlog_memset(new_block, FIL_PAGE_PREV, 8, 0xff, &m_mtr); - mlog_write_ulint(PAGE_HEADER + PAGE_LEVEL + new_page, - m_level, MLOG_2BYTES, &m_mtr); - mlog_write_ull(PAGE_HEADER + PAGE_INDEX_ID + new_page, - m_index->id, &m_mtr); + m_mtr.write<2,mtr_t::OPT>(*new_block, + PAGE_HEADER + PAGE_LEVEL + + new_page, m_level); + m_mtr.write<8>(*new_block, index_id, m_index->id); } } else { new_block = btr_block_get(*m_index, m_page_no, RW_X_LATCH, @@ -130,7 +130,7 @@ PageBulk::init() ut_ad(page_dir_get_n_heap(new_page) == PAGE_HEAP_NO_USER_LOW); - btr_page_set_level(new_page, new_page_zip, m_level, &m_mtr); + btr_page_set_level(new_block, m_level, &m_mtr); } if (!m_level && dict_index_is_sec_or_ibuf(m_index)) { @@ -169,13 +169,14 @@ PageBulk::init() } /** Insert a record in the page. +@tparam fmt the page format @param[in] rec record @param[in] offsets record offsets */ -void -PageBulk::insert( - const rec_t* rec, - ulint* offsets) +template<PageBulk::format fmt> +inline void PageBulk::insertPage(const rec_t *rec, ulint *offsets) { + ut_ad((m_page_zip != nullptr) == (fmt == COMPRESSED)); + ut_ad((fmt != REDUNDANT) == m_is_comp); ulint rec_size; ut_ad(m_heap != NULL); @@ -210,7 +211,7 @@ PageBulk::insert( /* 3. Set the n_owned field in the inserted record to zero, and set the heap_no field. */ - if (m_is_comp) { + if (fmt != REDUNDANT) { rec_set_n_owned_new(insert_rec, NULL, 0); rec_set_heap_no_new(insert_rec, PAGE_HEAP_NO_USER_LOW + m_rec_no); @@ -242,15 +243,30 @@ PageBulk::insert( m_cur_rec = insert_rec; } +/** Insert a record in the page. +@param[in] rec record +@param[in] offsets record offsets */ +inline void PageBulk::insert(const rec_t *rec, ulint *offsets) +{ + if (UNIV_LIKELY_NULL(m_page_zip)) + insertPage<COMPRESSED>(rec, offsets); + else if (m_is_comp) + insertPage<DYNAMIC>(rec, offsets); + else + insertPage<REDUNDANT>(rec, offsets); +} + /** Mark end of insertion to the page. Scan all records to set page dirs, and set page header members. -Note: we refer to page_copy_rec_list_end_to_created_page. */ -void -PageBulk::finish() +@tparam fmt the page format */ +template<PageBulk::format fmt> +inline void PageBulk::finishPage() { ut_ad(m_rec_no > 0); + ut_ad((m_page_zip != nullptr) == (fmt == COMPRESSED)); + ut_ad((fmt != REDUNDANT) == m_is_comp); ut_ad(m_total_data + page_dir_calc_reserved_space(m_rec_no) - <= page_get_free_space_of_empty(m_is_comp)); + <= page_get_free_space_of_empty(fmt != REDUNDANT)); /* See page_copy_rec_list_end_to_created_page() */ ut_d(page_dir_set_n_slots(m_page, NULL, srv_page_size / 2)); @@ -304,26 +320,26 @@ PageBulk::finish() ut_ad(!dict_index_is_spatial(m_index)); ut_ad(!page_get_instant(m_page)); - - if (!m_flush_observer && !m_page_zip) { - mlog_write_ulint(PAGE_HEADER + PAGE_N_DIR_SLOTS + m_page, - 2 + slot_index, MLOG_2BYTES, &m_mtr); - mlog_write_ulint(PAGE_HEADER + PAGE_HEAP_TOP + m_page, - ulint(m_heap_top - m_page), - MLOG_2BYTES, &m_mtr); - mlog_write_ulint(PAGE_HEADER + PAGE_N_HEAP + m_page, - (PAGE_HEAP_NO_USER_LOW + m_rec_no) - | ulint(m_is_comp) << 15, - MLOG_2BYTES, &m_mtr); - mlog_write_ulint(PAGE_HEADER + PAGE_N_RECS + m_page, m_rec_no, - MLOG_2BYTES, &m_mtr); - mlog_write_ulint(PAGE_HEADER + PAGE_LAST_INSERT + m_page, - ulint(m_cur_rec - m_page), - MLOG_2BYTES, &m_mtr); - mlog_write_ulint(PAGE_HEADER + PAGE_DIRECTION_B - 1 + m_page, - PAGE_RIGHT, MLOG_2BYTES, &m_mtr); - mlog_write_ulint(PAGE_HEADER + PAGE_N_DIRECTION + m_page, 0, - MLOG_2BYTES, &m_mtr); + ut_ad(!mach_read_from_2(PAGE_HEADER + PAGE_N_DIRECTION + m_page)); + + if (fmt != COMPRESSED && !m_flush_observer) { + m_mtr.write<2,mtr_t::OPT>(*m_block, + PAGE_HEADER + PAGE_N_DIR_SLOTS + + m_page, 2 + slot_index); + m_mtr.write<2>(*m_block, PAGE_HEADER + PAGE_HEAP_TOP + m_page, + ulint(m_heap_top - m_page)); + m_mtr.write<2>(*m_block, + PAGE_HEADER + PAGE_N_HEAP + m_page, + (PAGE_HEAP_NO_USER_LOW + m_rec_no) + | uint16_t{fmt != REDUNDANT} << 15); + m_mtr.write<2>(*m_block, + PAGE_HEADER + PAGE_N_RECS + m_page, m_rec_no); + m_mtr.write<2>(*m_block, + PAGE_HEADER + PAGE_LAST_INSERT + m_page, + ulint(m_cur_rec - m_page)); + m_mtr.write<2>(*m_block, + PAGE_HEADER + PAGE_DIRECTION_B - 1 + m_page, + PAGE_RIGHT); } else { /* For ROW_FORMAT=COMPRESSED, redo log may be written in PageBulk::compress(). */ @@ -333,18 +349,29 @@ PageBulk::finish() ulint(m_heap_top - m_page)); mach_write_to_2(PAGE_HEADER + PAGE_N_HEAP + m_page, (PAGE_HEAP_NO_USER_LOW + m_rec_no) - | ulint(m_is_comp) << 15); + | uint16_t{fmt != REDUNDANT} << 15); mach_write_to_2(PAGE_HEADER + PAGE_N_RECS + m_page, m_rec_no); mach_write_to_2(PAGE_HEADER + PAGE_LAST_INSERT + m_page, ulint(m_cur_rec - m_page)); mach_write_to_2(PAGE_HEADER + PAGE_DIRECTION_B - 1 + m_page, PAGE_RIGHT); - mach_write_to_2(PAGE_HEADER + PAGE_N_DIRECTION + m_page, 0); } m_block->skip_flush_check = false; } +/** Mark end of insertion to the page. Scan all records to set page dirs, +and set page header members. */ +inline void PageBulk::finish() +{ + if (UNIV_LIKELY_NULL(m_page_zip)) + finishPage<COMPRESSED>(); + else if (m_is_comp) + finishPage<DYNAMIC>(); + else + finishPage<REDUNDANT>(); +} + /** Commit inserts done to the page @param[in] success Flag whether all inserts succeed. */ void @@ -521,28 +548,24 @@ PageBulk::copyOut( @param[in] next_page_no next page no */ inline void PageBulk::setNext(ulint next_page_no) { - if (UNIV_LIKELY_NULL(m_page_zip)) { - /* For ROW_FORMAT=COMPRESSED, redo log may be written - in PageBulk::compress(). */ - mach_write_to_4(m_page + FIL_PAGE_NEXT, next_page_no); - } else { - mlog_write_ulint(m_page + FIL_PAGE_NEXT, next_page_no, - MLOG_4BYTES, &m_mtr); - } + if (UNIV_LIKELY_NULL(m_page_zip)) + /* For ROW_FORMAT=COMPRESSED, redo log may be written + in PageBulk::compress(). */ + mach_write_to_4(m_page + FIL_PAGE_NEXT, next_page_no); + else + m_mtr.write<4>(*m_block, m_page + FIL_PAGE_NEXT, next_page_no); } /** Set previous page @param[in] prev_page_no previous page no */ inline void PageBulk::setPrev(ulint prev_page_no) { - if (UNIV_LIKELY_NULL(m_page_zip)) { - /* For ROW_FORMAT=COMPRESSED, redo log may be written - in PageBulk::compress(). */ - mach_write_to_4(m_page + FIL_PAGE_PREV, prev_page_no); - } else { - mlog_write_ulint(m_page + FIL_PAGE_PREV, prev_page_no, - MLOG_4BYTES, &m_mtr); - } + if (UNIV_LIKELY_NULL(m_page_zip)) + /* For ROW_FORMAT=COMPRESSED, redo log may be written + in PageBulk::compress(). */ + mach_write_to_4(m_page + FIL_PAGE_PREV, prev_page_no); + else + m_mtr.write<4>(*m_block, m_page + FIL_PAGE_PREV, prev_page_no); } /** Check if required space is available in the page for the rec to be inserted. @@ -748,9 +771,10 @@ BtrBulk::pageCommit( page_bulk->setNext(next_page_bulk->getPageNo()); next_page_bulk->setPrev(page_bulk->getPageNo()); } else { - /** Suppose a page is released and latched again, we need to + ut_ad(!page_has_next(page_bulk->getPage())); + /* If a page is released and latched again, we need to mark it modified in mini-transaction. */ - page_bulk->setNext(FIL_NULL); + page_bulk->set_modified(); } ut_ad(!rw_lock_own_flagged(&m_index->lock, |