summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChenhao Qu <chenhao.qu@mongodb.com>2020-08-12 06:51:31 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-08-12 07:06:45 +0000
commitb7d2e619b374113283e6bd7f67fb4da9883b5f21 (patch)
treeb9143220d3769ec6f49e173c7c4600b85f8b2095
parent33e0dff5b5baadb7373f65c1c74aa94fab49bcd1 (diff)
downloadmongo-b7d2e619b374113283e6bd7f67fb4da9883b5f21.tar.gz
Import wiredtiger: 0c8bd1b18c65ec035684f6ee7a296b971826399e from branch mongodb-4.6
ref: 36194b4a03..0c8bd1b18c for: 4.5.1 WT-6458 read row-store leaf pages with prepared updates in a single pass WT-6570 RTS to remove the left over updates in the history store without stop timestamp
-rw-r--r--src/third_party/wiredtiger/import.data2
-rw-r--r--src/third_party/wiredtiger/src/btree/bt_page.c136
-rw-r--r--src/third_party/wiredtiger/src/txn/txn_rollback_to_stable.c59
-rwxr-xr-xsrc/third_party/wiredtiger/test/suite/test_rollback_to_stable10.py129
4 files changed, 239 insertions, 87 deletions
diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data
index 8a65c35ba47..9e3b277f846 100644
--- a/src/third_party/wiredtiger/import.data
+++ b/src/third_party/wiredtiger/import.data
@@ -2,5 +2,5 @@
"vendor": "wiredtiger",
"github": "wiredtiger/wiredtiger.git",
"branch": "mongodb-4.6",
- "commit": "36194b4a0341d4b22349510c6a09ab82757f266c"
+ "commit": "0c8bd1b18c65ec035684f6ee7a296b971826399e"
}
diff --git a/src/third_party/wiredtiger/src/btree/bt_page.c b/src/third_party/wiredtiger/src/btree/bt_page.c
index 7a325ffe3eb..5345b9b49d0 100644
--- a/src/third_party/wiredtiger/src/btree/bt_page.c
+++ b/src/third_party/wiredtiger/src/btree/bt_page.c
@@ -531,32 +531,36 @@ __inmem_row_leaf_entries(WT_SESSION_IMPL *session, const WT_PAGE_HEADER *dsk, ui
static int
__inmem_row_leaf(WT_SESSION_IMPL *session, WT_PAGE *page)
{
+ enum { PREPARE_INSTANTIATE, PREPARE_INITIALIZED, PREPARE_IGNORE } prepare;
WT_BTREE *btree;
WT_CELL_UNPACK_KV unpack;
WT_DECL_ITEM(value);
WT_DECL_RET;
WT_ROW *rip;
- WT_UPDATE *tombstone, *upd, **upd_array;
+ WT_UPDATE *tombstone, *upd;
size_t size, total_size;
- uint32_t i;
- bool instantiate_prepared, prepare;
btree = S2BT(session);
tombstone = upd = NULL;
- prepare = false;
+ size = total_size = 0;
- instantiate_prepared = F_ISSET(session, WT_SESSION_INSTANTIATE_PREPARE);
+ /*
+ * Optionally instantiate prepared updates. In-memory databases restore non-obsolete updates on
+ * the page as part of the __split_multi_inmem function.
+ */
+ prepare = F_ISSET(session, WT_SESSION_INSTANTIATE_PREPARE) &&
+ !F_ISSET(S2C(session), WT_CONN_IN_MEMORY) ?
+ PREPARE_INSTANTIATE :
+ PREPARE_IGNORE;
/* Walk the page, building indices. */
rip = page->pg_row;
WT_CELL_FOREACH_KV (session, page->dsk, unpack) {
- if (instantiate_prepared && !prepare && unpack.tw.prepare)
- prepare = true;
switch (unpack.type) {
case WT_CELL_KEY_OVFL:
__wt_row_leaf_key_set_cell(page, rip, unpack.cell);
++rip;
- break;
+ continue;
case WT_CELL_KEY:
/*
* Simple keys without compression (not Huffman encoded or prefix compressed), can be
@@ -567,7 +571,7 @@ __inmem_row_leaf(WT_SESSION_IMPL *session, WT_PAGE *page)
else
__wt_row_leaf_key_set_cell(page, rip, unpack.cell);
++rip;
- break;
+ continue;
case WT_CELL_VALUE:
/*
* Simple values without compression can be directly referenced on the page to avoid
@@ -584,72 +588,64 @@ __inmem_row_leaf(WT_SESSION_IMPL *session, WT_PAGE *page)
case WT_CELL_VALUE_OVFL:
break;
default:
- return (__wt_illegal_value(session, unpack.type));
+ WT_ERR(__wt_illegal_value(session, unpack.type));
}
- }
- WT_CELL_FOREACH_END;
- /*
- * Instantiate prepared updates on leaf pages when the page is loaded. For in-memory databases,
- * all non obsolete updates will retain on the page as part of __split_multi_inmem function.
- */
- if (prepare && !F_ISSET(S2C(session), WT_CONN_IN_MEMORY)) {
- WT_RET(__wt_page_modify_init(session, page));
- if (!F_ISSET(btree, WT_BTREE_READONLY))
- __wt_page_modify_set(session, page);
-
- /* Allocate the per-page update array if one doesn't already exist. */
- if (page->entries != 0 && page->modify->mod_row_update == NULL)
- WT_PAGE_ALLOC_AND_SWAP(
- session, page, page->modify->mod_row_update, upd_array, page->entries);
-
- /* For each entry in the page */
- size = total_size = 0;
- upd_array = page->modify->mod_row_update;
- WT_ROW_FOREACH (page, rip, i) {
- /* Unpack the on-page value cell. */
- __wt_row_leaf_value_cell(session, page, rip, NULL, &unpack);
- if (unpack.tw.prepare) {
- /* Take the value from the original page cell. */
- if (value == NULL)
- WT_ERR(__wt_scr_alloc(session, 0, &value));
- WT_ERR(__wt_page_cell_data_ref(session, page, &unpack, value));
-
- WT_ERR(__wt_upd_alloc(session, value, WT_UPDATE_STANDARD, &upd, &size));
- total_size += size;
- upd->durable_ts = unpack.tw.durable_start_ts;
- upd->start_ts = unpack.tw.start_ts;
- upd->txnid = unpack.tw.start_txn;
-
- /*
- * Instantiating both update and tombstone if the prepared update is a tombstone.
- * This is required to ensure that written prepared delete operation must be removed
- * from the data store, when the prepared transaction gets rollback.
- */
- if (WT_TIME_WINDOW_HAS_STOP(&unpack.tw)) {
- WT_ERR(__wt_upd_alloc_tombstone(session, &tombstone, &size));
- total_size += size;
- tombstone->durable_ts = WT_TS_NONE;
- tombstone->start_ts = unpack.tw.stop_ts;
- tombstone->txnid = unpack.tw.stop_txn;
- tombstone->prepare_state = WT_PREPARE_INPROGRESS;
- F_SET(tombstone, WT_UPDATE_PREPARE_RESTORED_FROM_DS);
- F_SET(upd, WT_UPDATE_RESTORED_FROM_DS);
- tombstone->next = upd;
- } else {
- upd->durable_ts = WT_TS_NONE;
- upd->prepare_state = WT_PREPARE_INPROGRESS;
- F_SET(upd, WT_UPDATE_PREPARE_RESTORED_FROM_DS);
- tombstone = upd;
- }
-
- upd_array[WT_ROW_SLOT(page, rip)] = tombstone;
- tombstone = upd = NULL;
- }
+ if (!unpack.tw.prepare || prepare == PREPARE_IGNORE)
+ continue;
+
+ /* First prepared transaction setup. */
+ if (prepare == PREPARE_INSTANTIATE) {
+ WT_ERR(__wt_page_modify_init(session, page));
+ if (!F_ISSET(btree, WT_BTREE_READONLY))
+ __wt_page_modify_set(session, page);
+
+ /* Allocate the per-page update array. */
+ WT_ERR(__wt_calloc_def(session, page->entries, &page->modify->mod_row_update));
+ total_size += page->entries * sizeof(*page->modify->mod_row_update);
+
+ WT_ERR(__wt_scr_alloc(session, 0, &value));
+
+ prepare = PREPARE_INITIALIZED;
+ }
+
+ /* Take the value from the page cell. */
+ WT_ERR(__wt_page_cell_data_ref(session, page, &unpack, value));
+
+ WT_ERR(__wt_upd_alloc(session, value, WT_UPDATE_STANDARD, &upd, &size));
+ total_size += size;
+ upd->durable_ts = unpack.tw.durable_start_ts;
+ upd->start_ts = unpack.tw.start_ts;
+ upd->txnid = unpack.tw.start_txn;
+
+ /*
+ * Instantiate both the update and the tombstone if the prepared update is a tombstone. This
+ * ensures that a prepared delete operation that was written is removed from the data store
+ * when the prepared transaction is rolled back.
+ */
+ if (WT_TIME_WINDOW_HAS_STOP(&unpack.tw)) {
+ WT_ERR(__wt_upd_alloc_tombstone(session, &tombstone, &size));
+ total_size += size;
+ tombstone->durable_ts = WT_TS_NONE;
+ tombstone->start_ts = unpack.tw.stop_ts;
+ tombstone->txnid = unpack.tw.stop_txn;
+ tombstone->prepare_state = WT_PREPARE_INPROGRESS;
+ F_SET(tombstone, WT_UPDATE_PREPARE_RESTORED_FROM_DS);
+ F_SET(upd, WT_UPDATE_RESTORED_FROM_DS);
+ tombstone->next = upd;
+ } else {
+ upd->durable_ts = WT_TS_NONE;
+ upd->prepare_state = WT_PREPARE_INPROGRESS;
+ F_SET(upd, WT_UPDATE_PREPARE_RESTORED_FROM_DS);
+ tombstone = upd;
}
- __wt_cache_page_inmem_incr(session, page, total_size);
+ page->modify->mod_row_update[WT_ROW_SLOT(page, rip - 1)] = tombstone;
+ tombstone = upd = NULL;
}
+ WT_CELL_FOREACH_END;
+
+ __wt_cache_page_inmem_incr(session, page, total_size);
err:
__wt_free(session, tombstone);
diff --git a/src/third_party/wiredtiger/src/txn/txn_rollback_to_stable.c b/src/third_party/wiredtiger/src/txn/txn_rollback_to_stable.c
index 2c97ddf48c7..764afc23f00 100644
--- a/src/third_party/wiredtiger/src/txn/txn_rollback_to_stable.c
+++ b/src/third_party/wiredtiger/src/txn/txn_rollback_to_stable.c
@@ -421,7 +421,12 @@ __rollback_abort_row_ondisk_kv(
__wt_row_leaf_value_cell(session, page, rip, NULL, vpack);
prepared = vpack->tw.prepare;
if (WT_IS_HS(S2BT(session))) {
- if (vpack->tw.durable_stop_ts > rollback_timestamp) {
+ /*
+ * Abort history store updates whose stop durable timestamp is greater than the stable
+ * timestamp, as well as updates with the maximum stop timestamp, which implies that they are
+ * associated with prepared transactions.
+ */
+ if (vpack->tw.durable_stop_ts > rollback_timestamp || vpack->tw.stop_ts == WT_TS_MAX) {
__wt_verbose(session, WT_VERB_RTS,
"hs update aborted with start durable/commit timestamp: %s, %s, "
"stop durable/commit timestamp: %s, %s and stable timestamp: %s",
@@ -696,6 +701,24 @@ __rollback_abort_newer_row_leaf(
}
/*
+ * __rollback_get_ref_max_durable_timestamp --
+ *     Return the maximum durable timestamp for a ref's time aggregate. For trees other than the
+ *     history store this is the larger of the newest start durable and newest stop durable
+ *     timestamps. The history store is handled differently: most of its updates have a stop
+ *     timestamp greater than or equal to the start timestamp, the exception being updates
+ *     written for prepared updates on the data store. So that updates with no stop timestamp can
+ *     be aborted, the newest stop timestamp takes the place of the newest start durable
+ *     timestamp when the tree is the history store.
+ */
+static wt_timestamp_t
+__rollback_get_ref_max_durable_timestamp(WT_SESSION_IMPL *session, WT_TIME_AGGREGATE *ta)
+{
+    wt_timestamp_t other_ts;
+
+    /* The newest stop durable timestamp is always a candidate; only the second one varies. */
+    other_ts = WT_IS_HS(S2BT(session)) ? ta->newest_stop_ts : ta->newest_start_durable_ts;
+
+    return (WT_MAX(other_ts, ta->newest_stop_durable_ts));
+}
+
+/*
* __rollback_page_needs_abort --
* Check whether the page needs rollback. Return true if the page has modifications newer than
* the given timestamp. Otherwise return false.
@@ -730,16 +753,15 @@ __rollback_page_needs_abort(
*/
if (mod != NULL && mod->rec_result == WT_PM_REC_REPLACE) {
tag = "reconciled replace block";
- durable_ts = WT_MAX(
- mod->mod_replace.ta.newest_start_durable_ts, mod->mod_replace.ta.newest_stop_durable_ts);
+ durable_ts = __rollback_get_ref_max_durable_timestamp(session, &mod->mod_replace.ta);
prepared = mod->mod_replace.ta.prepare;
result = (durable_ts > rollback_timestamp) || prepared;
} else if (mod != NULL && mod->rec_result == WT_PM_REC_MULTIBLOCK) {
tag = "reconciled multi block";
/* Calculate the max durable timestamp by traversing all multi addresses. */
for (multi = mod->mod_multi, i = 0; i < mod->mod_multi_entries; ++multi, ++i) {
- durable_ts = WT_MAX(durable_ts, multi->addr.ta.newest_start_durable_ts);
- durable_ts = WT_MAX(durable_ts, multi->addr.ta.newest_stop_durable_ts);
+ durable_ts = WT_MAX(
+ durable_ts, __rollback_get_ref_max_durable_timestamp(session, &multi->addr.ta));
if (multi->addr.ta.prepare)
prepared = true;
}
@@ -748,12 +770,12 @@ __rollback_page_needs_abort(
tag = "on page cell";
/* Check if the page is obsolete using the page disk address. */
__wt_cell_unpack_addr(session, ref->home->dsk, (WT_CELL *)addr, &vpack);
- durable_ts = WT_MAX(vpack.ta.newest_start_durable_ts, vpack.ta.newest_stop_durable_ts);
+ durable_ts = __rollback_get_ref_max_durable_timestamp(session, &vpack.ta);
prepared = vpack.ta.prepare;
result = (durable_ts > rollback_timestamp) || prepared;
} else if (addr != NULL) {
tag = "address";
- durable_ts = WT_MAX(addr->ta.newest_start_durable_ts, addr->ta.newest_stop_durable_ts);
+ durable_ts = __rollback_get_ref_max_durable_timestamp(session, &addr->ta);
prepared = addr->ta.prepare;
result = (durable_ts > rollback_timestamp) || prepared;
}
@@ -1064,7 +1086,7 @@ __rollback_to_stable_hs_final_pass(WT_SESSION_IMPL *session, wt_timestamp_t roll
WT_CONFIG ckptconf;
WT_CONFIG_ITEM cval, durableval, key;
WT_DECL_RET;
- wt_timestamp_t max_durable_ts, newest_start_durable_ts, newest_stop_durable_ts;
+ wt_timestamp_t max_durable_ts, newest_stop_durable_ts, newest_stop_ts;
char *config;
char ts_string[2][WT_TS_INT_STRING_SIZE];
@@ -1072,22 +1094,27 @@ __rollback_to_stable_hs_final_pass(WT_SESSION_IMPL *session, wt_timestamp_t roll
WT_RET(__wt_metadata_search(session, WT_HS_URI, &config));
- /* Find out the max durable timestamp of the object from checkpoint. */
- newest_start_durable_ts = newest_stop_durable_ts = WT_TS_NONE;
+ /*
+ * Find the maximum durable timestamp of the history store from the checkpoint. Most history
+ * store updates have a stop timestamp greater than or equal to their start timestamp; the
+ * exception is updates written for prepared updates on the data store. To abort updates with
+ * no stop timestamp, the newest stop timestamp must also be included when calculating the
+ * maximum timestamp of the history store.
+ */
+ newest_stop_durable_ts = newest_stop_ts = WT_TS_NONE;
WT_ERR(__wt_config_getones(session, config, "checkpoint", &cval));
__wt_config_subinit(session, &ckptconf, &cval);
for (; __wt_config_next(&ckptconf, &key, &cval) == 0;) {
- ret = __wt_config_subgets(session, &cval, "newest_start_durable_ts", &durableval);
- if (ret == 0)
- newest_start_durable_ts =
- WT_MAX(newest_start_durable_ts, (wt_timestamp_t)durableval.val);
- WT_ERR_NOTFOUND_OK(ret, false);
ret = __wt_config_subgets(session, &cval, "newest_stop_durable_ts", &durableval);
if (ret == 0)
newest_stop_durable_ts = WT_MAX(newest_stop_durable_ts, (wt_timestamp_t)durableval.val);
WT_ERR_NOTFOUND_OK(ret, false);
+ ret = __wt_config_subgets(session, &cval, "newest_stop_ts", &durableval);
+ if (ret == 0)
+ newest_stop_ts = WT_MAX(newest_stop_ts, (wt_timestamp_t)durableval.val);
+ WT_ERR_NOTFOUND_OK(ret, false);
}
- max_durable_ts = WT_MAX(newest_start_durable_ts, newest_stop_durable_ts);
+ max_durable_ts = WT_MAX(newest_stop_ts, newest_stop_durable_ts);
WT_ERR(__wt_session_get_dhandle(session, WT_HS_URI, NULL, NULL, 0));
/*
diff --git a/src/third_party/wiredtiger/test/suite/test_rollback_to_stable10.py b/src/third_party/wiredtiger/test/suite/test_rollback_to_stable10.py
index e79c55203bf..567e51c8a65 100755
--- a/src/third_party/wiredtiger/test/suite/test_rollback_to_stable10.py
+++ b/src/third_party/wiredtiger/test/suite/test_rollback_to_stable10.py
@@ -155,6 +155,135 @@ class test_rollback_to_stable10(test_rollback_to_stable_base):
self.check(value_d, uri_1, nrows, 20)
# Check that the correct data is seen at and after the stable timestamp.
+ self.check(value_c, uri_2, nrows, 30)
+ self.check(value_a, uri_2, nrows, 50)
+ self.check(value_a, uri_2, nrows, 80)
+ self.check(value_b, uri_2, nrows, 40)
+ self.check(value_d, uri_2, nrows, 20)
+
+ stat_cursor = self.session.open_cursor('statistics:', None, None)
+ calls = stat_cursor[stat.conn.txn_rts][2]
+ hs_removed = stat_cursor[stat.conn.txn_rts_hs_removed][2]
+ hs_sweep = stat_cursor[stat.conn.txn_rts_sweep_hs_keys][2]
+ keys_removed = stat_cursor[stat.conn.txn_rts_keys_removed][2]
+ keys_restored = stat_cursor[stat.conn.txn_rts_keys_restored][2]
+ pages_visited = stat_cursor[stat.conn.txn_rts_pages_visited][2]
+ upd_aborted = stat_cursor[stat.conn.txn_rts_upd_aborted][2]
+ stat_cursor.close()
+
+ self.assertEqual(calls, 0)
+ self.assertEqual(keys_removed, 0)
+ self.assertEqual(keys_restored, 0)
+ self.assertGreaterEqual(upd_aborted, 0)
+ self.assertGreater(pages_visited, 0)
+ self.assertGreaterEqual(hs_removed, 0)
+ self.assertGreater(hs_sweep, 0)
+
+ def test_rollback_to_stable_prepare(self):
+ nrows = 1000
+
+ # Create a table without logging.
+ uri_1 = "table:rollback_to_stable10_1"
+ ds_1 = SimpleDataSet(
+ self, uri_1, 0, key_format="i", value_format="S", config='log=(enabled=false)')
+ ds_1.populate()
+
+ # Create another table without logging.
+ uri_2 = "table:rollback_to_stable10_2"
+ ds_2 = SimpleDataSet(
+ self, uri_2, 0, key_format="i", value_format="S", config='log=(enabled=false)')
+ ds_2.populate()
+
+ # Pin oldest and stable to timestamp 10.
+ self.conn.set_timestamp('oldest_timestamp=' + timestamp_str(10) +
+ ',stable_timestamp=' + timestamp_str(10))
+
+ value_a = "aaaaa" * 100
+ value_b = "bbbbb" * 100
+ value_c = "ccccc" * 100
+ value_d = "ddddd" * 100
+ value_e = "eeeee" * 100
+ value_f = "fffff" * 100
+
+ # Perform several updates.
+ self.large_updates(uri_1, value_d, ds_1, nrows, 20)
+ self.large_updates(uri_1, value_c, ds_1, nrows, 30)
+ self.large_updates(uri_1, value_b, ds_1, nrows, 40)
+ self.large_updates(uri_1, value_a, ds_1, nrows, 50)
+
+ self.large_updates(uri_2, value_d, ds_2, nrows, 20)
+ self.large_updates(uri_2, value_c, ds_2, nrows, 30)
+ self.large_updates(uri_2, value_b, ds_2, nrows, 40)
+ self.large_updates(uri_2, value_a, ds_2, nrows, 50)
+
+ # Verify data is visible and correct.
+ self.check(value_d, uri_1, nrows, 20)
+ self.check(value_c, uri_1, nrows, 30)
+ self.session.breakpoint()
+ self.check(value_b, uri_1, nrows, 40)
+ self.check(value_a, uri_1, nrows, 50)
+
+ self.check(value_d, uri_2, nrows, 20)
+ self.check(value_c, uri_2, nrows, 30)
+ self.session.breakpoint()
+ self.check(value_b, uri_2, nrows, 40)
+ self.check(value_a, uri_2, nrows, 50)
+
+ # Pin stable to timestamp 60 if prepare otherwise 50.
+ if self.prepare:
+ self.conn.set_timestamp('stable_timestamp=' + timestamp_str(60))
+ else:
+ self.conn.set_timestamp('stable_timestamp=' + timestamp_str(50))
+
+ # Create a checkpoint thread
+ done = threading.Event()
+ ckpt = checkpoint_thread(self.conn, done)
+ ckpt.start()
+
+ # Perform several updates in parallel with checkpoint.
+ session_p1 = self.conn.open_session()
+ cursor_p1 = session_p1.open_cursor(uri_1)
+ session_p1.begin_transaction('isolation=snapshot')
+ for i in range(1, nrows):
+ cursor_p1.set_key(ds_1.key(i))
+ cursor_p1.set_value(value_e)
+ self.assertEquals(cursor_p1.update(), 0)
+ session_p1.prepare_transaction('prepare_timestamp=' + timestamp_str(69))
+
+ # Perform several updates in parallel with checkpoint.
+ session_p2 = self.conn.open_session()
+ cursor_p2 = session_p2.open_cursor(uri_2)
+ session_p2.begin_transaction('isolation=snapshot')
+ for i in range(1, nrows):
+ cursor_p2.set_key(ds_2.key(i))
+ cursor_p2.set_value(value_e)
+ self.assertEquals(cursor_p2.update(), 0)
+ session_p2.prepare_transaction('prepare_timestamp=' + timestamp_str(69))
+
+ done.set()
+ ckpt.join()
+
+ # Simulate a crash by copying to a new directory(RESTART).
+ copy_wiredtiger_home(".", "RESTART")
+
+ # Commit the prepared transaction.
+ session_p1.commit_transaction('commit_timestamp=' + timestamp_str(70) + ',durable_timestamp=' + timestamp_str(71))
+ session_p2.commit_transaction('commit_timestamp=' + timestamp_str(70) + ',durable_timestamp=' + timestamp_str(71))
+ session_p1.close()
+ session_p2.close()
+
+ # Open the new directory.
+ self.conn = self.setUpConnectionOpen("RESTART")
+ self.session = self.setUpSessionOpen(self.conn)
+
+ # Check that the correct data is seen at and after the stable timestamp.
+ self.check(value_a, uri_1, nrows, 50)
+ self.check(value_a, uri_1, nrows, 80)
+ self.check(value_b, uri_1, nrows, 40)
+ self.check(value_c, uri_1, nrows, 30)
+ self.check(value_d, uri_1, nrows, 20)
+
+ # Check that the correct data is seen at and after the stable timestamp.
self.check(value_a, uri_2, nrows, 50)
self.check(value_a, uri_2, nrows, 80)
self.check(value_b, uri_2, nrows, 40)