summaryrefslogtreecommitdiff
path: root/innobase/page/page0cur.c
diff options
context:
space:
mode:
Diffstat (limited to 'innobase/page/page0cur.c')
-rw-r--r--innobase/page/page0cur.c1110
1 files changed, 1110 insertions, 0 deletions
diff --git a/innobase/page/page0cur.c b/innobase/page/page0cur.c
new file mode 100644
index 00000000000..e329b916b1b
--- /dev/null
+++ b/innobase/page/page0cur.c
@@ -0,0 +1,1110 @@
+/************************************************************************
+The page cursor
+
+(c) 1994-1996 Innobase Oy
+
+Created 10/4/1994 Heikki Tuuri
+*************************************************************************/
+
+#include "page0cur.h"
+#ifdef UNIV_NONINL
+#include "page0cur.ic"
+#endif
+
+#include "rem0cmp.h"
+#include "mtr0log.h"
+
+ulint page_cur_short_succ = 0;
+
+ulint page_rnd = 976722341;
+
+#ifdef PAGE_CUR_ADAPT
+
+/********************************************************************
+Tries a search shortcut based on the last insert. */
+UNIV_INLINE
+ibool
+page_cur_try_search_shortcut(
+/*=========================*/
+ page_t* page, /* in: index page */
+ dtuple_t* tuple, /* in: data tuple */
+ ulint* iup_matched_fields,
+ /* in/out: already matched fields in upper
+ limit record */
+ ulint* iup_matched_bytes,
+ /* in/out: already matched bytes in a field
+ not yet completely matched */
+ ulint* ilow_matched_fields,
+ /* in/out: already matched fields in lower
+ limit record */
+ ulint* ilow_matched_bytes,
+ /* in/out: already matched bytes in a field
+ not yet completely matched */
+ page_cur_t* cursor) /* out: page cursor */
+{
+ int cmp;
+ rec_t* rec;
+ rec_t* next_rec;
+ ulint low_match;
+ ulint low_bytes;
+ ulint up_match;
+ ulint up_bytes;
+#ifdef UNIV_SEARCH_DEBUG
+ page_cur_t cursor2;
+#endif
+ ut_ad(dtuple_check_typed(tuple));
+
+ rec = page_header_get_ptr(page, PAGE_LAST_INSERT);
+
+ ut_ad(rec);
+ ut_ad(page_rec_is_user_rec(rec));
+
+ ut_pair_min(&low_match, &low_bytes,
+ *ilow_matched_fields, *ilow_matched_bytes,
+ *iup_matched_fields, *iup_matched_bytes);
+
+ up_match = low_match;
+ up_bytes = low_bytes;
+
+ cmp = page_cmp_dtuple_rec_with_match(tuple, rec, &low_match,
+ &low_bytes);
+ if (cmp == -1) {
+
+ return(FALSE);
+ }
+
+ next_rec = page_rec_get_next(rec);
+
+ cmp = page_cmp_dtuple_rec_with_match(tuple, next_rec, &up_match,
+ &up_bytes);
+ if (cmp != -1) {
+
+ return(FALSE);
+ }
+
+ cursor->rec = rec;
+
+#ifdef UNIV_SEARCH_DEBUG
+ page_cur_search_with_match(page, tuple, PAGE_CUR_DBG,
+ iup_matched_fields,
+ iup_matched_bytes,
+ ilow_matched_fields,
+ ilow_matched_bytes,
+ &cursor2);
+ ut_a(cursor2.rec == cursor->rec);
+
+ if (next_rec != page_get_supremum_rec(page)) {
+
+ ut_a(*iup_matched_fields == up_match);
+ ut_a(*iup_matched_bytes == up_bytes);
+ }
+
+ ut_a(*ilow_matched_fields == low_match);
+ ut_a(*ilow_matched_bytes == low_bytes);
+#endif
+ if (next_rec != page_get_supremum_rec(page)) {
+
+ *iup_matched_fields = up_match;
+ *iup_matched_bytes = up_bytes;
+ }
+
+ *ilow_matched_fields = low_match;
+ *ilow_matched_bytes = low_bytes;
+
+#ifdef UNIV_SEARCH_PERF_STAT
+ page_cur_short_succ++;
+#endif
+ return(TRUE);
+}
+
+#endif
+
+/********************************************************************
+Searches the right position for a page cursor. */
+
+void
+page_cur_search_with_match(
+/*=======================*/
+ page_t* page, /* in: index page */
+ dtuple_t* tuple, /* in: data tuple */
+ ulint mode, /* in: PAGE_CUR_L, PAGE_CUR_LE, PAGE_CUR_G,
+ or PAGE_CUR_GE */
+ ulint* iup_matched_fields,
+ /* in/out: already matched fields in upper
+ limit record */
+ ulint* iup_matched_bytes,
+ /* in/out: already matched bytes in a field
+ not yet completely matched */
+ ulint* ilow_matched_fields,
+ /* in/out: already matched fields in lower
+ limit record */
+ ulint* ilow_matched_bytes,
+ /* in/out: already matched bytes in a field
+ not yet completely matched */
+ page_cur_t* cursor) /* out: page cursor */
+{
+ ulint up;
+ ulint low;
+ ulint mid;
+ page_dir_slot_t* slot;
+ rec_t* up_rec;
+ rec_t* low_rec;
+ rec_t* mid_rec;
+ ulint up_matched_fields;
+ ulint up_matched_bytes;
+ ulint low_matched_fields;
+ ulint low_matched_bytes;
+ ulint cur_matched_fields;
+ ulint cur_matched_bytes;
+ int cmp;
+#ifdef UNIV_SEARCH_DEBUG
+ int dbg_cmp;
+ ulint dbg_matched_fields;
+ ulint dbg_matched_bytes;
+#endif
+ ut_ad(page && tuple && iup_matched_fields && iup_matched_bytes
+ && ilow_matched_fields && ilow_matched_bytes && cursor);
+ ut_ad(dtuple_validate(tuple));
+ ut_ad(dtuple_check_typed(tuple));
+ ut_ad((mode == PAGE_CUR_L) || (mode == PAGE_CUR_LE)
+ || (mode == PAGE_CUR_G) || (mode == PAGE_CUR_GE)
+ || (mode == PAGE_CUR_DBG));
+
+#ifdef PAGE_CUR_ADAPT
+ if ((page_header_get_field(page, PAGE_LEVEL) == 0)
+ && (mode == PAGE_CUR_LE)
+ && (page_header_get_field(page, PAGE_N_DIRECTION) > 3)
+ && (page_header_get_ptr(page, PAGE_LAST_INSERT))
+ && (page_header_get_field(page, PAGE_DIRECTION) == PAGE_RIGHT)) {
+
+ if (page_cur_try_search_shortcut(page, tuple,
+ iup_matched_fields,
+ iup_matched_bytes,
+ ilow_matched_fields,
+ ilow_matched_bytes,
+ cursor)) {
+ return;
+ }
+ }
+/*#ifdef UNIV_SEARCH_DEBUG */
+ if (mode == PAGE_CUR_DBG) {
+ mode = PAGE_CUR_LE;
+ }
+/*#endif */
+#endif
+ /* If mode PAGE_CUR_G is specified, we are trying to position the
+ cursor to answer a query of the form "tuple < X", where tuple is
+ the input parameter, and X denotes an arbitrary physical record on
+ the page. We want to position the cursor on the first X which
+ satisfies the condition. */
+
+ up_matched_fields = *iup_matched_fields;
+ up_matched_bytes = *iup_matched_bytes;
+ low_matched_fields = *ilow_matched_fields;
+ low_matched_bytes = *ilow_matched_bytes;
+
+ /* Perform binary search. First the search is done through the page
+ directory, after that as a linear search in the list of records
+ owned by the upper limit directory slot. */
+
+ low = 0;
+ up = page_dir_get_n_slots(page) - 1;
+
+ /* Perform binary search until the lower and upper limit directory
+ slots come to the distance 1 of each other */
+
+ while (up - low > 1) {
+ mid = (low + up) / 2;
+ slot = page_dir_get_nth_slot(page, mid);
+ mid_rec = page_dir_slot_get_rec(slot);
+
+ ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
+ low_matched_fields, low_matched_bytes,
+ up_matched_fields, up_matched_bytes);
+
+ cmp = cmp_dtuple_rec_with_match(tuple, mid_rec,
+ &cur_matched_fields,
+ &cur_matched_bytes);
+ if (cmp == 1) {
+ low = mid;
+ low_matched_fields = cur_matched_fields;
+ low_matched_bytes = cur_matched_bytes;
+
+ } else if (cmp == -1) {
+ up = mid;
+ up_matched_fields = cur_matched_fields;
+ up_matched_bytes = cur_matched_bytes;
+
+ } else if ((mode == PAGE_CUR_G) || (mode == PAGE_CUR_LE)) {
+ low = mid;
+ low_matched_fields = cur_matched_fields;
+ low_matched_bytes = cur_matched_bytes;
+ } else {
+ up = mid;
+ up_matched_fields = cur_matched_fields;
+ up_matched_bytes = cur_matched_bytes;
+ }
+ }
+
+ slot = page_dir_get_nth_slot(page, low);
+ low_rec = page_dir_slot_get_rec(slot);
+ slot = page_dir_get_nth_slot(page, up);
+ up_rec = page_dir_slot_get_rec(slot);
+
+ /* Perform linear search until the upper and lower records
+ come to distance 1 of each other. */
+
+ while (page_rec_get_next(low_rec) != up_rec) {
+
+ mid_rec = page_rec_get_next(low_rec);
+
+ ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
+ low_matched_fields, low_matched_bytes,
+ up_matched_fields, up_matched_bytes);
+
+ cmp = cmp_dtuple_rec_with_match(tuple, mid_rec,
+ &cur_matched_fields,
+ &cur_matched_bytes);
+ if (cmp == 1) {
+ low_rec = mid_rec;
+ low_matched_fields = cur_matched_fields;
+ low_matched_bytes = cur_matched_bytes;
+
+ } else if (cmp == -1) {
+ up_rec = mid_rec;
+ up_matched_fields = cur_matched_fields;
+ up_matched_bytes = cur_matched_bytes;
+
+ } else if ((mode == PAGE_CUR_G) || (mode == PAGE_CUR_LE)) {
+ low_rec = mid_rec;
+ low_matched_fields = cur_matched_fields;
+ low_matched_bytes = cur_matched_bytes;
+ } else {
+ up_rec = mid_rec;
+ up_matched_fields = cur_matched_fields;
+ up_matched_bytes = cur_matched_bytes;
+ }
+ }
+
+#ifdef UNIV_SEARCH_DEBUG
+
+ /* Check that the lower and upper limit records have the
+ right alphabetical order compared to tuple. */
+ dbg_matched_fields = 0;
+ dbg_matched_bytes = 0;
+
+ dbg_cmp = page_cmp_dtuple_rec_with_match(tuple, low_rec,
+ &dbg_matched_fields,
+ &dbg_matched_bytes);
+ if (mode == PAGE_CUR_G) {
+ ut_a(dbg_cmp >= 0);
+ } else if (mode == PAGE_CUR_GE) {
+ ut_a(dbg_cmp == 1);
+ } else if (mode == PAGE_CUR_L) {
+ ut_a(dbg_cmp == 1);
+ } else if (mode == PAGE_CUR_LE) {
+ ut_a(dbg_cmp >= 0);
+ }
+
+ if (low_rec != page_get_infimum_rec(page)) {
+
+ ut_a(low_matched_fields == dbg_matched_fields);
+ ut_a(low_matched_bytes == dbg_matched_bytes);
+ }
+
+ dbg_matched_fields = 0;
+ dbg_matched_bytes = 0;
+
+ dbg_cmp = page_cmp_dtuple_rec_with_match(tuple, up_rec,
+ &dbg_matched_fields,
+ &dbg_matched_bytes);
+ if (mode == PAGE_CUR_G) {
+ ut_a(dbg_cmp == -1);
+ } else if (mode == PAGE_CUR_GE) {
+ ut_a(dbg_cmp <= 0);
+ } else if (mode == PAGE_CUR_L) {
+ ut_a(dbg_cmp <= 0);
+ } else if (mode == PAGE_CUR_LE) {
+ ut_a(dbg_cmp == -1);
+ }
+
+ if (up_rec != page_get_supremum_rec(page)) {
+
+ ut_a(up_matched_fields == dbg_matched_fields);
+ ut_a(up_matched_bytes == dbg_matched_bytes);
+ }
+#endif
+ if (mode <= PAGE_CUR_GE) {
+ cursor->rec = up_rec;
+ } else {
+ cursor->rec = low_rec;
+ }
+
+ *iup_matched_fields = up_matched_fields;
+ *iup_matched_bytes = up_matched_bytes;
+ *ilow_matched_fields = low_matched_fields;
+ *ilow_matched_bytes = low_matched_bytes;
+}
+
+/***************************************************************
+Positions a page cursor on a randomly chosen user record on a page. If there
+are no user records, sets the cursor on the infimum record. */
+
+void
+page_cur_open_on_rnd_user_rec(
+/*==========================*/
+ page_t* page, /* in: page */
+ page_cur_t* cursor) /* in/out: page cursor */
+{
+ ulint rnd;
+ rec_t* rec;
+
+ if (page_get_n_recs(page) == 0) {
+ page_cur_position(page_get_infimum_rec(page), cursor);
+
+ return;
+ }
+
+ page_rnd += 87584577;
+
+ rnd = page_rnd % page_get_n_recs(page);
+
+ rec = page_get_infimum_rec(page);
+
+ rec = page_rec_get_next(rec);
+
+ while (rnd > 0) {
+ rec = page_rec_get_next(rec);
+
+ rnd--;
+ }
+
+ page_cur_position(rec, cursor);
+}
+
+/***************************************************************
+Writes the log record of a record insert on a page. */
+static
+void
+page_cur_insert_rec_write_log(
+/*==========================*/
+ rec_t* insert_rec, /* in: inserted physical record */
+ ulint rec_size, /* in: insert_rec size */
+ rec_t* cursor_rec, /* in: record the cursor is pointing to */
+ mtr_t* mtr) /* in: mini-transaction handle */
+{
+ ulint cur_rec_size;
+ ulint extra_size;
+ ulint cur_extra_size;
+ ulint min_rec_size;
+ byte* ins_ptr;
+ byte* cur_ptr;
+ ulint extra_info_yes;
+ byte* log_ptr;
+ ulint i;
+
+ log_ptr = mlog_open(mtr, 30 + MLOG_BUF_MARGIN);
+
+ if (log_ptr == NULL) {
+
+ return;
+ }
+
+ extra_size = rec_get_extra_size(insert_rec);
+
+ cur_extra_size = rec_get_extra_size(cursor_rec);
+ cur_rec_size = rec_get_size(cursor_rec);
+
+ ins_ptr = insert_rec - extra_size;
+
+ i = 0;
+
+ if (cur_extra_size == extra_size) {
+ min_rec_size = ut_min(cur_rec_size, rec_size);
+
+ cur_ptr = cursor_rec - cur_extra_size;
+
+ /* Find out the first byte in insert_rec which differs from
+ cursor_rec; skip the bytes in the record info */
+
+ for (;;) {
+ if (i >= min_rec_size) {
+
+ break;
+ } else if (*ins_ptr == *cur_ptr) {
+ i++;
+ ins_ptr++;
+ cur_ptr++;
+ } else if ((i < extra_size)
+ && (i >= extra_size - REC_N_EXTRA_BYTES)) {
+ i = extra_size;
+ ins_ptr = insert_rec;
+ cur_ptr = cursor_rec;
+ } else {
+ break;
+ }
+ }
+ }
+
+ if (mtr_get_log_mode(mtr) != MTR_LOG_SHORT_INSERTS) {
+
+ log_ptr = mlog_write_initial_log_record_fast(insert_rec,
+ MLOG_REC_INSERT, log_ptr, mtr);
+ /* Write the cursor rec offset as a 2-byte ulint */
+ mach_write_to_2(log_ptr, cursor_rec
+ - buf_frame_align(cursor_rec));
+ log_ptr += 2;
+ }
+
+ if ((rec_get_info_bits(insert_rec) != rec_get_info_bits(cursor_rec))
+ || (extra_size != cur_extra_size)
+ || (rec_size != cur_rec_size)) {
+
+ extra_info_yes = 1;
+ } else {
+ extra_info_yes = 0;
+ }
+
+ /* Write the record end segment length and the extra info storage
+ flag */
+ log_ptr += mach_write_compressed(log_ptr, 2 * (rec_size - i)
+ + extra_info_yes);
+ if (extra_info_yes) {
+ /* Write the info bits */
+ mach_write_to_1(log_ptr, rec_get_info_bits(insert_rec));
+ log_ptr++;
+
+ /* Write the record origin offset */
+ log_ptr += mach_write_compressed(log_ptr, extra_size);
+
+ /* Write the mismatch index */
+ log_ptr += mach_write_compressed(log_ptr, i);
+ }
+
+ /* Write to the log the inserted index record end segment which
+ differs from the cursor record */
+
+ if (rec_size - i < MLOG_BUF_MARGIN) {
+ ut_memcpy(log_ptr, ins_ptr, rec_size - i);
+ log_ptr += rec_size - i;
+ }
+
+ mlog_close(mtr, log_ptr);
+
+ if (rec_size - i >= MLOG_BUF_MARGIN) {
+ mlog_catenate_string(mtr, ins_ptr, rec_size - i);
+ }
+}
+
+/***************************************************************
+Parses a log record of a record insert on a page. */
+
+byte*
+page_cur_parse_insert_rec(
+/*======================*/
+ /* out: end of log record or NULL */
+ ibool is_short,/* in: TRUE if short inserts */
+ byte* ptr, /* in: buffer */
+ byte* end_ptr,/* in: buffer end */
+ page_t* page, /* in: page or NULL */
+ mtr_t* mtr) /* in: mtr or NULL */
+{
+ ulint extra_info_yes;
+ ulint offset;
+ ulint origin_offset;
+ ulint end_seg_len;
+ ulint mismatch_index;
+ rec_t* cursor_rec;
+ byte buf1[1024];
+ byte* buf;
+ ulint info_bits;
+ page_cur_t cursor;
+
+ if (!is_short) {
+ /* Read the cursor rec offset as a 2-byte ulint */
+
+ if (end_ptr < ptr + 2) {
+
+ return(NULL);
+ }
+
+ offset = mach_read_from_2(ptr);
+
+ ptr += 2;
+ }
+
+ ptr = mach_parse_compressed(ptr, end_ptr, &end_seg_len);
+
+ if (ptr == NULL) {
+
+ return(NULL);
+ }
+
+ extra_info_yes = end_seg_len & 0x1;
+ end_seg_len = end_seg_len / 2;
+
+ if (extra_info_yes) {
+ /* Read the info bits */
+
+ if (end_ptr < ptr + 1) {
+
+ return(NULL);
+ }
+
+ info_bits = mach_read_from_1(ptr);
+ ptr++;
+
+ ptr = mach_parse_compressed(ptr, end_ptr, &origin_offset);
+
+ if (ptr == NULL) {
+
+ return(NULL);
+ }
+
+ ptr = mach_parse_compressed(ptr, end_ptr, &mismatch_index);
+
+ if (ptr == NULL) {
+
+ return(NULL);
+ }
+ }
+
+ if (end_ptr < ptr + end_seg_len) {
+
+ return(NULL);
+ }
+
+ if (page == NULL) {
+
+ return(ptr + end_seg_len);
+ }
+
+ /* Read from the log the inserted index record end segment which
+ differs from the cursor record */
+
+ if (is_short) {
+ cursor_rec = page_rec_get_prev(page_get_supremum_rec(page));
+ } else {
+ cursor_rec = page + offset;
+ }
+
+ if (extra_info_yes == 0) {
+ info_bits = rec_get_info_bits(cursor_rec);
+ origin_offset = rec_get_extra_size(cursor_rec);
+ mismatch_index = rec_get_size(cursor_rec) - end_seg_len;
+ }
+
+ if (mismatch_index + end_seg_len < 1024) {
+ buf = buf1;
+ } else {
+ buf = mem_alloc(mismatch_index + end_seg_len);
+ }
+
+ /* Build the inserted record to buf */
+
+ ut_memcpy(buf, rec_get_start(cursor_rec), mismatch_index);
+ ut_memcpy(buf + mismatch_index, ptr, end_seg_len);
+
+ rec_set_info_bits(buf + origin_offset, info_bits);
+
+ page_cur_position(cursor_rec, &cursor);
+
+ page_cur_rec_insert(&cursor, buf + origin_offset, mtr);
+
+ if (mismatch_index + end_seg_len >= 1024) {
+
+ mem_free(buf);
+ }
+
+ return(ptr + end_seg_len);
+}
+
+/***************************************************************
+Inserts a record next to page cursor. Returns pointer to inserted record if
+succeed, i.e., enough space available, NULL otherwise. The record to be
+inserted can be in a data tuple or as a physical record. The other parameter
+must then be NULL. The cursor stays at the same position. */
+
+rec_t*
+page_cur_insert_rec_low(
+/*====================*/
+ /* out: pointer to record if succeed, NULL
+ otherwise */
+ page_cur_t* cursor, /* in: a page cursor */
+ dtuple_t* tuple, /* in: pointer to a data tuple or NULL */
+ ulint data_size,/* in: data size of tuple */
+ rec_t* rec, /* in: pointer to a physical record or NULL */
+ mtr_t* mtr) /* in: mini-transaction handle */
+{
+ byte* insert_buf = NULL;
+ ulint rec_size;
+ byte* page; /* the relevant page */
+ rec_t* last_insert; /* cursor position at previous insert */
+ rec_t* insert_rec; /* inserted record */
+ ulint heap_no; /* heap number of the inserted record */
+ rec_t* current_rec; /* current record after which the
+ new record is inserted */
+ rec_t* next_rec; /* next record after current before
+ the insertion */
+ ulint owner_slot; /* the slot which owns the inserted record */
+ rec_t* owner_rec;
+ ulint n_owned;
+
+ ut_ad(cursor && mtr);
+ ut_ad(tuple || rec);
+ ut_ad(!(tuple && rec));
+ ut_ad(rec || dtuple_check_typed(tuple));
+ ut_ad(rec || (dtuple_get_data_size(tuple) == data_size));
+
+ page = page_cur_get_page(cursor);
+
+ ut_ad(cursor->rec != page_get_supremum_rec(page));
+
+ /* 1. Get the size of the physical record in the page */
+ if (tuple != NULL) {
+ rec_size = data_size + rec_get_converted_extra_size(
+ data_size,
+ dtuple_get_n_fields(tuple));
+ } else {
+ rec_size = rec_get_size(rec);
+ }
+
+ /* 2. Try to find suitable space from page memory management */
+ insert_buf = page_mem_alloc(page, rec_size, &heap_no);
+
+ if (insert_buf == NULL) {
+
+ return(NULL);
+ }
+
+ /* 3. Create the record */
+ if (tuple != NULL) {
+ insert_rec = rec_convert_dtuple_to_rec_low(insert_buf, tuple,
+ data_size);
+ } else {
+ insert_rec = rec_copy(insert_buf, rec);
+ }
+
+ ut_ad(insert_rec);
+ ut_ad(rec_size == rec_get_size(insert_rec));
+
+ /* 4. Insert the record in the linked list of records */
+
+ current_rec = cursor->rec;
+
+ next_rec = page_rec_get_next(current_rec);
+ page_rec_set_next(insert_rec, next_rec);
+ page_rec_set_next(current_rec, insert_rec);
+
+ page_header_set_field(page, PAGE_N_RECS, 1 + page_get_n_recs(page));
+
+ /* 5. Set the n_owned field in the inserted record to zero,
+ and set the heap_no field */
+
+ rec_set_n_owned(insert_rec, 0);
+ rec_set_heap_no(insert_rec, heap_no);
+
+ /* 6. Update the last insertion info in page header */
+
+ last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
+
+ if (last_insert == NULL) {
+ page_header_set_field(page, PAGE_DIRECTION, PAGE_NO_DIRECTION);
+ page_header_set_field(page, PAGE_N_DIRECTION, 0);
+
+ } else if ((last_insert == current_rec)
+ && (page_header_get_field(page, PAGE_DIRECTION) != PAGE_LEFT)) {
+
+ page_header_set_field(page, PAGE_DIRECTION, PAGE_RIGHT);
+ page_header_set_field(page, PAGE_N_DIRECTION,
+ page_header_get_field(page, PAGE_N_DIRECTION) + 1);
+
+ } else if ((page_rec_get_next(insert_rec) == last_insert)
+ && (page_header_get_field(page, PAGE_DIRECTION) != PAGE_RIGHT)) {
+
+ page_header_set_field(page, PAGE_DIRECTION, PAGE_LEFT);
+ page_header_set_field(page, PAGE_N_DIRECTION,
+ page_header_get_field(page, PAGE_N_DIRECTION) + 1);
+ } else {
+ page_header_set_field(page, PAGE_DIRECTION, PAGE_NO_DIRECTION);
+ page_header_set_field(page, PAGE_N_DIRECTION, 0);
+ }
+
+ page_header_set_ptr(page, PAGE_LAST_INSERT, insert_rec);
+
+ /* 7. It remains to update the owner record. */
+
+ owner_rec = page_rec_find_owner_rec(insert_rec);
+ n_owned = rec_get_n_owned(owner_rec);
+ rec_set_n_owned(owner_rec, n_owned + 1);
+
+ /* 8. Now we have incremented the n_owned field of the owner
+ record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
+ we have to split the corresponding directory slot in two. */
+
+ if (n_owned == PAGE_DIR_SLOT_MAX_N_OWNED) {
+ owner_slot = page_dir_find_owner_slot(owner_rec);
+ page_dir_split_slot(page, owner_slot);
+ }
+
+ /* 9. Write log record of the insert */
+ page_cur_insert_rec_write_log(insert_rec, rec_size, current_rec, mtr);
+
+ return(insert_rec);
+}
+
+/**************************************************************
+Writes a log record of copying a record list end to a new created page. */
+UNIV_INLINE
+byte*
+page_copy_rec_list_to_created_page_write_log(
+/*=========================================*/
+ /* out: 4-byte field where to write the log data
+ length */
+ page_t* page, /* in: index page */
+ mtr_t* mtr) /* in: mtr */
+{
+ byte* log_ptr;
+
+ mlog_write_initial_log_record(page, MLOG_LIST_END_COPY_CREATED, mtr);
+
+ log_ptr = mlog_open(mtr, 4);
+
+ mlog_close(mtr, log_ptr + 4);
+
+ return(log_ptr);
+}
+
+/**************************************************************
+Parses a log record of copying a record list end to a new created page. */
+
+byte*
+page_parse_copy_rec_list_to_created_page(
+/*=====================================*/
+ /* out: end of log record or NULL */
+ byte* ptr, /* in: buffer */
+ byte* end_ptr,/* in: buffer end */
+ page_t* page, /* in: page or NULL */
+ mtr_t* mtr) /* in: mtr or NULL */
+{
+ byte* rec_end;
+ ulint log_data_len;
+
+ if (ptr + 4 > end_ptr) {
+
+ return(NULL);
+ }
+
+ log_data_len = mach_read_from_4(ptr);
+ ptr += 4;
+
+ rec_end = ptr + log_data_len;
+
+ if (rec_end > end_ptr) {
+
+ return(NULL);
+ }
+
+ if (!page) {
+
+ return(rec_end);
+ }
+
+ while (ptr < rec_end) {
+ ptr = page_cur_parse_insert_rec(TRUE, ptr, end_ptr, page, mtr);
+ }
+
+ ut_a(ptr == rec_end);
+
+ page_header_set_ptr(page, PAGE_LAST_INSERT, NULL);
+ page_header_set_field(page, PAGE_DIRECTION, PAGE_NO_DIRECTION);
+ page_header_set_field(page, PAGE_N_DIRECTION, 0);
+
+ return(rec_end);
+}
+
+/*****************************************************************
+Copies records from page to a newly created page, from a given record onward,
+including that record. Infimum and supremum records are not copied. */
+
+void
+page_copy_rec_list_end_to_created_page(
+/*===================================*/
+ page_t* new_page, /* in: index page to copy to */
+ page_t* page, /* in: index page */
+ rec_t* rec, /* in: first record to copy */
+ mtr_t* mtr) /* in: mtr */
+{
+ page_dir_slot_t* slot;
+ byte* heap_top;
+ rec_t* insert_rec;
+ rec_t* prev_rec;
+ ulint count;
+ ulint n_recs;
+ ulint slot_index;
+ ulint rec_size;
+ ulint log_mode;
+ byte* log_ptr;
+ ulint log_data_len;
+
+ ut_ad(page_header_get_field(new_page, PAGE_N_HEAP) == 2);
+ ut_ad(page != new_page);
+
+ if (rec == page_get_infimum_rec(page)) {
+
+ rec = page_rec_get_next(rec);
+ }
+
+ if (rec == page_get_supremum_rec(page)) {
+
+ return;
+ }
+
+#ifdef UNIV_DEBUG
+ /* To pass the debug tests we have to set these dummy values
+ in the debug version */
+ page_header_set_field(new_page, PAGE_N_DIR_SLOTS, UNIV_PAGE_SIZE / 2);
+ page_header_set_ptr(new_page, PAGE_HEAP_TOP,
+ new_page + UNIV_PAGE_SIZE - 1);
+#endif
+
+ log_ptr = page_copy_rec_list_to_created_page_write_log(new_page, mtr);
+
+ log_data_len = dyn_array_get_data_size(&(mtr->log));
+
+ /* Individual inserts are logged in a shorter form */
+
+ log_mode = mtr_set_log_mode(mtr, MTR_LOG_SHORT_INSERTS);
+
+ prev_rec = page_get_infimum_rec(new_page);
+ heap_top = new_page + PAGE_SUPREMUM_END;
+ count = 0;
+ slot_index = 0;
+ n_recs = 0;
+
+ while (rec != page_get_supremum_rec(page)) {
+
+ insert_rec = rec_copy(heap_top, rec);
+
+ rec_set_next_offs(prev_rec, insert_rec - new_page);
+
+ rec_set_n_owned(insert_rec, 0);
+ rec_set_heap_no(insert_rec, 2 + n_recs);
+
+ rec_size = rec_get_size(insert_rec);
+
+ heap_top = heap_top + rec_size;
+
+ ut_ad(heap_top < new_page + UNIV_PAGE_SIZE);
+
+ count++;
+ n_recs++;
+
+ if (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2) {
+
+ slot_index++;
+
+ slot = page_dir_get_nth_slot(new_page, slot_index);
+
+ page_dir_slot_set_rec(slot, insert_rec);
+ page_dir_slot_set_n_owned(slot, count);
+
+ count = 0;
+ }
+
+ page_cur_insert_rec_write_log(insert_rec, rec_size, prev_rec,
+ mtr);
+ prev_rec = insert_rec;
+ rec = page_rec_get_next(rec);
+ }
+
+ if ((slot_index > 0) && (count + 1
+ + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
+ <= PAGE_DIR_SLOT_MAX_N_OWNED)) {
+ /* We can merge the two last dir slots. This operation is
+ here to make this function imitate exactly the equivalent
+ task made using page_cur_insert_rec, which we use in database
+ recovery to reproduce the task performed by this function.
+ To be able to check the correctness of recovery, it is good
+ that it imitates exactly. */
+
+ count += (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2;
+
+ page_dir_slot_set_n_owned(slot, 0);
+
+ slot_index--;
+ }
+
+ log_data_len = dyn_array_get_data_size(&(mtr->log)) - log_data_len;
+
+ mach_write_to_4(log_ptr, log_data_len);
+
+ rec_set_next_offs(insert_rec, PAGE_SUPREMUM);
+
+ slot = page_dir_get_nth_slot(new_page, 1 + slot_index);
+
+ page_dir_slot_set_rec(slot, page_get_supremum_rec(new_page));
+ page_dir_slot_set_n_owned(slot, count + 1);
+
+ page_header_set_field(new_page, PAGE_N_DIR_SLOTS, 2 + slot_index);
+ page_header_set_ptr(new_page, PAGE_HEAP_TOP, heap_top);
+ page_header_set_field(new_page, PAGE_N_HEAP, 2 + n_recs);
+ page_header_set_field(new_page, PAGE_N_RECS, n_recs);
+
+ page_header_set_ptr(new_page, PAGE_LAST_INSERT, NULL);
+ page_header_set_field(new_page, PAGE_DIRECTION, PAGE_NO_DIRECTION);
+ page_header_set_field(new_page, PAGE_N_DIRECTION, 0);
+
+ /* Restore the log mode */
+
+ mtr_set_log_mode(mtr, log_mode);
+}
+
+/***************************************************************
+Writes log record of a record delete on a page. */
+UNIV_INLINE
+void
+page_cur_delete_rec_write_log(
+/*==========================*/
+ rec_t* cursor_rec, /* in: record to be deleted */
+ mtr_t* mtr) /* in: mini-transaction handle */
+{
+ mlog_write_initial_log_record(cursor_rec, MLOG_REC_DELETE, mtr);
+
+ /* Write the cursor rec offset as a 2-byte ulint */
+ mlog_catenate_ulint(mtr, cursor_rec - buf_frame_align(cursor_rec),
+ MLOG_2BYTES);
+}
+
+/***************************************************************
+Parses log record of a record delete on a page. */
+
+byte*
+page_cur_parse_delete_rec(
+/*======================*/
+ /* out: pointer to record end or NULL */
+ byte* ptr, /* in: buffer */
+ byte* end_ptr,/* in: buffer end */
+ page_t* page, /* in: page or NULL */
+ mtr_t* mtr) /* in: mtr or NULL */
+{
+ ulint offset;
+ page_cur_t cursor;
+
+ if (end_ptr < ptr + 2) {
+
+ return(NULL);
+ }
+
+ /* Read the cursor rec offset as a 2-byte ulint */
+ offset = mach_read_from_2(ptr);
+ ptr += 2;
+
+ if (page) {
+ page_cur_position(page + offset, &cursor);
+
+ page_cur_delete_rec(&cursor, mtr);
+ }
+
+ return(ptr);
+}
+
+/***************************************************************
+Deletes a record at the page cursor. The cursor is moved to the next
+record after the deleted one. */
+
+void
+page_cur_delete_rec(
+/*================*/
+ page_cur_t* cursor, /* in: a page cursor */
+ mtr_t* mtr) /* in: mini-transaction handle */
+{
+ page_t* page;
+ rec_t* current_rec;
+ rec_t* prev_rec = NULL;
+ rec_t* next_rec;
+ ulint cur_slot_no;
+ page_dir_slot_t* cur_dir_slot;
+ page_dir_slot_t* prev_slot;
+ ulint cur_n_owned;
+ rec_t* rec;
+
+ ut_ad(cursor && mtr);
+
+ page = page_cur_get_page(cursor);
+ current_rec = cursor->rec;
+
+ /* The record must not be the supremum or infimum record. */
+ ut_ad(current_rec != page_get_supremum_rec(page));
+ ut_ad(current_rec != page_get_infimum_rec(page));
+
+ /* Save to local variables some data associated with current_rec */
+ cur_slot_no = page_dir_find_owner_slot(current_rec);
+ cur_dir_slot = page_dir_get_nth_slot(page, cur_slot_no);
+ cur_n_owned = page_dir_slot_get_n_owned(cur_dir_slot);
+
+ /* 0. Write the log record */
+ page_cur_delete_rec_write_log(current_rec, mtr);
+
+ /* 1. Reset the last insert info in the page header and increment
+ the modify clock for the frame */
+
+ page_header_set_ptr(page, PAGE_LAST_INSERT, NULL);
+
+ /* The page gets invalid for optimistic searches: increment the
+ frame modify clock */
+
+ buf_frame_modify_clock_inc(page);
+
+ /* 2. Find the next and the previous record. Note that the cursor is
+ left at the next record. */
+
+ ut_ad(cur_slot_no > 0);
+ prev_slot = page_dir_get_nth_slot(page, cur_slot_no - 1);
+
+ rec = page_dir_slot_get_rec(prev_slot);
+
+ /* rec now points to the record of the previous directory slot. Look
+ for the immediate predecessor of current_rec in a loop. */
+
+ while(current_rec != rec) {
+ prev_rec = rec;
+ rec = page_rec_get_next(rec);
+ }
+
+ page_cur_move_to_next(cursor);
+ next_rec = cursor->rec;
+
+ /* 3. Remove the record from the linked list of records */
+
+ page_rec_set_next(prev_rec, next_rec);
+ page_header_set_field(page, PAGE_N_RECS,
+ (ulint)(page_get_n_recs(page) - 1));
+
+ /* 4. If the deleted record is pointed to by a dir slot, update the
+ record pointer in slot. In the following if-clause we assume that
+ prev_rec is owned by the same slot, i.e., PAGE_DIR_SLOT_MIN_N_OWNED
+ >= 2. */
+
+ ut_ad(PAGE_DIR_SLOT_MIN_N_OWNED >= 2);
+ ut_ad(cur_n_owned > 1);
+
+ if (current_rec == page_dir_slot_get_rec(cur_dir_slot)) {
+ page_dir_slot_set_rec(cur_dir_slot, prev_rec);
+ }
+
+ /* 5. Update the number of owned records of the slot */
+
+ page_dir_slot_set_n_owned(cur_dir_slot, cur_n_owned - 1);
+
+ /* 6. Free the memory occupied by the record */
+ page_mem_free(page, current_rec);
+
+ /* 7. Now we have decremented the number of owned records of the slot.
+ If the number drops below PAGE_DIR_SLOT_MIN_N_OWNED, we balance the
+ slots. */
+
+ if (cur_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) {
+ page_dir_balance_slot(page, cur_slot_no);
+ }
+}