Diffstat (limited to 'storage/innobase/btr/btr0bulk.cc')
-rw-r--r--  storage/innobase/btr/btr0bulk.cc  995
1 files changed, 995 insertions, 0 deletions
diff --git a/storage/innobase/btr/btr0bulk.cc b/storage/innobase/btr/btr0bulk.cc
new file mode 100644
index 00000000000..4a3de744823
--- /dev/null
+++ b/storage/innobase/btr/btr0bulk.cc
@@ -0,0 +1,995 @@
+/*****************************************************************************
+
+Copyright (c) 2014, 2015, Oracle and/or its affiliates. All Rights Reserved.
+
+This program is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along with
+this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
+
+*****************************************************************************/
+
+/**************************************************//**
+@file btr/btr0bulk.cc
+The B-tree bulk load
+
+Created 03/11/2014 Shaohua Wang
+*******************************************************/
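+
+/* Illustrative caller flow (a sketch, not part of this file; the real
+driver is the sorted-index build path, see row0merge.cc). Assuming the
+BtrBulk interface declared in btr0bulk.h and a hypothetical
+next_sorted_tuple() that yields tuples in key order:
+
+ BtrBulk btr_bulk(index, trx_id, flush_observer);
+ btr_bulk.init();
+
+ dberr_t err = DB_SUCCESS;
+ dtuple_t* dtuple;
+ while (err == DB_SUCCESS && (dtuple = next_sorted_tuple()) != NULL) {
+ err = btr_bulk.insert(dtuple);
+ }
+
+ err = btr_bulk.finish(err);
+*/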
+
+#include "btr0bulk.h"
+#include "btr0btr.h"
+#include "btr0cur.h"
+#include "btr0pcur.h"
+#include "ibuf0ibuf.h"
+
+/** InnoDB B-tree index fill factor for bulk load. */
+long innobase_fill_factor;
+
+/** Initialize members, allocate page if needed and start mtr.
+Note: we commit all mtrs on failure.
+@return error code. */
+dberr_t
+PageBulk::init()
+{
+ mtr_t* mtr;
+ buf_block_t* new_block;
+ page_t* new_page;
+ page_zip_des_t* new_page_zip;
+ ulint new_page_no;
+
+ ut_ad(m_heap == NULL);
+ m_heap = mem_heap_create(1000);
+
+ mtr = static_cast<mtr_t*>(
+ mem_heap_alloc(m_heap, sizeof(mtr_t)));
+ mtr_start(mtr);
+ mtr_x_lock(dict_index_get_lock(m_index), mtr);
+ mtr_set_log_mode(mtr, MTR_LOG_NO_REDO);
+ mtr_set_flush_observer(mtr, m_flush_observer);
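+ /* Note: page contents are not redo-logged during bulk load;
+ durability is ensured by the flush observer, which flushes all
+ pages built here to disk before the load completes. */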
+
+ if (m_page_no == FIL_NULL) {
+ mtr_t alloc_mtr;
+
+ /* We commit redo log for allocation by a separate mtr,
+ because we don't guarantee pages are committed following
+ the allocation order, and we will always generate redo log
+ for page allocation, even when creating a new tablespace. */
+ mtr_start(&alloc_mtr);
+ alloc_mtr.set_named_space(dict_index_get_space(m_index));
+
+ ulint n_reserved;
+ bool success;
+ success = fsp_reserve_free_extents(&n_reserved, m_index->space,
+ 1, FSP_NORMAL, &alloc_mtr);
+ if (!success) {
+ mtr_commit(&alloc_mtr);
+ mtr_commit(mtr);
+ return(DB_OUT_OF_FILE_SPACE);
+ }
+
+ /* Allocate a new page. */
+ new_block = btr_page_alloc(m_index, 0, FSP_UP, m_level,
+ &alloc_mtr, mtr);
+
+ if (n_reserved > 0) {
+ fil_space_release_free_extents(m_index->space,
+ n_reserved);
+ }
+
+ mtr_commit(&alloc_mtr);
+
+ new_page = buf_block_get_frame(new_block);
+ new_page_zip = buf_block_get_page_zip(new_block);
+ new_page_no = page_get_page_no(new_page);
+
+ if (new_page_zip) {
+ page_create_zip(new_block, m_index, m_level, 0,
+ NULL, mtr);
+ } else {
+ ut_ad(!dict_index_is_spatial(m_index));
+ page_create(new_block, mtr,
+ dict_table_is_comp(m_index->table),
+ false);
+ btr_page_set_level(new_page, NULL, m_level, mtr);
+ }
+
+ btr_page_set_next(new_page, NULL, FIL_NULL, mtr);
+ btr_page_set_prev(new_page, NULL, FIL_NULL, mtr);
+
+ btr_page_set_index_id(new_page, NULL, m_index->id, mtr);
+ } else {
+ page_id_t page_id(dict_index_get_space(m_index), m_page_no);
+ page_size_t page_size(dict_table_page_size(m_index->table));
+
+ new_block = btr_block_get(page_id, page_size,
+ RW_X_LATCH, m_index, mtr);
+
+ new_page = buf_block_get_frame(new_block);
+ new_page_zip = buf_block_get_page_zip(new_block);
+ new_page_no = page_get_page_no(new_page);
+ ut_ad(m_page_no == new_page_no);
+
+ ut_ad(page_dir_get_n_heap(new_page) == PAGE_HEAP_NO_USER_LOW);
+
+ btr_page_set_level(new_page, NULL, m_level, mtr);
+ }
+
+ if (dict_index_is_sec_or_ibuf(m_index)
+ && !dict_table_is_temporary(m_index->table)
+ && page_is_leaf(new_page)) {
+ page_update_max_trx_id(new_block, NULL, m_trx_id, mtr);
+ }
+
+ m_mtr = mtr;
+ m_block = new_block;
+ m_block->skip_flush_check = true;
+ m_page = new_page;
+ m_page_zip = new_page_zip;
+ m_page_no = new_page_no;
+ m_cur_rec = page_get_infimum_rec(new_page);
+ ut_ad(m_is_comp == !!page_is_comp(new_page));
+ m_free_space = page_get_free_space_of_empty(m_is_comp);
+
+ if (innobase_fill_factor == 100 && dict_index_is_clust(m_index)) {
+ /* Keep default behavior compatible with 5.6 */
+ m_reserved_space = dict_index_get_space_reserve();
+ } else {
+ m_reserved_space =
+ UNIV_PAGE_SIZE * (100 - innobase_fill_factor) / 100;
+ }
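+
+ /* For illustration: with UNIV_PAGE_SIZE = 16384 and
+ innobase_fill_factor = 90, m_reserved_space = 16384 * 10 / 100
+ = 1638 bytes are left unfilled on each page. */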
+
+ m_padding_space =
+ UNIV_PAGE_SIZE - dict_index_zip_pad_optimal_page_size(m_index);
+ m_heap_top = page_header_get_ptr(new_page, PAGE_HEAP_TOP);
+ m_rec_no = page_header_get_field(new_page, PAGE_N_RECS);
+
+ ut_d(m_total_data = 0);
+ page_header_set_field(m_page, NULL, PAGE_HEAP_TOP, UNIV_PAGE_SIZE - 1);
+
+ return(DB_SUCCESS);
+}
+
+/** Insert a record in the page.
+@param[in] rec record
+@param[in] offsets record offsets */
+void
+PageBulk::insert(
+ const rec_t* rec,
+ ulint* offsets)
+{
+ ulint rec_size;
+
+ ut_ad(m_heap != NULL);
+
+ rec_size = rec_offs_size(offsets);
+
+#ifdef UNIV_DEBUG
+ /* Check whether records are in order. */
+ if (!page_rec_is_infimum(m_cur_rec)) {
+ rec_t* old_rec = m_cur_rec;
+ ulint* old_offsets = rec_get_offsets(
+ old_rec, m_index, NULL, ULINT_UNDEFINED, &m_heap);
+
+ ut_ad(cmp_rec_rec(rec, old_rec, offsets, old_offsets, m_index)
+ > 0);
+ }
+
+ m_total_data += rec_size;
+#endif /* UNIV_DEBUG */
+
+ /* 1. Copy the record to page. */
+ rec_t* insert_rec = rec_copy(m_heap_top, rec, offsets);
+ rec_offs_make_valid(insert_rec, m_index, offsets);
+
+ /* 2. Insert the record in the linked list. */
+ rec_t* next_rec = page_rec_get_next(m_cur_rec);
+
+ page_rec_set_next(insert_rec, next_rec);
+ page_rec_set_next(m_cur_rec, insert_rec);
+
+ /* 3. Set the n_owned field in the inserted record to zero,
+ and set the heap_no field. */
+ if (m_is_comp) {
+ rec_set_n_owned_new(insert_rec, NULL, 0);
+ rec_set_heap_no_new(insert_rec,
+ PAGE_HEAP_NO_USER_LOW + m_rec_no);
+ } else {
+ rec_set_n_owned_old(insert_rec, 0);
+ rec_set_heap_no_old(insert_rec,
+ PAGE_HEAP_NO_USER_LOW + m_rec_no);
+ }
+
+ /* 4. Set member variables. */
+ ulint slot_size;
+ slot_size = page_dir_calc_reserved_space(m_rec_no + 1)
+ - page_dir_calc_reserved_space(m_rec_no);
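+ /* page_dir_calc_reserved_space() reserves directory space
+ pessimistically, roughly PAGE_DIR_SLOT_SIZE bytes for every
+ PAGE_DIR_SLOT_MIN_N_OWNED records, so slot_size is usually 0 and
+ periodically the size of one slot. */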
+
+ ut_ad(m_free_space >= rec_size + slot_size);
+ ut_ad(m_heap_top + rec_size < m_page + UNIV_PAGE_SIZE);
+
+ m_free_space -= rec_size + slot_size;
+ m_heap_top += rec_size;
+ m_rec_no += 1;
+ m_cur_rec = insert_rec;
+}
+
+/** Mark the end of insertion to the page. Scan all records to set the
+page directory slots, and set the page header fields.
+Note: we refer to page_copy_rec_list_end_to_created_page. */
+void
+PageBulk::finish()
+{
+ ut_ad(m_rec_no > 0);
+
+#ifdef UNIV_DEBUG
+ ut_ad(m_total_data + page_dir_calc_reserved_space(m_rec_no)
+ <= page_get_free_space_of_empty(m_is_comp));
+
+ /* To pass the debug tests we have to set these dummy values
+ in the debug version */
+ page_dir_set_n_slots(m_page, NULL, UNIV_PAGE_SIZE / 2);
+#endif
+
+ ulint count = 0;
+ ulint n_recs = 0;
+ ulint slot_index = 0;
+ rec_t* insert_rec = page_rec_get_next(page_get_infimum_rec(m_page));
+ page_dir_slot_t* slot = NULL;
+
+ /* Set owner & dir. */
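+ /* Each full slot is set to own (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
+ records, i.e. 4 with the current PAGE_DIR_SLOT_MAX_N_OWNED of 8. */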
+ do {
+
+ count++;
+ n_recs++;
+
+ if (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2) {
+
+ slot_index++;
+
+ slot = page_dir_get_nth_slot(m_page, slot_index);
+
+ page_dir_slot_set_rec(slot, insert_rec);
+ page_dir_slot_set_n_owned(slot, NULL, count);
+
+ count = 0;
+ }
+
+ insert_rec = page_rec_get_next(insert_rec);
+ } while (!page_rec_is_supremum(insert_rec));
+
+ if (slot_index > 0
+ && (count + 1 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
+ <= PAGE_DIR_SLOT_MAX_N_OWNED)) {
+ /* We can merge the last two dir slots. This operation is
+ here to make this function imitate exactly the equivalent
+ task performed using page_cur_insert_rec, which we use in
+ database recovery to reproduce the task performed by this
+ function. To be able to check the correctness of recovery,
+ it is good that it imitates that task exactly. */
+
+ count += (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2;
+
+ page_dir_slot_set_n_owned(slot, NULL, 0);
+
+ slot_index--;
+ }
+
+ slot = page_dir_get_nth_slot(m_page, 1 + slot_index);
+ page_dir_slot_set_rec(slot, page_get_supremum_rec(m_page));
+ page_dir_slot_set_n_owned(slot, NULL, count + 1);
+
+ ut_ad(!dict_index_is_spatial(m_index));
+ page_dir_set_n_slots(m_page, NULL, 2 + slot_index);
+ page_header_set_ptr(m_page, NULL, PAGE_HEAP_TOP, m_heap_top);
+ page_dir_set_n_heap(m_page, NULL, PAGE_HEAP_NO_USER_LOW + m_rec_no);
+ page_header_set_field(m_page, NULL, PAGE_N_RECS, m_rec_no);
+
+ page_header_set_ptr(m_page, NULL, PAGE_LAST_INSERT, m_cur_rec);
+ page_header_set_field(m_page, NULL, PAGE_DIRECTION, PAGE_RIGHT);
+ page_header_set_field(m_page, NULL, PAGE_N_DIRECTION, 0);
+
+ m_block->skip_flush_check = false;
+}
+
+/** Commit inserts done to the page.
+@param[in] success whether all inserts succeeded */
+void
+PageBulk::commit(
+ bool success)
+{
+ if (success) {
+ ut_ad(page_validate(m_page, m_index));
+
+ /* Set no free space left and no buffered changes in ibuf. */
+ if (!dict_index_is_clust(m_index)
+ && !dict_table_is_temporary(m_index->table)
+ && page_is_leaf(m_page)) {
+ ibuf_set_bitmap_for_bulk_load(
+ m_block, innobase_fill_factor == 100);
+ }
+ }
+
+ mtr_commit(m_mtr);
+}
+
+/** Compress a page of a compressed table.
+@return true if the page was compressed successfully
+@return false if compression failed */
+bool
+PageBulk::compress()
+{
+ ut_ad(m_page_zip != NULL);
+
+ return(page_zip_compress(m_page_zip, m_page, m_index,
+ page_zip_level, NULL, m_mtr));
+}
+
+/** Get node pointer
+@return node pointer */
+dtuple_t*
+PageBulk::getNodePtr()
+{
+ rec_t* first_rec;
+ dtuple_t* node_ptr;
+
+ /* Create node pointer */
+ first_rec = page_rec_get_next(page_get_infimum_rec(m_page));
+ ut_a(page_rec_is_user_rec(first_rec));
+ node_ptr = dict_index_build_node_ptr(m_index, first_rec, m_page_no,
+ m_heap, m_level);
+
+ return(node_ptr);
+}
+
+/** Get the split rec in the left page. We split a page in half when
+compression fails, and the records from the split rec onward are copied
+to the right page.
+@return split rec */
+rec_t*
+PageBulk::getSplitRec()
+{
+ rec_t* rec;
+ ulint* offsets;
+ ulint total_used_size;
+ ulint total_recs_size;
+ ulint n_recs;
+
+ ut_ad(m_page_zip != NULL);
+ ut_ad(m_rec_no >= 2);
+
+ ut_ad(page_get_free_space_of_empty(m_is_comp) > m_free_space);
+ total_used_size = page_get_free_space_of_empty(m_is_comp)
+ - m_free_space;
+
+ total_recs_size = 0;
+ n_recs = 0;
+ offsets = NULL;
+ rec = page_get_infimum_rec(m_page);
+
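+ /* Walk records from the infimum, accumulating record sizes plus
+ the directory space reserved for them, until we pass half of the
+ used space; that record becomes the split point. */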
+ do {
+ rec = page_rec_get_next(rec);
+ ut_ad(page_rec_is_user_rec(rec));
+
+ offsets = rec_get_offsets(rec, m_index,
+ offsets, ULINT_UNDEFINED,
+ &(m_heap));
+ total_recs_size += rec_offs_size(offsets);
+ n_recs++;
+ } while (total_recs_size + page_dir_calc_reserved_space(n_recs)
+ < total_used_size / 2);
+
+ /* Keep at least one record on left page */
+ if (page_rec_is_infimum(page_rec_get_prev(rec))) {
+ rec = page_rec_get_next(rec);
+ ut_ad(page_rec_is_user_rec(rec));
+ }
+
+ return(rec);
+}
+
+/** Copy all records from the split rec onward, including the split rec
+itself.
+@param[in] split_rec split rec */
+void
+PageBulk::copyIn(
+ rec_t* split_rec)
+{
+
+ rec_t* rec = split_rec;
+ ulint* offsets = NULL;
+
+ ut_ad(m_rec_no == 0);
+ ut_ad(page_rec_is_user_rec(rec));
+
+ do {
+ offsets = rec_get_offsets(rec, m_index, offsets,
+ ULINT_UNDEFINED, &(m_heap));
+
+ insert(rec, offsets);
+
+ rec = page_rec_get_next(rec);
+ } while (!page_rec_is_supremum(rec));
+
+ ut_ad(m_rec_no > 0);
+}
+
+/** Remove all records from the split rec onward, including the split rec
+itself.
+@param[in] split_rec split rec */
+void
+PageBulk::copyOut(
+ rec_t* split_rec)
+{
+ rec_t* rec;
+ rec_t* last_rec;
+ ulint n;
+
+ /* Suppose before copyOut, we have 5 records on the page:
+ infimum->r1->r2->r3->r4->r5->supremum, and r3 is the split rec.
+
+ After copyOut, we have 2 records on the page:
+ infimum->r1->r2->supremum. Slot adjustment is not done here. */
+
+ rec = page_rec_get_next(page_get_infimum_rec(m_page));
+ last_rec = page_rec_get_prev(page_get_supremum_rec(m_page));
+ n = 0;
+
+ while (rec != split_rec) {
+ rec = page_rec_get_next(rec);
+ n++;
+ }
+
+ ut_ad(n > 0);
+
+ /* Set last record's next in page */
+ ulint* offsets = NULL;
+ rec = page_rec_get_prev(split_rec);
+ offsets = rec_get_offsets(rec, m_index,
+ offsets, ULINT_UNDEFINED,
+ &(m_heap));
+ page_rec_set_next(rec, page_get_supremum_rec(m_page));
+
+ /* Set related members */
+ m_cur_rec = rec;
+ m_heap_top = rec_get_end(rec, offsets);
+
+ offsets = rec_get_offsets(last_rec, m_index,
+ offsets, ULINT_UNDEFINED,
+ &(m_heap));
+
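+ /* Space freed: the bytes of the records moved out (from the new
+ heap top to the end of the old last record) plus the directory
+ space no longer reserved for them. */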
+ m_free_space += rec_get_end(last_rec, offsets)
+ - m_heap_top
+ + page_dir_calc_reserved_space(m_rec_no)
+ - page_dir_calc_reserved_space(n);
+ ut_ad(m_free_space > 0);
+ m_rec_no = n;
+
+#ifdef UNIV_DEBUG
+ m_total_data -= rec_get_end(last_rec, offsets) - m_heap_top;
+#endif /* UNIV_DEBUG */
+}
+
+/** Set next page
+@param[in] next_page_no next page no */
+void
+PageBulk::setNext(
+ ulint next_page_no)
+{
+ btr_page_set_next(m_page, NULL, next_page_no, m_mtr);
+}
+
+/** Set previous page
+@param[in] prev_page_no previous page no */
+void
+PageBulk::setPrev(
+ ulint prev_page_no)
+{
+ btr_page_set_prev(m_page, NULL, prev_page_no, m_mtr);
+}
+
+/** Check whether the required space is available in the page for the rec
+to be inserted. We check the fill factor and padding here.
+@param[in] rec_size required space in bytes
+@return true if space is available */
+bool
+PageBulk::isSpaceAvailable(
+ ulint rec_size)
+{
+ ulint slot_size;
+ ulint required_space;
+
+ slot_size = page_dir_calc_reserved_space(m_rec_no + 1)
+ - page_dir_calc_reserved_space(m_rec_no);
+
+ required_space = rec_size + slot_size;
+
+ if (required_space > m_free_space) {
+ ut_ad(m_rec_no > 0);
+ return(false);
+ }
+
+ /* Fillfactor & Padding apply to both leaf and non-leaf pages.
+ Note: we keep at least 2 records in a page to avoid B-tree level
+ growing too high. */
+ if (m_rec_no >= 2
+ && ((m_page_zip == NULL && m_free_space - required_space
+ < m_reserved_space)
+ || (m_page_zip != NULL && m_free_space - required_space
+ < m_padding_space))) {
+ return(false);
+ }
+
+ return(true);
+}
+
+/** Check whether the record needs to be stored externally.
+@return false if the entire record can be stored locally on the page */
+bool
+PageBulk::needExt(
+ const dtuple_t* tuple,
+ ulint rec_size)
+{
+ return(page_zip_rec_needs_ext(rec_size, m_is_comp,
+ dtuple_get_n_fields(tuple), m_block->page.size));
+}
+
+/** Store an external record.
+Since the record is not logged yet, we don't log the update to the record;
+the blob data is logged first, then the record is logged in bulk mode.
+@param[in] big_rec external record
+@param[in] offsets record offsets
+@return error code */
+dberr_t
+PageBulk::storeExt(
+ const big_rec_t* big_rec,
+ ulint* offsets)
+{
+ /* Note: not all fields are initialized in btr_pcur. */
+ btr_pcur_t btr_pcur;
+ btr_pcur.pos_state = BTR_PCUR_IS_POSITIONED;
+ btr_pcur.latch_mode = BTR_MODIFY_LEAF;
+ btr_pcur.btr_cur.index = m_index;
+
+ page_cur_t* page_cur = &btr_pcur.btr_cur.page_cur;
+ page_cur->index = m_index;
+ page_cur->rec = m_cur_rec;
+ page_cur->offsets = offsets;
+ page_cur->block = m_block;
+
+ dberr_t err = btr_store_big_rec_extern_fields(
+ &btr_pcur, NULL, offsets, big_rec, m_mtr,
+ BTR_STORE_INSERT_BULK);
+
+ ut_ad(page_offset(m_cur_rec) == page_offset(page_cur->rec));
+
+ /* Reset m_block and m_cur_rec from page cursor, because
+ block may be changed during blob insert. */
+ m_block = page_cur->block;
+ m_cur_rec = page_cur->rec;
+ m_page = buf_block_get_frame(m_block);
+
+ return(err);
+}
+
+/** Release the block by committing the mtr.
+Note: log_free_check requires holding no lock/latch in the current thread. */
+void
+PageBulk::release()
+{
+ ut_ad(!dict_index_is_spatial(m_index));
+
+ /* We fix the block because we will re-pin it soon. */
+ buf_block_buf_fix_inc(m_block, __FILE__, __LINE__);
+
+ /* No other threads can modify this block. */
+ m_modify_clock = buf_block_get_modify_clock(m_block);
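+ /* The modify clock lets latch() re-acquire the block
+ optimistically: buf_page_optimistic_get() succeeds only if the
+ clock is unchanged, i.e. the frame was not modified while the
+ block was unlatched. */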
+
+ mtr_commit(m_mtr);
+}
+
+/** Start mtr and latch the block */
+dberr_t
+PageBulk::latch()
+{
+ ibool ret;
+
+ mtr_start(m_mtr);
+ mtr_x_lock(dict_index_get_lock(m_index), m_mtr);
+ mtr_set_log_mode(m_mtr, MTR_LOG_NO_REDO);
+ mtr_set_flush_observer(m_mtr, m_flush_observer);
+
+ /* TODO: need a simple and wait version of buf_page_optimistic_get. */
+ ret = buf_page_optimistic_get(RW_X_LATCH, m_block, m_modify_clock,
+ __FILE__, __LINE__, m_mtr);
+ /* In case the block is S-latched by page_cleaner. */
+ if (!ret) {
+ page_id_t page_id(dict_index_get_space(m_index), m_page_no);
+ page_size_t page_size(dict_table_page_size(m_index->table));
+
+ m_block = buf_page_get_gen(page_id, page_size, RW_X_LATCH,
+ m_block, BUF_GET_IF_IN_POOL,
+ __FILE__, __LINE__, m_mtr, &m_err);
+
+ if (m_err != DB_SUCCESS) {
+ return(m_err);
+ }
+
+ ut_ad(m_block != NULL);
+ }
+
+ buf_block_buf_fix_dec(m_block);
+
+ ut_ad(m_cur_rec > m_page && m_cur_rec < m_heap_top);
+
+ return(m_err);
+}
+
+/** Split a page
+@param[in] page_bulk page to split
+@param[in] next_page_bulk next page
+@return error code */
+dberr_t
+BtrBulk::pageSplit(
+ PageBulk* page_bulk,
+ PageBulk* next_page_bulk)
+{
+ ut_ad(page_bulk->getPageZip() != NULL);
+
+ /* 1. Check if we have only one user record on the page. */
+ if (page_bulk->getRecNo() <= 1) {
+ return(DB_TOO_BIG_RECORD);
+ }
+
+ /* 2. Create a new page. */
+ PageBulk new_page_bulk(m_index, m_trx_id, FIL_NULL,
+ page_bulk->getLevel(), m_flush_observer);
+ dberr_t err = new_page_bulk.init();
+ if (err != DB_SUCCESS) {
+ return(err);
+ }
+
+ /* 3. Copy the upper half to the new page. */
+ rec_t* split_rec = page_bulk->getSplitRec();
+ new_page_bulk.copyIn(split_rec);
+ page_bulk->copyOut(split_rec);
+
+ /* 4. Commit the split page. */
+ err = pageCommit(page_bulk, &new_page_bulk, true);
+ if (err != DB_SUCCESS) {
+ pageAbort(&new_page_bulk);
+ return(err);
+ }
+
+ /* 5. Commit the new page. */
+ err = pageCommit(&new_page_bulk, next_page_bulk, true);
+ if (err != DB_SUCCESS) {
+ pageAbort(&new_page_bulk);
+ return(err);
+ }
+
+ return(err);
+}
+
+/** Commit (finish) a page. We set the next/prev page no, compress a page
+of a compressed table and split the page if compression fails, insert a
+node pointer to the father page if needed, and commit the
+mini-transaction.
+@param[in] page_bulk page to commit
+@param[in] next_page_bulk next page
+@param[in] insert_father false when page_bulk is the root page,
+ true otherwise
+@return error code */
+dberr_t
+BtrBulk::pageCommit(
+ PageBulk* page_bulk,
+ PageBulk* next_page_bulk,
+ bool insert_father)
+{
+ page_bulk->finish();
+
+ /* Set page links */
+ if (next_page_bulk != NULL) {
+ ut_ad(page_bulk->getLevel() == next_page_bulk->getLevel());
+
+ page_bulk->setNext(next_page_bulk->getPageNo());
+ next_page_bulk->setPrev(page_bulk->getPageNo());
+ } else {
+ /* If a page is released and latched again, we need to
+ mark it modified in the mini-transaction. */
+ page_bulk->setNext(FIL_NULL);
+ }
+
+ /* Compress page if it's a compressed table. */
+ if (page_bulk->getPageZip() != NULL && !page_bulk->compress()) {
+ return(pageSplit(page_bulk, next_page_bulk));
+ }
+
+ /* Insert node pointer to father page. */
+ if (insert_father) {
+ dtuple_t* node_ptr = page_bulk->getNodePtr();
+ dberr_t err = insert(node_ptr, page_bulk->getLevel()+1);
+
+ if (err != DB_SUCCESS) {
+ return(err);
+ }
+ }
+
+ /* Commit mtr. */
+ page_bulk->commit(true);
+
+ return(DB_SUCCESS);
+}
+
+/** Log free check */
+void
+BtrBulk::logFreeCheck()
+{
+ if (log_sys->check_flush_or_checkpoint) {
+ release();
+
+ log_free_check();
+
+ latch();
+ }
+}
+
+/** Release all latches */
+void
+BtrBulk::release()
+{
+ ut_ad(m_root_level + 1 == m_page_bulks->size());
+
+ for (ulint level = 0; level <= m_root_level; level++) {
+ PageBulk* page_bulk = m_page_bulks->at(level);
+
+ page_bulk->release();
+ }
+}
+
+/** Re-latch all latches */
+void
+BtrBulk::latch()
+{
+ ut_ad(m_root_level + 1 == m_page_bulks->size());
+
+ for (ulint level = 0; level <= m_root_level; level++) {
+ PageBulk* page_bulk = m_page_bulks->at(level);
+ page_bulk->latch();
+ }
+}
+
+/** Insert a tuple into a page at the given level
+@param[in] tuple tuple to insert
+@param[in] level B-tree level
+@return error code */
+dberr_t
+BtrBulk::insert(
+ dtuple_t* tuple,
+ ulint level)
+{
+ bool is_left_most = false;
+
+ ut_ad(m_heap != NULL);
+
+ /* Check if we need to create a PageBulk for the level. */
+ if (level + 1 > m_page_bulks->size()) {
+ PageBulk* new_page_bulk
+ = UT_NEW_NOKEY(PageBulk(m_index, m_trx_id, FIL_NULL,
+ level, m_flush_observer));
+ dberr_t err = new_page_bulk->init();
+ if (err != DB_SUCCESS) {
+ return(err);
+ }
+
+ m_page_bulks->push_back(new_page_bulk);
+ ut_ad(level + 1 == m_page_bulks->size());
+ m_root_level = level;
+
+ is_left_most = true;
+ }
+
+ ut_ad(m_page_bulks->size() > level);
+
+ PageBulk* page_bulk = m_page_bulks->at(level);
+
+ if (is_left_most && level > 0 && page_bulk->getRecNo() == 0) {
+ /* The node pointer must be marked as the predefined minimum
+ record, as there is no lower alphabetical limit to records in
+ the leftmost node of a level: */
+ dtuple_set_info_bits(tuple, dtuple_get_info_bits(tuple)
+ | REC_INFO_MIN_REC_FLAG);
+ }
+
+ ulint n_ext = 0;
+ ulint rec_size = rec_get_converted_size(m_index, tuple, n_ext);
+ big_rec_t* big_rec = NULL;
+
+ if (page_bulk->needExt(tuple, rec_size)) {
+ /* The record is so big that we have to store some fields
+ externally on separate database pages */
+ big_rec = dtuple_convert_big_rec(m_index, 0, tuple, &n_ext);
+
+ if (UNIV_UNLIKELY(big_rec == NULL)) {
+ return(DB_TOO_BIG_RECORD);
+ }
+
+ rec_size = rec_get_converted_size(m_index, tuple, n_ext);
+ }
+
+ if (!page_bulk->isSpaceAvailable(rec_size)) {
+ /* Create a sibling page_bulk. */
+ PageBulk* sibling_page_bulk;
+ sibling_page_bulk = UT_NEW_NOKEY(PageBulk(m_index, m_trx_id,
+ FIL_NULL, level,
+ m_flush_observer));
+ dberr_t err = sibling_page_bulk->init();
+ if (err != DB_SUCCESS) {
+ UT_DELETE(sibling_page_bulk);
+ return(err);
+ }
+
+ /* Commit page bulk. */
+ err = pageCommit(page_bulk, sibling_page_bulk, true);
+ if (err != DB_SUCCESS) {
+ pageAbort(sibling_page_bulk);
+ UT_DELETE(sibling_page_bulk);
+ return(err);
+ }
+
+ /* Store the new page bulk in m_page_bulks. */
+ ut_ad(sibling_page_bulk->getLevel() <= m_root_level);
+ m_page_bulks->at(level) = sibling_page_bulk;
+
+ UT_DELETE(page_bulk);
+ page_bulk = sibling_page_bulk;
+
+ /* Important: call logFreeCheck() to check whether we need a checkpoint. */
+ if (page_is_leaf(sibling_page_bulk->getPage())) {
+ /* Check whether trx is interrupted */
+ if (m_flush_observer->check_interrupted()) {
+ return(DB_INTERRUPTED);
+ }
+
+ /* Wake up page cleaner to flush dirty pages. */
+ srv_inc_activity_count();
+ os_event_set(buf_flush_event);
+
+ logFreeCheck();
+ }
+
+ }
+
+ rec_t* rec;
+ ulint* offsets = NULL;
+ /* Convert tuple to rec. */
+ rec = rec_convert_dtuple_to_rec(static_cast<byte*>(mem_heap_alloc(
+ page_bulk->m_heap, rec_size)), m_index, tuple, n_ext);
+ offsets = rec_get_offsets(rec, m_index, offsets, ULINT_UNDEFINED,
+ &(page_bulk->m_heap));
+
+ page_bulk->insert(rec, offsets);
+
+ if (big_rec != NULL) {
+ dberr_t err;
+
+ ut_ad(dict_index_is_clust(m_index));
+ ut_ad(page_bulk->getLevel() == 0);
+ ut_ad(page_bulk == m_page_bulks->at(0));
+
+ /* Release all latched pages except the leaf node. */
+ for (ulint level = 1; level <= m_root_level; level++) {
+ PageBulk* page_bulk = m_page_bulks->at(level);
+
+ page_bulk->release();
+ }
+
+ err = page_bulk->storeExt(big_rec, offsets);
+
+ /* Latch */
+ for (ulint level = 1; level <= m_root_level; level++) {
+ PageBulk* page_bulk = m_page_bulks->at(level);
+ page_bulk->latch();
+ }
+
+ dtuple_convert_back_big_rec(m_index, tuple, big_rec);
+
+ return(err);
+ }
+
+ return(DB_SUCCESS);
+}
+
+/** Btree bulk load finish. We commit the last page in each level
+and copy the last page in the top level to the root page of the index
+if no error occurred.
+@param[in] err error status of the bulk load so far
+@return error code */
+dberr_t
+BtrBulk::finish(dberr_t err)
+{
+ ulint last_page_no = FIL_NULL;
+
+ ut_ad(!dict_table_is_temporary(m_index->table));
+
+ if (m_page_bulks->size() == 0) {
+ /* The table is empty. The root page of the index tree
+ is already in a consistent state. No need to flush. */
+ return(err);
+ }
+
+ ut_ad(m_root_level + 1 == m_page_bulks->size());
+
+ /* Finish all page bulks */
+ for (ulint level = 0; level <= m_root_level; level++) {
+ PageBulk* page_bulk = m_page_bulks->at(level);
+
+ last_page_no = page_bulk->getPageNo();
+
+ if (err == DB_SUCCESS) {
+ err = pageCommit(page_bulk, NULL,
+ level != m_root_level);
+ }
+
+ if (err != DB_SUCCESS) {
+ pageAbort(page_bulk);
+ }
+
+ UT_DELETE(page_bulk);
+ }
+
+ if (err == DB_SUCCESS) {
+ rec_t* first_rec;
+ mtr_t mtr;
+ buf_block_t* last_block;
+ page_t* last_page;
+ page_id_t page_id(dict_index_get_space(m_index),
+ last_page_no);
+ page_size_t page_size(dict_table_page_size(m_index->table));
+ ulint root_page_no = dict_index_get_page(m_index);
+ PageBulk root_page_bulk(m_index, m_trx_id,
+ root_page_no, m_root_level,
+ m_flush_observer);
+
+ mtr_start(&mtr);
+ mtr.set_named_space(dict_index_get_space(m_index));
+ mtr_x_lock(dict_index_get_lock(m_index), &mtr);
+
+ ut_ad(last_page_no != FIL_NULL);
+ last_block = btr_block_get(page_id, page_size,
+ RW_X_LATCH, m_index, &mtr);
+ last_page = buf_block_get_frame(last_block);
+ first_rec = page_rec_get_next(page_get_infimum_rec(last_page));
+ ut_ad(page_rec_is_user_rec(first_rec));
+
+ /* Copy last page to root page. */
+ err = root_page_bulk.init();
+ if (err != DB_SUCCESS) {
+ mtr_commit(&mtr);
+ return(err);
+ }
+ root_page_bulk.copyIn(first_rec);
+
+ /* Remove last page. */
+ btr_page_free_low(m_index, last_block, m_root_level, false, &mtr);
+
+ /* Do not flush the last page. */
+ last_block->page.flush_observer = NULL;
+
+ mtr_commit(&mtr);
+
+ err = pageCommit(&root_page_bulk, NULL, false);
+ ut_ad(err == DB_SUCCESS);
+ }
+
+#ifdef UNIV_DEBUG
+ dict_sync_check check(true);
+
+ ut_ad(!sync_check_iterate(check));
+#endif /* UNIV_DEBUG */
+
+ ut_ad(err != DB_SUCCESS || btr_validate_index(m_index, NULL, false));
+ return(err);
+}