summaryrefslogtreecommitdiff
path: root/storage/innobase
diff options
context:
space:
mode:
authorMarko Mäkelä <marko.makela@mariadb.com>2020-02-13 11:41:54 +0200
committerMarko Mäkelä <marko.makela@mariadb.com>2020-02-13 18:19:14 +0200
commit2a77b2a51062118488f1d2770c7755d9e7acffa2 (patch)
treebb67b3f587f64f15221499f6c2e1e66cc0b64cb1 /storage/innobase
parentd00185c40d59ee41e380dae87bbed912baea1c0c (diff)
downloadmariadb-git-2a77b2a51062118488f1d2770c7755d9e7acffa2.tar.gz
MDEV-12353: Replace MLOG_*LIST_*_DELETE and MLOG_*REC_DELETE
No longer write the following redo log records: MLOG_COMP_LIST_END_DELETE, MLOG_LIST_END_DELETE, MLOG_COMP_LIST_START_DELETE, MLOG_LIST_START_DELETE, MLOG_REC_DELETE,MLOG_COMP_REC_DELETE. Each individual deleted record will be logged separately using physical log records. page_dir_slot_set_n_owned(), page_zip_rec_set_owned(), page_zip_dir_delete(), page_zip_clear_rec(): Add the parameter mtr, and write redo log. page_dir_slot_set_rec(): Remove. Replaced with lower-level operations that write redo log when necessary. page_rec_set_n_owned(): Replaces rec_set_n_owned_old(), rec_set_n_owned_new(). rec_set_heap_no(): Replaces rec_set_heap_no_old(), rec_set_heap_no_new(). page_mem_free(), page_dir_split_slot(), page_dir_balance_slot(): Add the parameter mtr. page_dir_set_n_slots(): Merge with the caller page_dir_split_slot(). page_dir_slot_set_rec(): Merge with the callers page_dir_split_slot() and page_dir_balance_slot(). page_cur_insert_rec_low(), page_cur_insert_rec_zip(): Suppress the logging of lower-level operations. page_cur_delete_rec_write_log(): Remove. page_cur_delete_rec(): Do not tolerate mtr=NULL. rec_convert_dtuple_to_rec_old(), rec_convert_dtuple_to_rec_comp(): Replace rec_set_heap_no_old() and rec_set_heap_no_new() with direct access that does not involve redo logging. mtr_t::memcpy(): Do allow non-redo-logged writes to uncompressed pages of ROW_FORMAT=COMPRESSED pages. buf_page_io_complete(): Evict the uncompressed page of a ROW_FORMAT=COMPRESSED page after recovery. Because we no longer write logical log records for deleting index records, but instead write physical records that may refer directly to the compressed page frame of a ROW_FORMAT=COMPRESSED page, and because on recovery we will only apply the changes to the ROW_FORMAT=COMPRESSED page, the uncompressed page frame can be stale until page_zip_decompress() is executed. recv_parse_or_apply_log_rec_body(): After applying MLOG_ZIP_WRITE_STRING, ensure that the FIL_PAGE_TYPE of the uncompressed page matches the compressed page, because buf_flush_init_for_writing() assumes that field to be valid. mlog_init_t::mark_ibuf_exist(): Invoke page_zip_decompress(), because the uncompressed page after buf_page_create() is not necessarily up to date. buf_LRU_block_remove_hashed(): Bypass a page_zip_validate() check during redo log apply. recv_apply_hashed_log_recs(): Invoke mlog_init.mark_ibuf_exist() also for the last batch, to ensure that page_zip_decompress() will be called for freshly initialized pages.
Diffstat (limited to 'storage/innobase')
-rw-r--r--storage/innobase/btr/btr0btr.cc5
-rw-r--r--storage/innobase/btr/btr0bulk.cc46
-rw-r--r--storage/innobase/buf/buf0buf.cc21
-rw-r--r--storage/innobase/buf/buf0lru.cc5
-rw-r--r--storage/innobase/include/page0cur.h17
-rw-r--r--storage/innobase/include/page0page.h61
-rw-r--r--storage/innobase/include/page0page.ic46
-rw-r--r--storage/innobase/include/page0types.h7
-rw-r--r--storage/innobase/include/page0zip.h10
-rw-r--r--storage/innobase/include/rem0rec.h39
-rw-r--r--storage/innobase/include/rem0rec.ic58
-rw-r--r--storage/innobase/log/log0recv.cc44
-rw-r--r--storage/innobase/mtr/mtr0log.cc5
-rw-r--r--storage/innobase/page/page0cur.cc556
-rw-r--r--storage/innobase/page/page0page.cc119
-rw-r--r--storage/innobase/page/page0zip.cc120
-rw-r--r--storage/innobase/rem/rem0rec.cc16
17 files changed, 614 insertions, 561 deletions
diff --git a/storage/innobase/btr/btr0btr.cc b/storage/innobase/btr/btr0btr.cc
index 42b62151639..48aeebb37b4 100644
--- a/storage/innobase/btr/btr0btr.cc
+++ b/storage/innobase/btr/btr0btr.cc
@@ -4013,9 +4013,8 @@ btr_discard_only_page_on_level(
DBUG_ASSERT(index->table->instant);
DBUG_ASSERT(rec_is_alter_metadata(rec, *index));
btr_set_instant(block, *index, mtr);
- rec = page_cur_insert_rec_low(
- &cur,
- index, rec, offsets, mtr);
+ rec = page_cur_insert_rec_low(&cur, index, rec,
+ offsets, mtr);
ut_ad(rec);
mem_heap_free(heap);
} else if (index->is_instant()) {
diff --git a/storage/innobase/btr/btr0bulk.cc b/storage/innobase/btr/btr0bulk.cc
index 5dd01eabd1e..03d90ade4f4 100644
--- a/storage/innobase/btr/btr0bulk.cc
+++ b/storage/innobase/btr/btr0bulk.cc
@@ -202,16 +202,22 @@ inline void PageBulk::insertPage(const rec_t *rec, offset_t *offsets)
static_cast<uint16_t>(next_rec - insert_rec));
mach_write_to_2(m_cur_rec - REC_NEXT,
static_cast<uint16_t>(insert_rec - m_cur_rec));
- rec_set_n_owned_new(insert_rec, NULL, 0);
- rec_set_heap_no_new(insert_rec,
- PAGE_HEAP_NO_USER_LOW + m_rec_no);
+ rec_set_bit_field_1(insert_rec, 0, REC_NEW_N_OWNED,
+ REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
+ rec_set_bit_field_2(insert_rec,
+ PAGE_HEAP_NO_USER_LOW + m_rec_no,
+ REC_NEW_HEAP_NO,
+ REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
} else {
mach_write_to_2(insert_rec - REC_NEXT,
mach_read_from_2(m_cur_rec - REC_NEXT));
mach_write_to_2(m_cur_rec - REC_NEXT, page_offset(insert_rec));
- rec_set_n_owned_old(insert_rec, 0);
- rec_set_heap_no_old(insert_rec,
- PAGE_HEAP_NO_USER_LOW + m_rec_no);
+ rec_set_bit_field_1(insert_rec, 0, REC_OLD_N_OWNED,
+ REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
+ rec_set_bit_field_2(insert_rec,
+ PAGE_HEAP_NO_USER_LOW + m_rec_no,
+ REC_OLD_HEAP_NO,
+ REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
}
/* 4. Set member variables. */
@@ -282,8 +288,9 @@ inline void PageBulk::finishPage()
{
slot-= PAGE_DIR_SLOT_SIZE;
mach_write_to_2(slot, offset);
- rec_set_n_owned_new(m_page + offset, nullptr, count);
- count = 0;
+ rec_set_bit_field_1(m_page + offset, count, REC_NEW_N_OWNED,
+ REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
+ count= 0;
}
uint16_t next= (mach_read_from_2(m_page + offset - REC_NEXT) + offset) &
@@ -308,8 +315,8 @@ inline void PageBulk::finishPage()
{
slot-= PAGE_DIR_SLOT_SIZE;
mach_write_to_2(slot, page_offset(insert_rec));
- rec_set_n_owned_old(insert_rec, count);
-
+ rec_set_bit_field_1(insert_rec, count, REC_OLD_N_OWNED,
+ REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
count= 0;
}
@@ -328,13 +335,26 @@ inline void PageBulk::finishPage()
count+= (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2;
- page_dir_slot_set_n_owned(slot, nullptr, 0);
+ rec_t *rec= const_cast<rec_t*>(page_dir_slot_get_rec(slot));
+ rec_set_bit_field_1(rec, 0, m_is_comp ? REC_NEW_N_OWNED : REC_OLD_N_OWNED,
+ REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
slot+= PAGE_DIR_SLOT_SIZE;
}
slot-= PAGE_DIR_SLOT_SIZE;
- page_dir_slot_set_rec(slot, page_get_supremum_rec(m_page));
- page_dir_slot_set_n_owned(slot, nullptr, count + 1);
+
+ if (m_is_comp)
+ {
+ mach_write_to_2(slot, PAGE_NEW_SUPREMUM);
+ rec_set_bit_field_1(m_page + PAGE_NEW_SUPREMUM, count + 1, REC_NEW_N_OWNED,
+ REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
+ }
+ else
+ {
+ mach_write_to_2(slot, PAGE_OLD_SUPREMUM);
+ rec_set_bit_field_1(m_page + PAGE_OLD_SUPREMUM, count + 1, REC_OLD_N_OWNED,
+ REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
+ }
ut_ad(!dict_index_is_spatial(m_index));
ut_ad(!page_get_instant(m_page));
diff --git a/storage/innobase/buf/buf0buf.cc b/storage/innobase/buf/buf0buf.cc
index 71078622486..e8797684326 100644
--- a/storage/innobase/buf/buf0buf.cc
+++ b/storage/innobase/buf/buf0buf.cc
@@ -5535,13 +5535,27 @@ release_page:
ut_ad(buf_pool->n_pend_reads > 0);
buf_pool->n_pend_reads--;
buf_pool->stat.n_pages_read++;
+ ut_ad(!uncompressed || !bpage->zip.data
+ || !recv_recovery_is_on()
+ || buf_page_can_relocate(bpage));
+ mutex_exit(block_mutex);
if (uncompressed) {
+#if 1 /* MDEV-12353 FIXME: Remove this! */
+ if (UNIV_LIKELY_NULL(bpage->zip.data)
+ && recv_recovery_is_on()) {
+ rw_lock_x_unlock_gen(
+ &reinterpret_cast<buf_block_t*>(bpage)
+ ->lock, BUF_IO_READ);
+ if (!buf_LRU_free_page(bpage, false)) {
+ ut_ad(!"could not remove");
+ }
+ goto func_exit;
+ }
+#endif
rw_lock_x_unlock_gen(&((buf_block_t*) bpage)->lock,
BUF_IO_READ);
}
-
- mutex_exit(block_mutex);
} else {
/* Write means a flush operation: call the completion
routine in the flush system */
@@ -5575,9 +5589,8 @@ release_page:
DBUG_PRINT("ib_buf", ("%s page %u:%u",
io_type == BUF_IO_READ ? "read" : "wrote",
bpage->id.space(), bpage->id.page_no()));
-
+func_exit:
mutex_exit(&buf_pool->mutex);
-
return DB_SUCCESS;
}
diff --git a/storage/innobase/buf/buf0lru.cc b/storage/innobase/buf/buf0lru.cc
index e28eb66d13c..8f59d0c5668 100644
--- a/storage/innobase/buf/buf0lru.cc
+++ b/storage/innobase/buf/buf0lru.cc
@@ -1765,7 +1765,10 @@ buf_LRU_block_remove_hashed(
case FIL_PAGE_INDEX:
case FIL_PAGE_RTREE:
#if defined UNIV_ZIP_DEBUG && defined BTR_CUR_HASH_ADAPT
- ut_a(page_zip_validate(
+ /* During recovery, we only update the
+ compressed page, not the uncompressed one. */
+ ut_a(recv_recovery_is_on()
+ || page_zip_validate(
&bpage->zip, page,
((buf_block_t*) bpage)->index));
#endif /* UNIV_ZIP_DEBUG && BTR_CUR_HASH_ADAPT */
diff --git a/storage/innobase/include/page0cur.h b/storage/innobase/include/page0cur.h
index fd1f38538bb..3ed924847cf 100644
--- a/storage/innobase/include/page0cur.h
+++ b/storage/innobase/include/page0cur.h
@@ -209,22 +209,6 @@ page_cur_insert_rec_zip(
offset_t* offsets,/*!< in/out: rec_get_offsets(rec, index) */
mtr_t* mtr) /*!< in/out: mini-transaction */
MY_ATTRIBUTE((nonnull, warn_unused_result));
-/*************************************************************//**
-Copies records from page to a newly created page, from a given record onward,
-including that record. Infimum and supremum records are not copied.
-
-IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
-if this is a compressed leaf page in a secondary index.
-This has to be done either within the same mini-transaction,
-or by invoking ibuf_reset_free_bits() before mtr_commit(). */
-ATTRIBUTE_COLD /* only used when crash-upgrading */
-void
-page_copy_rec_list_end_to_created_page(
-/*===================================*/
- page_t* new_page, /*!< in/out: index page to copy to */
- rec_t* rec, /*!< in: first record to copy */
- dict_index_t* index, /*!< in: record descriptor */
- mtr_t* mtr); /*!< in: mtr */
/***********************************************************//**
Deletes a record at the page cursor. The cursor is moved to the
next record after the deleted one. */
@@ -363,6 +347,7 @@ page_parse_copy_rec_list_to_created_page(
/***********************************************************//**
Parses log record of a record delete on a page.
@return pointer to record end or NULL */
+ATTRIBUTE_COLD /* only used when crash-upgrading */
const byte*
page_cur_parse_delete_rec(
/*======================*/
diff --git a/storage/innobase/include/page0page.h b/storage/innobase/include/page0page.h
index 7831195f213..0ed91cb8b1b 100644
--- a/storage/innobase/include/page0page.h
+++ b/storage/innobase/include/page0page.h
@@ -406,6 +406,38 @@ inline trx_id_t page_get_max_trx_id(const page_t *page)
return mach_read_from_8(p);
}
+/**
+Set the number of owned records.
+@tparam compressed whether to update any ROW_FORMAT=COMPRESSED page as well
+@param[in,out] block index page
+@param[in,out] rec ROW_FORMAT=REDUNDANT record
+@param[in] n_owned number of records skipped in the sparse page directory
+@param[in] comp whether ROW_FORMAT is one of COMPACT,DYNAMIC,COMPRESSED
+@param[in,out] mtr mini-transaction */
+template<bool compressed>
+inline void page_rec_set_n_owned(buf_block_t *block, rec_t *rec, ulint n_owned,
+ bool comp, mtr_t *mtr)
+{
+ ut_ad(block->frame == page_align(rec));
+ ut_ad(comp == (page_is_comp(block->frame) != 0));
+
+ if (page_zip_des_t *page_zip= compressed
+ ? buf_block_get_page_zip(block) : nullptr)
+ {
+ ut_ad(comp);
+ rec_set_bit_field_1(rec, n_owned, REC_NEW_N_OWNED,
+ REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
+ if (rec_get_status(rec) != REC_STATUS_SUPREMUM)
+ page_zip_rec_set_owned(block, rec, n_owned, mtr);
+ }
+ else
+ {
+ rec-= comp ? REC_NEW_N_OWNED : REC_OLD_N_OWNED;
+ mtr->write<1,mtr_t::OPT>(*block, rec, (*rec & ~REC_N_OWNED_MASK) |
+ (n_owned << REC_N_OWNED_SHIFT));
+ }
+}
+
/*************************************************************//**
Sets the max trx id field value. */
void
@@ -620,17 +652,6 @@ uint16_t
page_dir_get_n_slots(
/*=================*/
const page_t* page); /*!< in: index page */
-/*************************************************************//**
-Sets the number of dir slots in directory. */
-UNIV_INLINE
-void
-page_dir_set_n_slots(
-/*=================*/
- page_t* page, /*!< in/out: page */
- page_zip_des_t* page_zip,/*!< in/out: compressed page whose
- uncompressed part will be updated, or NULL */
- ulint n_slots);/*!< in: number of slots */
-
/** Gets the pointer to a directory slot.
@param n sparse directory slot number
@return pointer to the sparse directory slot */
@@ -664,14 +685,6 @@ inline const rec_t *page_dir_slot_get_rec(const page_dir_slot_t *slot)
return page_dir_slot_get_rec(const_cast<rec_t*>(slot));
}
/***************************************************************//**
-This is used to set the record offset in a directory slot. */
-UNIV_INLINE
-void
-page_dir_slot_set_rec(
-/*==================*/
- page_dir_slot_t*slot, /*!< in: directory slot */
- const rec_t* rec); /*!< in: record on the page */
-/***************************************************************//**
Gets the number of records owned by a directory slot.
@return number of records */
UNIV_INLINE
@@ -679,15 +692,6 @@ ulint
page_dir_slot_get_n_owned(
/*======================*/
const page_dir_slot_t* slot); /*!< in: page directory slot */
-/***************************************************************//**
-This is used to set the owned records field of a directory slot. */
-UNIV_INLINE
-void
-page_dir_slot_set_n_owned(
-/*======================*/
- page_dir_slot_t*slot, /*!< in/out: directory slot */
- page_zip_des_t* page_zip,/*!< in/out: compressed page, or NULL */
- ulint n); /*!< in: number of records owned by the slot */
/************************************************************//**
Calculates the space reserved for directory slots of a given
number of records. The exact value is a fraction number
@@ -1138,6 +1142,7 @@ page_move_rec_list_start(
/**********************************************************//**
Parses a log record of a record list end or start deletion.
@return end of log record or NULL */
+ATTRIBUTE_COLD /* only used when crash-upgrading */
const byte*
page_parse_delete_rec_list(
/*=======================*/
diff --git a/storage/innobase/include/page0page.ic b/storage/innobase/include/page0page.ic
index eaf44600b60..563de443f76 100644
--- a/storage/innobase/include/page0page.ic
+++ b/storage/innobase/include/page0page.ic
@@ -419,19 +419,6 @@ page_dir_get_n_slots(
{
return(page_header_get_field(page, PAGE_N_DIR_SLOTS));
}
-/*************************************************************//**
-Sets the number of dir slots in directory. */
-UNIV_INLINE
-void
-page_dir_set_n_slots(
-/*=================*/
- page_t* page, /*!< in/out: page */
- page_zip_des_t* page_zip,/*!< in/out: compressed page whose
- uncompressed part will be updated, or NULL */
- ulint n_slots)/*!< in: number of slots */
-{
- page_header_set_field(page, page_zip, PAGE_N_DIR_SLOTS, n_slots);
-}
/*************************************************************//**
Gets the number of records in the heap.
@@ -488,20 +475,6 @@ page_rec_check(
}
/***************************************************************//**
-This is used to set the record offset in a directory slot. */
-UNIV_INLINE
-void
-page_dir_slot_set_rec(
-/*==================*/
- page_dir_slot_t*slot, /*!< in: directory slot */
- const rec_t* rec) /*!< in: record on the page */
-{
- ut_ad(page_rec_check(rec));
-
- mach_write_to_2(slot, page_offset(rec));
-}
-
-/***************************************************************//**
Gets the number of records owned by a directory slot.
@return number of records */
UNIV_INLINE
@@ -518,25 +491,6 @@ page_dir_slot_get_n_owned(
}
}
-/***************************************************************//**
-This is used to set the owned records field of a directory slot. */
-UNIV_INLINE
-void
-page_dir_slot_set_n_owned(
-/*======================*/
- page_dir_slot_t*slot, /*!< in/out: directory slot */
- page_zip_des_t* page_zip,/*!< in/out: compressed page, or NULL */
- ulint n) /*!< in: number of records owned by the slot */
-{
- rec_t* rec = (rec_t*) page_dir_slot_get_rec(slot);
- if (page_rec_is_comp(slot)) {
- rec_set_n_owned_new(rec, page_zip, n);
- } else {
- ut_ad(!page_zip);
- rec_set_n_owned_old(rec, n);
- }
-}
-
/************************************************************//**
Calculates the space reserved for directory slots of a given number of
records. The exact value is a fraction number n * PAGE_DIR_SLOT_SIZE /
diff --git a/storage/innobase/include/page0types.h b/storage/innobase/include/page0types.h
index 37e2eb2d36a..6c5a681f3b5 100644
--- a/storage/innobase/include/page0types.h
+++ b/storage/innobase/include/page0types.h
@@ -40,6 +40,8 @@ typedef byte page_t;
#ifndef UNIV_INNOCHECKSUM
/** Index page cursor */
struct page_cur_t;
+/** Buffer pool block */
+struct buf_block_t;
/** Compressed index page */
typedef byte page_zip_t;
@@ -150,9 +152,10 @@ must already have been written on the uncompressed page. */
void
page_zip_rec_set_owned(
/*===================*/
- page_zip_des_t* page_zip,/*!< in/out: compressed page */
+ buf_block_t* block, /*!< in/out: ROW_FORMAT=COMPRESSED page */
const byte* rec, /*!< in: record on the uncompressed page */
- ulint flag) /*!< in: the owned flag (nonzero=TRUE) */
+ ulint flag, /*!< in: the owned flag (nonzero=TRUE) */
+ mtr_t* mtr) /*!< in/out: mini-transaction */
MY_ATTRIBUTE((nonnull));
#endif /* !UNIV_INNOCHECKSUM */
#endif
diff --git a/storage/innobase/include/page0zip.h b/storage/innobase/include/page0zip.h
index 16dac663527..4da4e740994 100644
--- a/storage/innobase/include/page0zip.h
+++ b/storage/innobase/include/page0zip.h
@@ -360,9 +360,10 @@ must already have been written on the uncompressed page. */
void
page_zip_rec_set_owned(
/*===================*/
- page_zip_des_t* page_zip,/*!< in/out: compressed page */
+ buf_block_t* block, /*!< in/out: ROW_FORMAT=COMPRESSED page */
const byte* rec, /*!< in: record on the uncompressed page */
- ulint flag) /*!< in: the owned flag (nonzero=TRUE) */
+ ulint flag, /*!< in: the owned flag (nonzero=TRUE) */
+ mtr_t* mtr) /*!< in/out: mini-transaction */
MY_ATTRIBUTE((nonnull));
/**********************************************************************//**
@@ -385,9 +386,10 @@ page_zip_dir_delete(
byte* rec, /*!< in: deleted record */
const dict_index_t* index, /*!< in: index of rec */
const offset_t* offsets, /*!< in: rec_get_offsets(rec) */
- const byte* free) /*!< in: previous start of
+ const byte* free, /*!< in: previous start of
the free list */
- MY_ATTRIBUTE((nonnull(1,2,3,4)));
+ mtr_t* mtr) /*!< in/out: mini-transaction */
+ MY_ATTRIBUTE((nonnull(1,2,3,4,6)));
/**********************************************************************//**
Add a slot to the dense page directory. */
diff --git a/storage/innobase/include/rem0rec.h b/storage/innobase/include/rem0rec.h
index 75a830bef24..d8483fe2803 100644
--- a/storage/innobase/include/rem0rec.h
+++ b/storage/innobase/include/rem0rec.h
@@ -241,15 +241,6 @@ rec_get_n_owned_old(
const rec_t* rec) /*!< in: old-style physical record */
MY_ATTRIBUTE((warn_unused_result));
/******************************************************//**
-The following function is used to set the number of owned records. */
-UNIV_INLINE
-void
-rec_set_n_owned_old(
-/*================*/
- rec_t* rec, /*!< in: old-style physical record */
- ulint n_owned) /*!< in: the number of owned */
- MY_ATTRIBUTE((nonnull));
-/******************************************************//**
The following function is used to get the number of records owned by the
previous directory record.
@return number of owned records */
@@ -260,16 +251,6 @@ rec_get_n_owned_new(
const rec_t* rec) /*!< in: new-style physical record */
MY_ATTRIBUTE((warn_unused_result));
/******************************************************//**
-The following function is used to set the number of owned records. */
-UNIV_INLINE
-void
-rec_set_n_owned_new(
-/*================*/
- rec_t* rec, /*!< in/out: new-style physical record */
- page_zip_des_t* page_zip,/*!< in/out: compressed page, or NULL */
- ulint n_owned)/*!< in: the number of owned */
- MY_ATTRIBUTE((nonnull(1)));
-/******************************************************//**
The following function is used to retrieve the info bits of
a record.
@return info bits */
@@ -418,16 +399,6 @@ rec_get_heap_no_old(
const rec_t* rec) /*!< in: physical record */
MY_ATTRIBUTE((warn_unused_result));
/******************************************************//**
-The following function is used to set the heap number
-field in an old-style record. */
-UNIV_INLINE
-void
-rec_set_heap_no_old(
-/*================*/
- rec_t* rec, /*!< in: physical record */
- ulint heap_no)/*!< in: the heap number */
- MY_ATTRIBUTE((nonnull));
-/******************************************************//**
The following function is used to get the order number
of a new-style record in the heap of the index page.
@return heap order number */
@@ -438,16 +409,6 @@ rec_get_heap_no_new(
const rec_t* rec) /*!< in: physical record */
MY_ATTRIBUTE((warn_unused_result));
/******************************************************//**
-The following function is used to set the heap number
-field in a new-style record. */
-UNIV_INLINE
-void
-rec_set_heap_no_new(
-/*================*/
- rec_t* rec, /*!< in/out: physical record */
- ulint heap_no)/*!< in: the heap number */
- MY_ATTRIBUTE((nonnull));
-/******************************************************//**
The following function is used to test whether the data offsets
in the record are stored in one-byte or two-byte format.
@return TRUE if 1-byte form */
diff --git a/storage/innobase/include/rem0rec.ic b/storage/innobase/include/rem0rec.ic
index a7d75236c32..4405ad0abb7 100644
--- a/storage/innobase/include/rem0rec.ic
+++ b/storage/innobase/include/rem0rec.ic
@@ -504,19 +504,6 @@ rec_get_n_owned_old(
}
/******************************************************//**
-The following function is used to set the number of owned records. */
-UNIV_INLINE
-void
-rec_set_n_owned_old(
-/*================*/
- rec_t* rec, /*!< in: old-style physical record */
- ulint n_owned) /*!< in: the number of owned */
-{
- rec_set_bit_field_1(rec, n_owned, REC_OLD_N_OWNED,
- REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
-}
-
-/******************************************************//**
The following function is used to get the number of records owned by the
previous directory record.
@return number of owned records */
@@ -531,23 +518,6 @@ rec_get_n_owned_new(
}
/******************************************************//**
-The following function is used to set the number of owned records. */
-UNIV_INLINE
-void
-rec_set_n_owned_new(
-/*================*/
- rec_t* rec, /*!< in/out: new-style physical record */
- page_zip_des_t* page_zip,/*!< in/out: compressed page, or NULL */
- ulint n_owned)/*!< in: the number of owned */
-{
- rec_set_bit_field_1(rec, n_owned, REC_NEW_N_OWNED,
- REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
- if (page_zip && rec_get_status(rec) != REC_STATUS_SUPREMUM) {
- page_zip_rec_set_owned(page_zip, rec, n_owned);
- }
-}
-
-/******************************************************//**
The following function is used to retrieve the info bits of a record.
@return info bits */
UNIV_INLINE
@@ -675,20 +645,6 @@ rec_get_heap_no_old(
}
/******************************************************//**
-The following function is used to set the heap number
-field in an old-style record. */
-UNIV_INLINE
-void
-rec_set_heap_no_old(
-/*================*/
- rec_t* rec, /*!< in: physical record */
- ulint heap_no)/*!< in: the heap number */
-{
- rec_set_bit_field_2(rec, heap_no, REC_OLD_HEAP_NO,
- REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
-}
-
-/******************************************************//**
The following function is used to get the order number
of a new-style record in the heap of the index page.
@return heap order number */
@@ -703,20 +659,6 @@ rec_get_heap_no_new(
}
/******************************************************//**
-The following function is used to set the heap number
-field in a new-style record. */
-UNIV_INLINE
-void
-rec_set_heap_no_new(
-/*================*/
- rec_t* rec, /*!< in/out: physical record */
- ulint heap_no)/*!< in: the heap number */
-{
- rec_set_bit_field_2(rec, heap_no, REC_NEW_HEAP_NO,
- REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
-}
-
-/******************************************************//**
The following function is used to test whether the data offsets in the record
are stored in one-byte or two-byte format.
@return TRUE if 1-byte form */
diff --git a/storage/innobase/log/log0recv.cc b/storage/innobase/log/log0recv.cc
index 9fa1b669463..d671948fdd1 100644
--- a/storage/innobase/log/log0recv.cc
+++ b/storage/innobase/log/log0recv.cc
@@ -313,7 +313,6 @@ public:
void mark_ibuf_exist(mtr_t& mtr)
{
ut_ad(mutex_own(&recv_sys.mutex));
- ut_ad(!recv_no_ibuf_operations);
mtr.start();
for (const map::value_type& i : inits) {
@@ -324,6 +323,21 @@ public:
i.first, 0, RW_X_LATCH, NULL,
BUF_GET_IF_IN_POOL, __FILE__, __LINE__,
&mtr)) {
+ if (UNIV_LIKELY_NULL(block->page.zip.data)
+ && fil_page_type_is_index(
+ fil_page_get_type(
+ block->page.zip.data))
+ && !page_zip_decompress(&block->page.zip,
+ block->frame,
+ true)) {
+ ib::error() << "corrupted page "
+ << block->page.id;
+ }
+ if (recv_no_ibuf_operations) {
+ mtr.commit();
+ mtr.start();
+ continue;
+ }
mutex_exit(&recv_sys.mutex);
block->page.ibuf_exist = ibuf_page_exists(
block->page);
@@ -1570,6 +1584,13 @@ parse_log:
}
break;
case MLOG_REC_INSERT: case MLOG_COMP_REC_INSERT:
+ if (!page_zip) {
+ } else if (!page_zip_decompress(page_zip, page, true)) {
+ ib::error() << "corrupted page " << block->page.id;
+ } else {
+ ut_d(page_type = fil_page_get_type(page));
+ }
+
ut_ad(!page || fil_page_type_is_index(page_type));
if (NULL != (ptr = mlog_parse_index(
@@ -1603,6 +1624,13 @@ parse_log:
page, page_zip, mtr);
break;
case MLOG_REC_UPDATE_IN_PLACE: case MLOG_COMP_REC_UPDATE_IN_PLACE:
+ if (!page_zip) {
+ } else if (!page_zip_decompress(page_zip, page, true)) {
+ ib::error() << "corrupted page " << block->page.id;
+ } else {
+ ut_d(page_type = fil_page_get_type(page));
+ }
+
ut_ad(!page || fil_page_type_is_index(page_type));
if (NULL != (ptr = mlog_parse_index(
@@ -2113,22 +2141,10 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
memcpy_aligned<8>(FIL_PAGE_LSN
+ page_zip->data,
FIL_PAGE_LSN + page, 8);
- if (fil_page_index_page_check(page)
- && !page_zip_decompress(page_zip, page,
- true)) {
- ib::error() << "corrupted page "
- << block->page.id;
- }
}
}
}
-#ifdef UNIV_ZIP_DEBUG
- ut_ad(!fil_page_index_page_check(page)
- || !page_zip
- || page_zip_validate_low(page_zip, page, NULL, FALSE));
-#endif /* UNIV_ZIP_DEBUG */
-
if (start_lsn) {
buf_block_modify_clock_inc(block);
log_flush_order_mutex_enter();
@@ -2479,7 +2495,7 @@ done:
log_mutex_enter();
mutex_enter(&(recv_sys.mutex));
mlog_init.reset();
- } else if (!recv_no_ibuf_operations) {
+ } else {
/* We skipped this in buf_page_create(). */
mlog_init.mark_ibuf_exist(mtr);
}
diff --git a/storage/innobase/mtr/mtr0log.cc b/storage/innobase/mtr/mtr0log.cc
index 9c8de6dac14..b040442b306 100644
--- a/storage/innobase/mtr/mtr0log.cc
+++ b/storage/innobase/mtr/mtr0log.cc
@@ -276,8 +276,6 @@ void mtr_t::memcpy(const buf_block_t &b, ulint ofs, ulint len)
ut_ad(len);
ut_ad(ofs <= ulint(srv_page_size));
ut_ad(ofs + len <= ulint(srv_page_size));
- ut_ad(ofs + len < PAGE_DATA || !b.page.zip.data ||
- mach_read_from_2(b.frame + FIL_PAGE_TYPE) <= FIL_PAGE_TYPE_ZBLOB2);
set_modified();
if (get_log_mode() != MTR_LOG_ALL)
@@ -287,6 +285,9 @@ void mtr_t::memcpy(const buf_block_t &b, ulint ofs, ulint len)
return;
}
+ ut_ad(ofs + len < PAGE_DATA || !b.page.zip.data ||
+ mach_read_from_2(b.frame + FIL_PAGE_TYPE) <= FIL_PAGE_TYPE_ZBLOB2);
+
byte *l= get_log()->open(11 + 2 + 2);
l= mlog_write_initial_log_record_low(MLOG_WRITE_STRING, b.page.id.space(),
b.page.id.page_no(), l, this);
diff --git a/storage/innobase/page/page0cur.cc b/storage/innobase/page/page0cur.cc
index 727ab63da39..ab3f6e11bb8 100644
--- a/storage/innobase/page/page0cur.cc
+++ b/storage/innobase/page/page0cur.cc
@@ -966,6 +966,24 @@ need_extra_info:
}
}
+static void rec_set_heap_no(rec_t *rec, ulint heap_no, bool compact)
+{
+ rec_set_bit_field_2(rec, heap_no,
+ compact ? REC_NEW_HEAP_NO : REC_OLD_HEAP_NO,
+ REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
+}
+
+static void rec_set_heap_no(const buf_block_t& block, rec_t *rec,
+ ulint heap_no, bool compact, mtr_t *mtr)
+{
+ rec-= compact ? REC_NEW_HEAP_NO : REC_OLD_HEAP_NO;
+
+ // MDEV-12353 FIXME: try single-byte write if possible
+ mtr->write<2,mtr_t::OPT>(block, rec,
+ (mach_read_from_2(rec) & ~REC_HEAP_NO_MASK) |
+ (heap_no << REC_HEAP_NO_SHIFT));
+}
+
/***********************************************************//**
Parses a log record of a record insert on a page.
@return end of log record or NULL */
@@ -1115,15 +1133,13 @@ page_cur_parse_insert_rec(
memcpy(buf, rec_get_start(cursor_rec, offsets), mismatch_index);
memcpy(buf + mismatch_index, ptr, end_seg_len);
+ rec_set_heap_no(buf + origin_offset, PAGE_HEAP_NO_USER_LOW,
+ page_is_comp(page));
if (page_is_comp(page)) {
- rec_set_heap_no_new(buf + origin_offset,
- PAGE_HEAP_NO_USER_LOW);
rec_set_info_and_status_bits(buf + origin_offset,
info_and_status_bits);
} else {
- rec_set_heap_no_old(buf + origin_offset,
- PAGE_HEAP_NO_USER_LOW);
rec_set_info_bits_old(buf + origin_offset,
info_and_status_bits);
}
@@ -1196,64 +1212,105 @@ page_direction_increment(
1U + page_header_get_field(page, PAGE_N_DIRECTION));
}
-/** Split a directory slot which owns too many records.
-@param[in,out] page index page
-@param[in,out] page_zip ROW_FORMAT=COMPRESSED page, or NULL
-@param[in] s the slot that needs to be split */
-static void page_dir_split_slot(page_t* page, page_zip_des_t* page_zip,
- ulint s)
+/**
+Set the owned records field of the record pointed to by a directory slot.
+@tparam compressed whether to update any ROW_FORMAT=COMPRESSED page as well
+@param[in,out] block file page
+@param[in] slot sparse directory slot
+@param[in,out] n number of records owned by the directory slot
+@param[in,out] mtr mini-transaction */
+template<bool compressed>
+static void page_dir_slot_set_n_owned(buf_block_t *block,
+ const page_dir_slot_t *slot,
+ ulint n, mtr_t *mtr)
{
- ut_ad(!page_zip || page_is_comp(page));
- ut_ad(s);
-
- page_dir_slot_t* slot = page_dir_get_nth_slot(page, s);
- const ulint n_owned = PAGE_DIR_SLOT_MAX_N_OWNED + 1;
-
- ut_ad(page_dir_slot_get_n_owned(slot) == n_owned);
- compile_time_assert((PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
- >= PAGE_DIR_SLOT_MIN_N_OWNED);
-
- /* 1. We loop to find a record approximately in the middle of the
- records owned by the slot. */
-
- const rec_t* rec = page_dir_slot_get_rec(slot + PAGE_DIR_SLOT_SIZE);
-
- for (ulint i = n_owned / 2; i--; ) {
- rec = page_rec_get_next_const(rec);
- }
-
- /* 2. Add a directory slot immediately below this one. */
- const ulint n_slots = page_dir_get_n_slots(page);
- page_dir_set_n_slots(page, page_zip, n_slots + 1);
- page_dir_slot_t* last_slot = page_dir_get_nth_slot(page, n_slots);
- memmove_aligned<2>(last_slot, last_slot + PAGE_DIR_SLOT_SIZE,
- slot - last_slot);
-
- /* 3. We store the appropriate values to the new slot. */
-
- page_dir_slot_set_rec(slot, rec);
- page_dir_slot_set_n_owned(slot, page_zip, n_owned / 2);
+ rec_t *rec= const_cast<rec_t*>(page_dir_slot_get_rec(slot));
+ page_rec_set_n_owned<compressed>(block, rec, n, page_rec_is_comp(rec), mtr);
+}
- /* 4. Finally, we update the number of records field of the
- original slot */
- page_dir_slot_set_n_owned(slot - PAGE_DIR_SLOT_SIZE,
- page_zip, n_owned - (n_owned / 2));
+/**
+Split a directory slot which owns too many records.
+@tparam compressed whether to update the ROW_FORMAT=COMPRESSED page as well
+@param[in,out] block index page
+@param[in] s the slot that needs to be split
+@param[in,out] mtr mini-transaction */
+template<bool compressed>
+static void page_dir_split_slot(buf_block_t *block, ulint s, mtr_t* mtr)
+{
+ ut_ad(!block->page.zip.data || page_is_comp(block->frame));
+ ut_ad(!compressed || block->page.zip.data);
+ ut_ad(s);
+
+ page_dir_slot_t *slot= page_dir_get_nth_slot(block->frame, s);
+ const ulint n_owned= PAGE_DIR_SLOT_MAX_N_OWNED + 1;
+
+ ut_ad(page_dir_slot_get_n_owned(slot) == n_owned);
+ compile_time_assert((PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
+ >= PAGE_DIR_SLOT_MIN_N_OWNED);
+
+ /* 1. We loop to find a record approximately in the middle of the
+ records owned by the slot. */
+
+ const rec_t *rec= page_dir_slot_get_rec(slot + PAGE_DIR_SLOT_SIZE);
+
+ for (ulint i= n_owned / 2; i--; )
+ rec= page_rec_get_next_const(rec);
+
+ /* Add a directory slot immediately below this one. */
+ byte *n_slots_p= PAGE_N_DIR_SLOTS + PAGE_HEADER + block->frame;
+ const uint16_t n_slots= mach_read_from_2(n_slots_p);
+
+ page_dir_slot_t *last_slot= static_cast<page_dir_slot_t*>
+ (block->frame + srv_page_size - (PAGE_DIR + PAGE_DIR_SLOT_SIZE) -
+ n_slots * PAGE_DIR_SLOT_SIZE);
+ memmove_aligned<2>(last_slot, last_slot + PAGE_DIR_SLOT_SIZE,
+ slot - last_slot);
+
+ const ulint half_owned= n_owned / 2;
+
+ if (compressed && UNIV_LIKELY_NULL(block->page.zip.data))
+ {
+ /* Log changes to the compressed page header and the dense page
+ directory. */
+ mach_write_to_2(n_slots_p, n_slots + 1);
+ page_zip_write_header(&block->page.zip, n_slots_p, 2, mtr);
+ mach_write_to_2(slot, page_offset(rec));
+ page_rec_set_n_owned<true>(block, page_dir_slot_get_rec(slot), half_owned,
+ true, mtr);
+ page_rec_set_n_owned<true>(block,
+ page_dir_slot_get_rec(slot -
+ PAGE_DIR_SLOT_SIZE),
+ n_owned - half_owned, true, mtr);
+ }
+ else
+ {
+ mtr->write<2>(*block, n_slots_p, 1U + n_slots);
+ mtr->memcpy(*block, page_offset(last_slot), slot - last_slot);
+ mtr->write<2>(*block, slot, page_offset(rec));
+ const bool comp= page_is_comp(block->frame) != 0;
+ page_rec_set_n_owned<false>(block, page_dir_slot_get_rec(slot), half_owned,
+ comp, mtr);
+ page_rec_set_n_owned<false>(block,
+ page_dir_slot_get_rec(slot -
+ PAGE_DIR_SLOT_SIZE),
+ n_owned - half_owned, comp, mtr);
+ }
}
-/** Try to balance an underfilled directory slot with an adjacent one,
+/**
+Try to balance an underfilled directory slot with an adjacent one,
so that there are at least the minimum number of records owned by the slot;
this may result in merging the two slots.
-@param[in,out] page index page
-@param[in,out] page_zip ROW_FORMAT=COMPRESSED page, or NULL
-@param[in] s the slot to be balanced */
-static void page_dir_balance_slot(page_t* page, page_zip_des_t* page_zip,
- ulint s)
+@param[in,out] block index page
+@param[in] s the slot to be balanced
+@param[in,out] mtr mini-transaction */
+static void page_dir_balance_slot(buf_block_t *block, ulint s, mtr_t *mtr)
{
- ut_ad(!page_zip || page_is_comp(page));
+ ut_ad(!block->page.zip.data || page_is_comp(block->frame));
ut_ad(s > 0);
- const ulint n_slots = page_dir_get_n_slots(page);
+ const ulint n_slots = page_dir_get_n_slots(block->frame);
if (UNIV_UNLIKELY(s + 1 == n_slots)) {
/* The last directory slot cannot be balanced. */
@@ -1262,9 +1319,10 @@ static void page_dir_balance_slot(page_t* page, page_zip_des_t* page_zip,
ut_ad(s < n_slots);
- page_dir_slot_t* slot = page_dir_get_nth_slot(page, s);
+ page_dir_slot_t* slot = page_dir_get_nth_slot(block->frame, s);
page_dir_slot_t* up_slot = slot - PAGE_DIR_SLOT_SIZE;
const ulint up_n_owned = page_dir_slot_get_n_owned(up_slot);
+ page_zip_des_t* page_zip = buf_block_get_page_zip(block);
ut_ad(page_dir_slot_get_n_owned(slot)
== PAGE_DIR_SLOT_MIN_N_OWNED - 1);
@@ -1274,17 +1332,33 @@ static void page_dir_balance_slot(page_t* page, page_zip_des_t* page_zip,
<= PAGE_DIR_SLOT_MAX_N_OWNED);
/* Merge the slots. */
ulint n_owned = page_dir_slot_get_n_owned(slot);
- page_dir_slot_set_n_owned(slot, page_zip, 0);
- page_dir_slot_set_n_owned(up_slot, page_zip,
- n_owned
- + page_dir_slot_get_n_owned(up_slot));
+ page_dir_slot_set_n_owned<true>(block, slot, 0, mtr);
+ page_dir_slot_set_n_owned<true>(block, up_slot, n_owned
+ + page_dir_slot_get_n_owned(
+ up_slot), mtr);
/* Shift the slots */
page_dir_slot_t* last_slot = page_dir_get_nth_slot(
- page, n_slots - 1);
+ block->frame, n_slots - 1);
memmove_aligned<2>(last_slot + PAGE_DIR_SLOT_SIZE, last_slot,
slot - last_slot);
- mach_write_to_2(last_slot, 0);
- page_dir_set_n_slots(page, page_zip, n_slots - 1);
+ if (UNIV_LIKELY_NULL(page_zip)) {
+ memset_aligned<2>(last_slot, 0, 2);
+ mach_write_to_2(PAGE_N_DIR_SLOTS + PAGE_HEADER
+ + block->frame, n_slots - 1);
+ page_zip_write_header(page_zip,
+ PAGE_N_DIR_SLOTS + PAGE_HEADER
+ + block->frame, 2, mtr);
+ } else {
+ mtr->write<2>(*block,
+ PAGE_N_DIR_SLOTS + PAGE_HEADER
+ + block->frame,
+ n_slots - 1);
+ mtr->write<2>(*block, last_slot, 0U);
+ mtr->memcpy(*block, page_offset(last_slot)
+ + PAGE_DIR_SLOT_SIZE,
+ slot - last_slot);
+ }
+
return;
}
@@ -1292,21 +1366,29 @@ static void page_dir_balance_slot(page_t* page, page_zip_des_t* page_zip,
rec_t* old_rec = const_cast<rec_t*>(page_dir_slot_get_rec(slot));
rec_t* new_rec;
- if (page_is_comp(page)) {
+ if (page_is_comp(block->frame)) {
new_rec = rec_get_next_ptr(old_rec, TRUE);
- rec_set_n_owned_new(old_rec, page_zip, 0);
- rec_set_n_owned_new(new_rec, page_zip,
- PAGE_DIR_SLOT_MIN_N_OWNED);
+ page_rec_set_n_owned<true>(block, old_rec, 0, true, mtr);
+ page_rec_set_n_owned<true>(block, new_rec,
+ PAGE_DIR_SLOT_MIN_N_OWNED,
+ true, mtr);
+ if (UNIV_LIKELY_NULL(page_zip)) {
+ mach_write_to_2(slot, page_offset(new_rec));
+ goto func_exit;
+ }
} else {
new_rec = rec_get_next_ptr(old_rec, FALSE);
- rec_set_n_owned_old(old_rec, 0);
- rec_set_n_owned_old(new_rec, PAGE_DIR_SLOT_MIN_N_OWNED);
+ page_rec_set_n_owned<false>(block, old_rec, 0, false, mtr);
+ page_rec_set_n_owned<false>(block, new_rec,
+ PAGE_DIR_SLOT_MIN_N_OWNED,
+ false, mtr);
}
- page_dir_slot_set_rec(slot, new_rec);
- page_dir_slot_set_n_owned(up_slot, page_zip, up_n_owned - 1);
+ mtr->write<2>(*block, slot, page_offset(new_rec));
+func_exit:
+ page_dir_slot_set_n_owned<true>(block, up_slot, up_n_owned - 1, mtr);
}
/** Allocate space for inserting an index record.
@@ -1346,7 +1428,6 @@ page_cur_insert_rec_low(
{
byte* insert_buf;
ulint rec_size;
- page_t* page; /*!< the relevant page */
rec_t* last_insert; /*!< cursor position at previous
insert */
rec_t* free_rec; /*!< a free record that was reused,
@@ -1356,19 +1437,27 @@ page_cur_insert_rec_low(
record */
rec_t* current_rec = cur->rec;
+ buf_block_t* block = cur->block;
ut_ad(rec_offs_validate(rec, index, offsets));
- page = page_align(current_rec);
ut_ad(dict_table_is_comp(index->table)
- == (ibool) !!page_is_comp(page));
- ut_ad(fil_page_index_page_check(page));
- ut_ad(mach_read_from_8(page + PAGE_HEADER + PAGE_INDEX_ID) == index->id
+ == (ibool) !!page_is_comp(block->frame));
+ ut_ad(fil_page_index_page_check(block->frame));
+ ut_ad(mach_read_from_8(PAGE_HEADER + PAGE_INDEX_ID + block->frame)
+ == index->id
|| index->is_dummy
|| mtr->is_inside_ibuf());
ut_ad(!page_rec_is_supremum(current_rec));
+ const mtr_log_t log_mode = mtr->set_log_mode(MTR_LOG_NONE);
+
+ /* We should not write log for ROW_FORMAT=COMPRESSED pages here. */
+ ut_ad(log_mode == MTR_LOG_NONE
+ || log_mode == MTR_LOG_NO_REDO
+ || !(index->table->flags & DICT_TF_MASK_ZIP_SSIZE));
+
/* 1. Get the size of the physical record in the page */
rec_size = rec_offs_size(offsets);
@@ -1391,7 +1480,7 @@ page_cur_insert_rec_low(
/* 2. Try to find suitable space from page memory management */
- free_rec = page_header_get_ptr(page, PAGE_FREE);
+ free_rec = page_header_get_ptr(block->frame, PAGE_FREE);
if (UNIV_LIKELY_NULL(free_rec)) {
/* Try to allocate from the head of the free list. */
offset_t foffsets_[REC_OFFS_NORMAL_SIZE];
@@ -1401,7 +1490,7 @@ page_cur_insert_rec_low(
rec_offs_init(foffsets_);
foffsets = rec_get_offsets(
- free_rec, index, foffsets, page_is_leaf(page),
+ free_rec, index, foffsets, page_is_leaf(block->frame),
ULINT_UNDEFINED, &heap);
if (rec_offs_size(foffsets) < rec_size) {
if (UNIV_LIKELY_NULL(heap)) {
@@ -1413,16 +1502,16 @@ page_cur_insert_rec_low(
insert_buf = free_rec - rec_offs_extra_size(foffsets);
- if (page_is_comp(page)) {
+ if (page_is_comp(block->frame)) {
heap_no = rec_get_heap_no_new(free_rec);
- page_mem_alloc_free(page, NULL,
- rec_get_next_ptr(free_rec, TRUE),
- rec_size);
+ page_mem_alloc_free(block->frame, NULL,
+ rec_get_next_ptr(free_rec, TRUE),
+ rec_size);
} else {
heap_no = rec_get_heap_no_old(free_rec);
- page_mem_alloc_free(page, NULL,
- rec_get_next_ptr(free_rec, FALSE),
- rec_size);
+ page_mem_alloc_free(block->frame, NULL,
+ rec_get_next_ptr(free_rec, FALSE),
+ rec_size);
}
if (UNIV_LIKELY_NULL(heap)) {
@@ -1431,17 +1520,19 @@ page_cur_insert_rec_low(
} else {
use_heap:
free_rec = NULL;
- insert_buf = page_mem_alloc_heap(page, NULL,
+ insert_buf = page_mem_alloc_heap(block->frame, NULL,
rec_size, &heap_no);
if (UNIV_UNLIKELY(insert_buf == NULL)) {
+ mtr->set_log_mode(log_mode);
return(NULL);
}
}
/* 3. Create the record */
insert_rec = rec_copy(insert_buf, rec, offsets);
- rec_offs_make_valid(insert_rec, index, page_is_leaf(page), offsets);
+ rec_offs_make_valid(insert_rec, index, page_is_leaf(block->frame),
+ offsets);
/* 4. Insert the record in the linked list of records */
ut_ad(current_rec != insert_rec);
@@ -1450,7 +1541,7 @@ use_heap:
/* next record after current before the insertion */
rec_t* next_rec = page_rec_get_next(current_rec);
#ifdef UNIV_DEBUG
- if (page_is_comp(page)) {
+ if (page_is_comp(block->frame)) {
switch (rec_get_status(current_rec)) {
case REC_STATUS_ORDINARY:
case REC_STATUS_NODE_PTR:
@@ -1476,56 +1567,59 @@ use_heap:
page_rec_set_next(current_rec, insert_rec);
}
- page_header_set_field(page, NULL, PAGE_N_RECS,
- 1U + page_get_n_recs(page));
+ page_header_set_field(block->frame, NULL, PAGE_N_RECS,
+ 1U + page_get_n_recs(block->frame));
/* 5. Set the n_owned field in the inserted record to zero,
and set the heap_no field */
- if (page_is_comp(page)) {
- rec_set_n_owned_new(insert_rec, NULL, 0);
- rec_set_heap_no_new(insert_rec, heap_no);
- } else {
- rec_set_n_owned_old(insert_rec, 0);
- rec_set_heap_no_old(insert_rec, heap_no);
- }
+ page_rec_set_n_owned<false>(block, insert_rec, 0,
+ page_is_comp(block->frame), mtr);
+ rec_set_heap_no(*block, insert_rec, heap_no,
+ page_is_comp(block->frame), mtr);
UNIV_MEM_ASSERT_RW(rec_get_start(insert_rec, offsets),
rec_offs_size(offsets));
/* 6. Update the last insertion info in page header */
- last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
- ut_ad(!last_insert || !page_is_comp(page)
+ last_insert = page_header_get_ptr(block->frame, PAGE_LAST_INSERT);
+ ut_ad(!last_insert || !page_is_comp(block->frame)
|| rec_get_node_ptr_flag(last_insert)
== rec_get_node_ptr_flag(insert_rec));
if (!dict_index_is_spatial(index)) {
- byte* ptr = PAGE_HEADER + PAGE_DIRECTION_B + page;
+ byte* ptr = PAGE_HEADER + PAGE_DIRECTION_B + block->frame;
if (UNIV_UNLIKELY(last_insert == NULL)) {
no_direction:
- page_direction_reset(ptr, page, NULL);
+ page_direction_reset(ptr, block->frame, NULL);
} else if (last_insert == current_rec
&& page_ptr_get_direction(ptr) != PAGE_LEFT) {
- page_direction_increment(ptr, page, NULL, PAGE_RIGHT);
+ page_direction_increment(ptr, block->frame, NULL,
+ PAGE_RIGHT);
} else if (page_ptr_get_direction(ptr) != PAGE_RIGHT
&& page_rec_get_next(insert_rec) == last_insert) {
- page_direction_increment(ptr, page, NULL, PAGE_LEFT);
+ page_direction_increment(ptr, block->frame, NULL,
+ PAGE_LEFT);
} else {
goto no_direction;
}
}
- page_header_set_ptr(page, NULL, PAGE_LAST_INSERT, insert_rec);
+ page_header_set_ptr(block->frame, NULL, PAGE_LAST_INSERT, insert_rec);
/* 7. It remains to update the owner record. */
{
rec_t* owner_rec = page_rec_find_owner_rec(insert_rec);
ulint n_owned;
- if (page_is_comp(page)) {
+ if (page_is_comp(block->frame)) {
n_owned = rec_get_n_owned_new(owner_rec);
- rec_set_n_owned_new(owner_rec, NULL, n_owned + 1);
+ rec_set_bit_field_1(owner_rec, n_owned + 1,
+ REC_NEW_N_OWNED, REC_N_OWNED_MASK,
+ REC_N_OWNED_SHIFT);
} else {
n_owned = rec_get_n_owned_old(owner_rec);
- rec_set_n_owned_old(owner_rec, n_owned + 1);
+ rec_set_bit_field_1(owner_rec, n_owned + 1,
+ REC_OLD_N_OWNED, REC_N_OWNED_MASK,
+ REC_N_OWNED_SHIFT);
}
/* 8. Now we have incremented the n_owned field of the owner
@@ -1533,17 +1627,16 @@ no_direction:
we have to split the corresponding directory slot in two. */
if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) {
- page_dir_split_slot(
- page, NULL,
- page_dir_find_owner_slot(owner_rec));
+ page_dir_split_slot<false>(
+ block,
+ page_dir_find_owner_slot(owner_rec), mtr);
}
}
/* 9. Write log record of the insert */
- if (UNIV_LIKELY(mtr != NULL)) {
- page_cur_insert_rec_write_log(insert_rec, rec_size,
- current_rec, index, mtr);
- }
+ mtr->set_log_mode(log_mode);
+ page_cur_insert_rec_write_log(insert_rec, rec_size,
+ current_rec, index, mtr);
return(insert_rec);
}
@@ -1903,8 +1996,10 @@ use_heap:
/* 5. Set the n_owned field in the inserted record to zero,
and set the heap_no field */
- rec_set_n_owned_new(insert_rec, NULL, 0);
- rec_set_heap_no_new(insert_rec, heap_no);
+ rec_set_bit_field_1(insert_rec, 0, REC_NEW_N_OWNED,
+ REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
+ rec_set_bit_field_2(insert_rec, heap_no, REC_NEW_HEAP_NO,
+ REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
UNIV_MEM_ASSERT_RW(rec_get_start(insert_rec, offsets),
rec_offs_size(offsets));
@@ -1944,16 +2039,20 @@ no_direction:
ulint n_owned;
n_owned = rec_get_n_owned_new(owner_rec);
- rec_set_n_owned_new(owner_rec, page_zip, n_owned + 1);
+ rec_set_bit_field_1(owner_rec, n_owned + 1, REC_NEW_N_OWNED,
+ REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
/* 8. Now we have incremented the n_owned field of the owner
record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
we have to split the corresponding directory slot in two. */
if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) {
- page_dir_split_slot(
- page, page_zip,
- page_dir_find_owner_slot(owner_rec));
+ const mtr_log_t log_mode = mtr->set_log_mode(
+ MTR_LOG_NONE);
+ page_dir_split_slot<true>(
+ page_cur_get_block(cursor),
+ page_dir_find_owner_slot(owner_rec), mtr);
+ mtr->set_log_mode(log_mode);
}
}
@@ -2045,12 +2144,13 @@ ATTRIBUTE_COLD /* only used when crash-upgrading */
void
page_copy_rec_list_end_to_created_page(
/*===================================*/
- page_t* new_page, /*!< in/out: index page to copy to */
+ buf_block_t* block, /*!< in/out: index page to copy to */
rec_t* rec, /*!< in: first record to copy */
dict_index_t* index, /*!< in: record descriptor */
mtr_t* mtr) /*!< in: mtr */
{
page_dir_slot_t* slot = 0; /* remove warning */
+ page_t* new_page = block->frame;
byte* heap_top;
rec_t* insert_rec = 0; /* remove warning */
rec_t* prev_rec;
@@ -2063,6 +2163,8 @@ page_copy_rec_list_end_to_created_page(
offset_t* offsets = offsets_;
rec_offs_init(offsets_);
+ /* The record was never emitted for ROW_FORMAT=COMPRESSED pages. */
+ ut_ad(!block->page.zip.data);
ut_ad(page_dir_get_n_heap(new_page) == PAGE_HEAP_NO_USER_LOW);
ut_ad(page_align(rec) != new_page);
ut_ad(page_rec_is_comp(rec) == page_is_comp(new_page));
@@ -2084,9 +2186,10 @@ page_copy_rec_list_end_to_created_page(
#ifdef UNIV_DEBUG
/* To pass the debug tests we have to set these dummy values
in the debug version */
- page_dir_set_n_slots(new_page, NULL, srv_page_size / 2);
- page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP,
- new_page + srv_page_size - 1);
+ mach_write_to_2(PAGE_HEADER + PAGE_N_DIR_SLOTS + new_page,
+ srv_page_size / 2);
+ mach_write_to_2(PAGE_HEADER + PAGE_HEAP_TOP + new_page,
+ srv_page_size - 1);
#endif
prev_rec = page_get_infimum_rec(new_page);
if (page_is_comp(new_page)) {
@@ -2105,22 +2208,21 @@ page_copy_rec_list_end_to_created_page(
ULINT_UNDEFINED, &heap);
insert_rec = rec_copy(heap_top, rec, offsets);
- if (page_is_comp(new_page)) {
+ const bool comp = page_is_comp(new_page) != 0;
+
+ if (comp) {
rec_set_next_offs_new(prev_rec,
page_offset(insert_rec));
-
- rec_set_n_owned_new(insert_rec, NULL, 0);
- rec_set_heap_no_new(insert_rec,
- PAGE_HEAP_NO_USER_LOW + n_recs);
} else {
rec_set_next_offs_old(prev_rec,
page_offset(insert_rec));
-
- rec_set_n_owned_old(insert_rec, 0);
- rec_set_heap_no_old(insert_rec,
- PAGE_HEAP_NO_USER_LOW + n_recs);
}
+ page_rec_set_n_owned<false>(block, insert_rec, 0, comp, mtr);
+
+ rec_set_heap_no(insert_rec, PAGE_HEAP_NO_USER_LOW + n_recs,
+ page_is_comp(new_page));
+
count++;
n_recs++;
@@ -2130,9 +2232,9 @@ page_copy_rec_list_end_to_created_page(
slot_index++;
slot = page_dir_get_nth_slot(new_page, slot_index);
-
- page_dir_slot_set_rec(slot, insert_rec);
- page_dir_slot_set_n_owned(slot, NULL, count);
+ mach_write_to_2(slot, page_offset(insert_rec));
+ page_dir_slot_set_n_owned<false>(block, slot, count,
+ mtr);
count = 0;
}
@@ -2164,7 +2266,7 @@ page_copy_rec_list_end_to_created_page(
count += (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2;
- page_dir_slot_set_n_owned(slot, NULL, 0);
+ page_dir_slot_set_n_owned<false>(block, slot, 0, mtr);
slot_index--;
}
@@ -2173,18 +2275,24 @@ page_copy_rec_list_end_to_created_page(
mem_heap_free(heap);
}
+ slot = page_dir_get_nth_slot(new_page, 1 + slot_index);
+
if (page_is_comp(new_page)) {
rec_set_next_offs_new(insert_rec, PAGE_NEW_SUPREMUM);
+ mach_write_to_2(slot, PAGE_NEW_SUPREMUM);
+ rec_set_bit_field_1(new_page + PAGE_NEW_SUPREMUM, count + 1,
+ REC_NEW_N_OWNED,
+ REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
} else {
rec_set_next_offs_old(insert_rec, PAGE_OLD_SUPREMUM);
+ mach_write_to_2(slot, PAGE_OLD_SUPREMUM);
+ rec_set_bit_field_1(new_page + PAGE_OLD_SUPREMUM, count + 1,
+ REC_OLD_N_OWNED,
+ REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
}
- slot = page_dir_get_nth_slot(new_page, 1 + slot_index);
-
- page_dir_slot_set_rec(slot, page_get_supremum_rec(new_page));
- page_dir_slot_set_n_owned(slot, NULL, count + 1);
-
- page_dir_set_n_slots(new_page, NULL, 2 + slot_index);
+ mach_write_to_2(PAGE_HEADER + PAGE_N_DIR_SLOTS + new_page,
+ 2 + slot_index);
page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP, heap_top);
page_dir_set_n_heap(new_page, NULL, PAGE_HEAP_NO_USER_LOW + n_recs);
page_header_set_field(new_page, NULL, PAGE_N_RECS, n_recs);
@@ -2196,40 +2304,9 @@ page_copy_rec_list_end_to_created_page(
}
/***********************************************************//**
-Writes log record of a record delete on a page. */
-UNIV_INLINE
-void
-page_cur_delete_rec_write_log(
-/*==========================*/
- rec_t* rec, /*!< in: record to be deleted */
- const dict_index_t* index, /*!< in: record descriptor */
- mtr_t* mtr) /*!< in: mini-transaction handle */
-{
- byte* log_ptr;
-
- ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table));
- ut_ad(mtr->is_named_space(index->table->space));
-
- log_ptr = mlog_open_and_write_index(mtr, rec, index,
- page_rec_is_comp(rec)
- ? MLOG_COMP_REC_DELETE
- : MLOG_REC_DELETE, 2);
-
- if (!log_ptr) {
- /* Logging in mtr is switched off during crash recovery:
- in that case mlog_open returns NULL */
- return;
- }
-
- /* Write the cursor rec offset as a 2-byte ulint */
- mach_write_to_2(log_ptr, page_offset(rec));
-
- mlog_close(mtr, log_ptr + 2);
-}
-
-/***********************************************************//**
Parses log record of a record delete on a page.
@return pointer to record end or NULL */
+ATTRIBUTE_COLD /* only used when crash-upgrading */
const byte*
page_cur_parse_delete_rec(
/*======================*/
@@ -2283,33 +2360,40 @@ page_cur_parse_delete_rec(
}
/** Prepend a record to the PAGE_FREE list.
-@param[in,out] page index page
-@param[in,out] page_zip ROW_FORMAT=COMPRESSED page, or NULL
-@param[in,out] rec record being deleted
-@param[in] index the index that the page belongs to
-@param[in] offsets rec_get_offsets(rec, index) */
-static void page_mem_free(page_t* page, page_zip_des_t* page_zip, rec_t* rec,
- const dict_index_t* index, const offset_t* offsets)
+@param[in,out] block index page
+@param[in,out] rec record being deleted
+@param[in] index the index that the page belongs to
+@param[in] offsets rec_get_offsets(rec, index)
+@param[in,out] mtr mini-transaction */
+static void page_mem_free(buf_block_t *block, rec_t *rec,
+ const dict_index_t *index, const offset_t *offsets,
+ mtr_t *mtr)
{
- ut_ad(rec_offs_validate(rec, index, offsets));
- const rec_t* free = page_header_get_ptr(page, PAGE_FREE);
-
- if (srv_immediate_scrub_data_uncompressed) {
- /* scrub record */
- memset(rec, 0, rec_offs_data_size(offsets));
- }
-
- page_rec_set_next(rec, free);
- page_header_set_ptr(page, page_zip, PAGE_FREE, rec);
- page_header_set_field(page, page_zip, PAGE_GARBAGE,
- rec_offs_size(offsets)
- + page_header_get_field(page, PAGE_GARBAGE));
- if (page_zip) {
- page_zip_dir_delete(page_zip, rec, index, offsets, free);
- } else {
- page_header_set_field(page, page_zip, PAGE_N_RECS,
- ulint(page_get_n_recs(page)) - 1);
- }
+ ut_ad(rec_offs_validate(rec, index, offsets));
+ ut_ad(page_align(rec) == block->frame);
+ const rec_t *free= page_header_get_ptr(block->frame, PAGE_FREE);
+
+ if (UNIV_LIKELY_NULL(block->page.zip.data))
+ page_zip_dir_delete(&block->page.zip, rec, index, offsets, free, mtr);
+ else
+ {
+ if (srv_immediate_scrub_data_uncompressed)
+ mtr->memset(block, page_offset(rec), rec_offs_data_size(offsets), 0);
+
+ uint16_t next= free
+ ? (page_is_comp(block->frame)
+ ? static_cast<uint16_t>(free - rec)
+ : static_cast<uint16_t>(page_offset(free)))
+ : 0;
+ mtr->write<2>(*block, rec - REC_NEXT, next);
+ mtr->write<2>(*block, PAGE_FREE + PAGE_HEADER + block->frame,
+ page_offset(rec));
+ mtr->write<2>(*block, PAGE_GARBAGE + PAGE_HEADER + block->frame,
+ rec_offs_size(offsets)
+ + page_header_get_field(block->frame, PAGE_GARBAGE));
+ mtr->write<2>(*block, PAGE_N_RECS + PAGE_HEADER + block->frame,
+ ulint(page_get_n_recs(block->frame)) - 1);
+ }
}
/***********************************************************//**
@@ -2356,12 +2440,15 @@ page_cur_delete_rec(
ut_ad(page_rec_is_user_rec(current_rec));
if (page_get_n_recs(block->frame) == 1
+#if 1 /* MDEV-12353 TODO: skip this for the physical log format */
+ /* Empty the page, unless we are applying the redo log
+ during crash recovery. During normal operation, the
+ page_create_empty() gets logged as one of MLOG_PAGE_CREATE,
+ MLOG_COMP_PAGE_CREATE, MLOG_ZIP_PAGE_COMPRESS. */
&& !recv_recovery_is_on()
+#endif
&& !rec_is_alter_metadata(current_rec, *index)) {
- /* Empty the page, unless we are applying the redo log
- during crash recovery. During normal operation, the
- page_create_empty() gets logged as one of MLOG_PAGE_CREATE,
- MLOG_COMP_PAGE_CREATE, MLOG_ZIP_PAGE_COMPRESS. */
+ /* Empty the page. */
ut_ad(page_is_leaf(block->frame));
/* Usually, this should be the root page,
and the whole index tree should become empty.
@@ -2383,10 +2470,7 @@ page_cur_delete_rec(
/* 1. Reset the last insert info in the page header and increment
the modify clock for the frame */
-
- page_zip_des_t* const page_zip = buf_block_get_page_zip(block);
-
- page_header_set_ptr(block->frame, page_zip, PAGE_LAST_INSERT, NULL);
+ page_header_reset_last_insert(block, mtr);
/* The page gets invalid for btr_pcur_restore_pos().
We avoid invoking buf_block_modify_clock_inc(block) because its
@@ -2394,8 +2478,6 @@ page_cur_delete_rec(
used during IMPORT TABLESPACE. */
block->modify_clock++;
- page_cur_delete_rec_write_log(current_rec, index, mtr);
-
/* 2. Find the next and the previous record. Note that the cursor is
left at the next record. */
@@ -2416,34 +2498,72 @@ page_cur_delete_rec(
next_rec = cursor->rec;
/* 3. Remove the record from the linked list of records */
-
- page_rec_set_next(prev_rec, next_rec);
-
/* 4. If the deleted record is pointed to by a dir slot, update the
record pointer in slot. In the following if-clause we assume that
prev_rec is owned by the same slot, i.e., PAGE_DIR_SLOT_MIN_N_OWNED
>= 2. */
+ /* 5. Update the number of owned records of the slot */
compile_time_assert(PAGE_DIR_SLOT_MIN_N_OWNED >= 2);
ut_ad(cur_n_owned > 1);
- if (current_rec == page_dir_slot_get_rec(cur_dir_slot)) {
- page_dir_slot_set_rec(cur_dir_slot, prev_rec);
- }
+ rec_t* slot_rec = const_cast<rec_t*>
+ (page_dir_slot_get_rec(cur_dir_slot));
+
+ if (UNIV_LIKELY_NULL(block->page.zip.data)) {
+ ut_ad(page_is_comp(block->frame));
+ if (current_rec == slot_rec) {
+ page_zip_rec_set_owned(block, prev_rec, 1, mtr);
+ page_zip_rec_set_owned(block, slot_rec, 0, mtr);
+ slot_rec = prev_rec;
+ mach_write_to_2(cur_dir_slot, page_offset(slot_rec));
+ } else if (cur_n_owned == 1
+ && !page_rec_is_supremum(slot_rec)) {
+ page_zip_rec_set_owned(block, slot_rec, 0, mtr);
+ }
- /* 5. Update the number of owned records of the slot */
+ mach_write_to_2(prev_rec - REC_NEXT, static_cast<uint16_t>
+ (next_rec - prev_rec));
+ mach_write_to_1(slot_rec - REC_NEW_N_OWNED,
+ (slot_rec[-REC_NEW_N_OWNED]
+ & ~REC_N_OWNED_MASK)
+ | (cur_n_owned - 1) << REC_N_OWNED_SHIFT);
+ } else {
+ if (current_rec == slot_rec) {
+ slot_rec = prev_rec;
+ mtr->write<2>(*block, cur_dir_slot,
+ page_offset(slot_rec));
+ }
- page_dir_slot_set_n_owned(cur_dir_slot, page_zip, cur_n_owned - 1);
+ if (page_is_comp(block->frame)) {
+ mtr->write<2>(*block, prev_rec - REC_NEXT,
+ static_cast<uint16_t>
+ (next_rec - prev_rec));
+ mtr->write<1>(*block, slot_rec - REC_NEW_N_OWNED,
+ (slot_rec[-REC_NEW_N_OWNED]
+ & ~REC_N_OWNED_MASK)
+ | (cur_n_owned - 1)
+ << REC_N_OWNED_SHIFT);
+ } else {
+ mtr->write<2>(*block, prev_rec - REC_NEXT,
+ page_offset(next_rec));
+ mtr->write<1>(*block, slot_rec - REC_OLD_N_OWNED,
+ (slot_rec[-REC_OLD_N_OWNED]
+ & ~REC_N_OWNED_MASK)
+ | (cur_n_owned - 1)
+ << REC_N_OWNED_SHIFT);
+ }
+ }
/* 6. Free the memory occupied by the record */
- page_mem_free(block->frame, page_zip, current_rec, index, offsets);
+ page_mem_free(block, current_rec, index, offsets, mtr);
/* 7. Now we have decremented the number of owned records of the slot.
If the number drops below PAGE_DIR_SLOT_MIN_N_OWNED, we balance the
slots. */
if (cur_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) {
- page_dir_balance_slot(block->frame, page_zip, cur_slot_no);
+ page_dir_balance_slot(block, cur_slot_no, mtr);
}
}
diff --git a/storage/innobase/page/page0page.cc b/storage/innobase/page/page0page.cc
index 842f1960c7d..75514f173b1 100644
--- a/storage/innobase/page/page0page.cc
+++ b/storage/innobase/page/page0page.cc
@@ -843,34 +843,9 @@ zip_reorganize:
}
/**********************************************************//**
-Writes a log record of a record list end or start deletion. */
-UNIV_INLINE
-void
-page_delete_rec_list_write_log(
-/*===========================*/
- rec_t* rec, /*!< in: record on page */
- dict_index_t* index, /*!< in: record descriptor */
- mlog_id_t type, /*!< in: operation type:
- MLOG_LIST_END_DELETE, ... */
- mtr_t* mtr) /*!< in: mtr */
-{
- byte* log_ptr;
- ut_ad(type == MLOG_LIST_END_DELETE
- || type == MLOG_LIST_START_DELETE
- || type == MLOG_COMP_LIST_END_DELETE
- || type == MLOG_COMP_LIST_START_DELETE);
-
- log_ptr = mlog_open_and_write_index(mtr, rec, index, type, 2);
- if (log_ptr) {
- /* Write the parameter as a 2-byte ulint */
- mach_write_to_2(log_ptr, page_offset(rec));
- mlog_close(mtr, log_ptr + 2);
- }
-}
-
-/**********************************************************//**
Parses a log record of a record list end or start deletion.
@return end of log record or NULL */
+ATTRIBUTE_COLD /* only used when crash-upgrading */
const byte*
page_parse_delete_rec_list(
/*=======================*/
@@ -993,29 +968,20 @@ delete_all:
}
}
- /* Reset the last insert info in the page header and increment
- the modify clock for the frame */
-
- page_header_set_ptr(block->frame, page_zip, PAGE_LAST_INSERT, NULL);
-
/* The page gets invalid for optimistic searches: increment the
frame modify clock */
buf_block_modify_clock_inc(block);
- page_delete_rec_list_write_log(rec, index, page_is_comp(block->frame)
- ? MLOG_COMP_LIST_END_DELETE
- : MLOG_LIST_END_DELETE, mtr);
-
const bool is_leaf = page_is_leaf(block->frame);
+ byte* last_insert = my_assume_aligned<2>(PAGE_LAST_INSERT + PAGE_HEADER
+ + block->frame);
- if (page_zip) {
- mtr_log_t log_mode;
-
+ if (UNIV_LIKELY_NULL(page_zip)) {
ut_ad(page_is_comp(block->frame));
- /* Individual deletes are not logged */
- log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);
+ memset(last_insert, 0, 2);
+ page_zip_write_header(page_zip, last_insert, 2, mtr);
do {
page_cur_t cur;
@@ -1034,12 +1000,11 @@ delete_all:
mem_heap_free(heap);
}
- /* Restore log mode */
-
- mtr_set_log_mode(mtr, log_mode);
return;
}
+ mtr->write<2,mtr_t::OPT>(*block, last_insert, 0U);
+
prev_rec = page_rec_get_prev(rec);
last_rec = page_rec_get_prev(page_get_supremum_rec(block->frame));
@@ -1100,6 +1065,20 @@ delete_all:
slot_index = page_dir_find_owner_slot(rec2);
ut_ad(slot_index > 0);
slot = page_dir_get_nth_slot(block->frame, slot_index);
+ mtr->write<2,mtr_t::OPT>(*block, slot, PAGE_NEW_SUPREMUM);
+ byte* owned = PAGE_NEW_SUPREMUM - REC_NEW_N_OWNED
+ + block->frame;
+ byte new_owned = (*owned & ~REC_N_OWNED_MASK)
+ | static_cast<byte>(n_owned << REC_N_OWNED_SHIFT);
+ mtr->write<1,mtr_t::OPT>(*block, owned, new_owned);
+ mtr->write<2>(*block, prev_rec - REC_NEXT,
+ static_cast<uint16_t>
+ (PAGE_NEW_SUPREMUM - page_offset(prev_rec)));
+ uint16_t free = page_header_get_field(block->frame, PAGE_FREE);
+ mtr->write<2>(*block, last_rec - REC_NEXT, free
+ ? static_cast<uint16_t>
+ (free - page_offset(last_rec))
+ : 0U);
} else {
rec_t* rec2 = rec;
ulint count = 0;
@@ -1116,29 +1095,32 @@ delete_all:
slot_index = page_dir_find_owner_slot(rec2);
ut_ad(slot_index > 0);
slot = page_dir_get_nth_slot(block->frame, slot_index);
+ mtr->write<2,mtr_t::OPT>(*block, slot, PAGE_OLD_SUPREMUM);
+ byte* owned = PAGE_OLD_SUPREMUM - REC_OLD_N_OWNED
+ + block->frame;
+ byte new_owned = (*owned & ~REC_N_OWNED_MASK)
+ | static_cast<byte>(n_owned << REC_N_OWNED_SHIFT);
+ mtr->write<1,mtr_t::OPT>(*block, owned, new_owned);
+ mtr->write<2>(*block, prev_rec - REC_NEXT, PAGE_OLD_SUPREMUM);
+ mtr->write<2>(*block, last_rec - REC_NEXT,
+ page_header_get_field(block->frame, PAGE_FREE));
}
- page_dir_slot_set_rec(slot, page_get_supremum_rec(block->frame));
- page_dir_slot_set_n_owned(slot, NULL, n_owned);
-
- page_dir_set_n_slots(block->frame, NULL, slot_index + 1);
-
- /* Remove the record chain segment from the record chain */
- page_rec_set_next(prev_rec, page_get_supremum_rec(block->frame));
+ mtr->write<2,mtr_t::OPT>(*block, PAGE_N_DIR_SLOTS + PAGE_HEADER
+ + block->frame, slot_index + 1);
/* Catenate the deleted chain segment to the page free list */
- page_rec_set_next(last_rec, page_header_get_ptr(block->frame,
- PAGE_FREE));
- page_header_set_ptr(block->frame, NULL, PAGE_FREE, rec);
+ mtr->write<2>(*block, PAGE_FREE + PAGE_HEADER + block->frame,
+ page_offset(rec));
+ byte* garbage = my_assume_aligned<2>(PAGE_GARBAGE + PAGE_HEADER
+ + block->frame);
+ mtr->write<2>(*block, garbage, size + mach_read_from_2(garbage));
- page_header_set_field(block->frame, NULL, PAGE_GARBAGE, size
- + page_header_get_field(block->frame,
- PAGE_GARBAGE));
-
- ut_ad(page_get_n_recs(block->frame) > n_recs);
- page_header_set_field(block->frame, NULL, PAGE_N_RECS,
- ulint{page_get_n_recs(block->frame) - n_recs});
+ byte* page_n_recs = my_assume_aligned<2>(PAGE_N_RECS + PAGE_HEADER
+ + block->frame);
+ mtr->write<2>(*block, page_n_recs,
+ ulint{mach_read_from_2(page_n_recs)} - n_recs);
}
/*************************************************************//**
@@ -1187,22 +1169,9 @@ page_delete_rec_list_start(
return;
}
- mlog_id_t type;
-
- if (page_rec_is_comp(rec)) {
- type = MLOG_COMP_LIST_START_DELETE;
- } else {
- type = MLOG_LIST_START_DELETE;
- }
-
- page_delete_rec_list_write_log(rec, index, type, mtr);
-
page_cur_set_before_first(block, &cur1);
page_cur_move_to_next(&cur1);
- /* Individual deletes are not logged */
-
- mtr_log_t log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);
const bool is_leaf = page_rec_is_leaf(rec);
while (page_cur_get_rec(&cur1) != rec) {
@@ -1215,10 +1184,6 @@ page_delete_rec_list_start(
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
-
- /* Restore log mode */
-
- mtr_set_log_mode(mtr, log_mode);
}
/*************************************************************//**
diff --git a/storage/innobase/page/page0zip.cc b/storage/innobase/page/page0zip.cc
index c8201a60966..37f14ac7b5a 100644
--- a/storage/innobase/page/page0zip.cc
+++ b/storage/innobase/page/page0zip.cc
@@ -4223,7 +4223,8 @@ page_zip_clear_rec(
page_zip_des_t* page_zip, /*!< in/out: compressed page */
byte* rec, /*!< in: record to clear */
const dict_index_t* index, /*!< in: index of rec */
- const offset_t* offsets) /*!< in: rec_get_offsets(rec, index) */
+ const offset_t* offsets, /*!< in: rec_get_offsets(rec, index) */
+ mtr_t* mtr) /*!< in/out: mini-transaction */
{
ulint heap_no;
page_t* page = page_align(rec);
@@ -4256,11 +4257,20 @@ page_zip_clear_rec(
rec_offs_n_fields(offsets) - 1,
&len);
ut_ad(len == REC_NODE_PTR_SIZE);
-
ut_ad(!rec_offs_any_extern(offsets));
memset(field, 0, REC_NODE_PTR_SIZE);
- memset(storage - (heap_no - 1) * REC_NODE_PTR_SIZE,
- 0, REC_NODE_PTR_SIZE);
+ storage -= (heap_no - 1) * REC_NODE_PTR_SIZE;
+clear_page_zip:
+ /* TODO: write MEMSET record */
+ memset(storage, 0, len);
+ if (byte* log_ptr = mlog_open(mtr, 11 + 2 + 2 + len)) {
+ log_ptr = mlog_write_initial_log_record_fast(
+ rec, MLOG_ZIP_WRITE_STRING, log_ptr, mtr);
+ mach_write_to_2(log_ptr, storage - page_zip->data);
+ mach_write_to_2(log_ptr + 2, len);
+ memcpy(log_ptr + 4, storage, len);
+ mlog_close(mtr, log_ptr + 4 + len);
+ }
} else if (dict_index_is_clust(index)) {
/* Clear trx_id and roll_ptr. On the compressed page,
there is an array of these fields immediately before the
@@ -4269,14 +4279,9 @@ page_zip_clear_rec(
= dict_col_get_clust_pos(
dict_table_get_sys_col(
index->table, DATA_TRX_ID), index);
- storage = page_zip_dir_start(page_zip);
field = rec_get_nth_field(rec, offsets, trx_id_pos, &len);
ut_ad(len == DATA_TRX_ID_LEN);
-
memset(field, 0, DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
- memset(storage - (heap_no - 1)
- * (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN),
- 0, DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
if (rec_offs_any_extern(offsets)) {
ulint i;
@@ -4295,6 +4300,12 @@ page_zip_clear_rec(
}
}
}
+
+ len = DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;
+ storage = page_zip_dir_start(page_zip)
+ - (heap_no - 1)
+ * (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
+ goto clear_page_zip;
} else {
ut_ad(!rec_offs_any_extern(offsets));
}
@@ -4338,18 +4349,33 @@ must already have been written on the uncompressed page. */
void
page_zip_rec_set_owned(
/*===================*/
- page_zip_des_t* page_zip,/*!< in/out: compressed page */
+ buf_block_t* block, /*!< in/out: ROW_FORMAT=COMPRESSED page */
const byte* rec, /*!< in: record on the uncompressed page */
- ulint flag) /*!< in: the owned flag (nonzero=TRUE) */
+ ulint flag, /*!< in: the owned flag (nonzero=TRUE) */
+ mtr_t* mtr) /*!< in/out: mini-transaction */
{
+ ut_ad(page_align(rec) == block->frame);
+ page_zip_des_t* const page_zip = &block->page.zip;
byte* slot = page_zip_dir_find(page_zip, page_offset(rec));
ut_a(slot);
UNIV_MEM_ASSERT_RW(page_zip->data, page_zip_get_size(page_zip));
+ const byte b = *slot;
if (flag) {
*slot |= (PAGE_ZIP_DIR_SLOT_OWNED >> 8);
} else {
*slot &= ~(PAGE_ZIP_DIR_SLOT_OWNED >> 8);
}
+ if (b == *slot) {
+ } else if (byte* log_ptr = mlog_open(mtr, 11 + 2 + 2 + 1)) {
+ log_ptr = mlog_write_initial_log_record_low(
+ MLOG_ZIP_WRITE_STRING,
+ block->page.id.space(), block->page.id.page_no(),
+ log_ptr, mtr);
+ mach_write_to_2(log_ptr, slot - page_zip->data);
+ mach_write_to_2(log_ptr + 2, 1);
+ log_ptr[4] = *slot;
+ mlog_close(mtr, log_ptr + 5);
+ }
}
/**********************************************************************//**
@@ -4442,12 +4468,12 @@ page_zip_dir_delete(
byte* rec, /*!< in: deleted record */
const dict_index_t* index, /*!< in: index of rec */
const offset_t* offsets, /*!< in: rec_get_offsets(rec) */
- const byte* free) /*!< in: previous start of
+ const byte* free, /*!< in: previous start of
the free list */
+ mtr_t* mtr) /*!< in/out: mini-transaction */
{
byte* slot_rec;
byte* slot_free;
- ulint n_ext;
page_t* page = page_align(rec);
ut_ad(rec_offs_validate(rec, index, offsets));
@@ -4458,6 +4484,15 @@ page_zip_dir_delete(
UNIV_MEM_ASSERT_RW(rec - rec_offs_extra_size(offsets),
rec_offs_extra_size(offsets));
+ mach_write_to_2(rec - REC_NEXT, free
+ ? static_cast<uint16_t>(free - rec) : 0);
+ mach_write_to_2(PAGE_FREE + PAGE_HEADER + page, page_offset(rec));
+ byte* garbage = PAGE_GARBAGE + PAGE_HEADER + page;
+ mach_write_to_2(garbage, rec_offs_size(offsets)
+ + mach_read_from_2(garbage));
+ compile_time_assert(PAGE_GARBAGE == PAGE_FREE + 2);
+ page_zip_write_header(page_zip, PAGE_FREE + PAGE_HEADER + page,
+ 4, mtr);
slot_rec = page_zip_dir_find(page_zip, page_offset(rec));
ut_a(slot_rec);
@@ -4465,8 +4500,9 @@ page_zip_dir_delete(
ut_ad(n_recs);
ut_ad(n_recs > 1 || page_get_page_no(page) == index->page);
/* This could not be done before page_zip_dir_find(). */
- page_header_set_field(page, page_zip, PAGE_N_RECS,
- n_recs - 1);
+ mach_write_to_2(PAGE_N_RECS + PAGE_HEADER + page, n_recs - 1);
+ page_zip_write_header(page_zip, PAGE_N_RECS + PAGE_HEADER + page,
+ 2, mtr);
if (UNIV_UNLIKELY(!free)) {
/* Make the last slot the start of the free list. */
@@ -4482,22 +4518,34 @@ page_zip_dir_delete(
slot_free += PAGE_ZIP_DIR_SLOT_SIZE;
}
- if (UNIV_LIKELY(slot_rec > slot_free)) {
+ const ulint slot_len = slot_rec > slot_free
+ ? ulint(slot_rec - slot_free)
+ : 0;
+ if (slot_len) {
memmove_aligned<2>(slot_free + PAGE_ZIP_DIR_SLOT_SIZE,
- slot_free, ulint(slot_rec - slot_free));
+ slot_free, slot_len);
+ /* TODO: issue MEMMOVE record to reduce log volume */
}
/* Write the entry for the deleted record.
The "owned" and "deleted" flags will be cleared. */
mach_write_to_2(slot_free, page_offset(rec));
- if (!page_is_leaf(page) || !dict_index_is_clust(index)) {
- ut_ad(!rec_offs_any_extern(offsets));
- goto skip_blobs;
+ if (byte* log_ptr = mlog_open(mtr, 11 + 2 + 2)) {
+ log_ptr = mlog_write_initial_log_record_fast(
+ rec, MLOG_ZIP_WRITE_STRING, log_ptr, mtr);
+ mach_write_to_2(log_ptr, slot_free - page_zip->data);
+ mach_write_to_2(log_ptr + 2, slot_len
+ + PAGE_ZIP_DIR_SLOT_SIZE);
+ mlog_close(mtr, log_ptr + 4);
+ mlog_catenate_string(mtr, slot_free, slot_len
+ + PAGE_ZIP_DIR_SLOT_SIZE);
}
- n_ext = rec_offs_n_extern(offsets);
- if (UNIV_UNLIKELY(n_ext != 0)) {
+ if (const ulint n_ext = rec_offs_n_extern(offsets)) {
+ ut_ad(index->is_primary());
+ ut_ad(page_is_leaf(page));
+
/* Shift and zero fill the array of BLOB pointers. */
ulint blob_no;
byte* externs;
@@ -4510,24 +4558,34 @@ page_zip_dir_delete(
- (page_dir_get_n_heap(page) - PAGE_HEAP_NO_USER_LOW)
* PAGE_ZIP_CLUST_LEAF_SLOT_SIZE;
- ext_end = externs - page_zip->n_blobs
- * BTR_EXTERN_FIELD_REF_SIZE;
- externs -= blob_no * BTR_EXTERN_FIELD_REF_SIZE;
+ ext_end = externs - page_zip->n_blobs * FIELD_REF_SIZE;
- page_zip->n_blobs -= static_cast<unsigned>(n_ext);
/* Shift and zero fill the array. */
- memmove(ext_end + n_ext * BTR_EXTERN_FIELD_REF_SIZE, ext_end,
- ulint(page_zip->n_blobs - blob_no)
+ memmove(ext_end + n_ext * FIELD_REF_SIZE, ext_end,
+ ulint(page_zip->n_blobs - n_ext - blob_no)
* BTR_EXTERN_FIELD_REF_SIZE);
- memset(ext_end, 0, n_ext * BTR_EXTERN_FIELD_REF_SIZE);
+ memset(ext_end, 0, n_ext * FIELD_REF_SIZE);
+ /* TODO: use MEMMOVE and MEMSET records to reduce volume */
+ const ulint ext_len = ulint(page_zip->n_blobs - blob_no)
+ * FIELD_REF_SIZE;
+
+ if (byte* log_ptr = mlog_open(mtr, 11 + 2 + 2)) {
+ log_ptr = mlog_write_initial_log_record_fast(
+ rec, MLOG_ZIP_WRITE_STRING, log_ptr, mtr);
+ mach_write_to_2(log_ptr, ext_end - page_zip->data);
+ mach_write_to_2(log_ptr + 2, ext_len);
+ mlog_close(mtr, log_ptr + 4);
+ mlog_catenate_string(mtr, ext_end, ext_len);
+ }
+
+ page_zip->n_blobs -= static_cast<unsigned>(n_ext);
}
-skip_blobs:
/* The compression algorithm expects info_bits and n_owned
to be 0 for deleted records. */
rec[-REC_N_NEW_EXTRA_BYTES] = 0; /* info_bits and n_owned */
- page_zip_clear_rec(page_zip, rec, index, offsets);
+ page_zip_clear_rec(page_zip, rec, index, offsets, mtr);
}
/**********************************************************************//**
diff --git a/storage/innobase/rem/rem0rec.cc b/storage/innobase/rem/rem0rec.cc
index 613dd2d6a81..69ee50a661c 100644
--- a/storage/innobase/rem/rem0rec.cc
+++ b/storage/innobase/rem/rem0rec.cc
@@ -1416,7 +1416,8 @@ rec_convert_dtuple_to_rec_old(
/* Set the info bits of the record */
rec_set_info_bits_old(rec, dtuple_get_info_bits(dtuple)
& REC_INFO_BITS_MASK);
- rec_set_heap_no_old(rec, PAGE_HEAP_NO_USER_LOW);
+ rec_set_bit_field_2(rec, PAGE_HEAP_NO_USER_LOW, REC_OLD_HEAP_NO,
+ REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
/* Store the data and the offsets */
@@ -1529,7 +1530,9 @@ rec_convert_dtuple_to_rec_comp(
ut_ad(n_fields == ulint(index->n_fields) + 1);
rec_set_n_add_field(nulls, n_fields - 1
- index->n_core_fields);
- rec_set_heap_no_new(rec, PAGE_HEAP_NO_USER_LOW);
+ rec_set_bit_field_2(rec, PAGE_HEAP_NO_USER_LOW,
+ REC_NEW_HEAP_NO, REC_HEAP_NO_MASK,
+ REC_HEAP_NO_SHIFT);
rec_set_status(rec, REC_STATUS_INSTANT);
n_node_ptr_field = ULINT_UNDEFINED;
lens = nulls - UT_BITS_IN_BYTES(index->n_nullable);
@@ -1545,8 +1548,9 @@ rec_convert_dtuple_to_rec_comp(
case REC_STATUS_ORDINARY:
ut_ad(n_fields <= dict_index_get_n_fields(index));
if (!temp) {
- rec_set_heap_no_new(rec, PAGE_HEAP_NO_USER_LOW);
-
+ rec_set_bit_field_2(rec, PAGE_HEAP_NO_USER_LOW,
+ REC_NEW_HEAP_NO, REC_HEAP_NO_MASK,
+ REC_HEAP_NO_SHIFT);
rec_set_status(
rec, n_fields == index->n_core_fields
? REC_STATUS_ORDINARY
@@ -1569,7 +1573,9 @@ rec_convert_dtuple_to_rec_comp(
break;
case REC_STATUS_NODE_PTR:
ut_ad(!temp);
- rec_set_heap_no_new(rec, PAGE_HEAP_NO_USER_LOW);
+ rec_set_bit_field_2(rec, PAGE_HEAP_NO_USER_LOW,
+ REC_NEW_HEAP_NO, REC_HEAP_NO_MASK,
+ REC_HEAP_NO_SHIFT);
rec_set_status(rec, status);
ut_ad(n_fields
== dict_index_get_n_unique_in_tree_nonleaf(index) + 1);