diff options
author | Chenhao Qu <chenhao.qu@mongodb.com> | 2020-08-12 06:51:31 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-08-12 07:06:45 +0000 |
commit | b7d2e619b374113283e6bd7f67fb4da9883b5f21 (patch) | |
tree | b9143220d3769ec6f49e173c7c4600b85f8b2095 /src/third_party | |
parent | 33e0dff5b5baadb7373f65c1c74aa94fab49bcd1 (diff) | |
download | mongo-b7d2e619b374113283e6bd7f67fb4da9883b5f21.tar.gz |
Import wiredtiger: 0c8bd1b18c65ec035684f6ee7a296b971826399e from branch mongodb-4.6
ref: 36194b4a03..0c8bd1b18c
for: 4.5.1
WT-6458 read row-store leaf pages with prepared updates in a single pass
WT-6570 RTS to remove the left over updates in the history store without stop timestamp
Diffstat (limited to 'src/third_party')
4 files changed, 239 insertions, 87 deletions
diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data index 8a65c35ba47..9e3b277f846 100644 --- a/src/third_party/wiredtiger/import.data +++ b/src/third_party/wiredtiger/import.data @@ -2,5 +2,5 @@ "vendor": "wiredtiger", "github": "wiredtiger/wiredtiger.git", "branch": "mongodb-4.6", - "commit": "36194b4a0341d4b22349510c6a09ab82757f266c" + "commit": "0c8bd1b18c65ec035684f6ee7a296b971826399e" } diff --git a/src/third_party/wiredtiger/src/btree/bt_page.c b/src/third_party/wiredtiger/src/btree/bt_page.c index 7a325ffe3eb..5345b9b49d0 100644 --- a/src/third_party/wiredtiger/src/btree/bt_page.c +++ b/src/third_party/wiredtiger/src/btree/bt_page.c @@ -531,32 +531,36 @@ __inmem_row_leaf_entries(WT_SESSION_IMPL *session, const WT_PAGE_HEADER *dsk, ui static int __inmem_row_leaf(WT_SESSION_IMPL *session, WT_PAGE *page) { + enum { PREPARE_INSTANTIATE, PREPARE_INITIALIZED, PREPARE_IGNORE } prepare; WT_BTREE *btree; WT_CELL_UNPACK_KV unpack; WT_DECL_ITEM(value); WT_DECL_RET; WT_ROW *rip; - WT_UPDATE *tombstone, *upd, **upd_array; + WT_UPDATE *tombstone, *upd; size_t size, total_size; - uint32_t i; - bool instantiate_prepared, prepare; btree = S2BT(session); tombstone = upd = NULL; - prepare = false; + size = total_size = 0; - instantiate_prepared = F_ISSET(session, WT_SESSION_INSTANTIATE_PREPARE); + /* + * Optionally instantiate prepared updates. In-memory databases restore non-obsolete updates on + * the page as part of the __split_multi_inmem function. + */ + prepare = F_ISSET(session, WT_SESSION_INSTANTIATE_PREPARE) && + !F_ISSET(S2C(session), WT_CONN_IN_MEMORY) ? + PREPARE_INSTANTIATE : + PREPARE_IGNORE; /* Walk the page, building indices. */ rip = page->pg_row; WT_CELL_FOREACH_KV (session, page->dsk, unpack) { - if (instantiate_prepared && !prepare && unpack.tw.prepare) - prepare = true; switch (unpack.type) { case WT_CELL_KEY_OVFL: __wt_row_leaf_key_set_cell(page, rip, unpack.cell); ++rip; - break; + continue; case WT_CELL_KEY: /* * Simple keys without compression (not Huffman encoded or prefix compressed), can be @@ -567,7 +571,7 @@ __inmem_row_leaf(WT_SESSION_IMPL *session, WT_PAGE *page) else __wt_row_leaf_key_set_cell(page, rip, unpack.cell); ++rip; - break; + continue; case WT_CELL_VALUE: /* * Simple values without compression can be directly referenced on the page to avoid @@ -584,72 +588,64 @@ __inmem_row_leaf(WT_SESSION_IMPL *session, WT_PAGE *page) case WT_CELL_VALUE_OVFL: break; default: - return (__wt_illegal_value(session, unpack.type)); + WT_ERR(__wt_illegal_value(session, unpack.type)); } - } - WT_CELL_FOREACH_END; - /* - * Instantiate prepared updates on leaf pages when the page is loaded. For in-memory databases, - * all non obsolete updates will retain on the page as part of __split_multi_inmem function. - */ - if (prepare && !F_ISSET(S2C(session), WT_CONN_IN_MEMORY)) { - WT_RET(__wt_page_modify_init(session, page)); - if (!F_ISSET(btree, WT_BTREE_READONLY)) - __wt_page_modify_set(session, page); - - /* Allocate the per-page update array if one doesn't already exist. */ - if (page->entries != 0 && page->modify->mod_row_update == NULL) - WT_PAGE_ALLOC_AND_SWAP( - session, page, page->modify->mod_row_update, upd_array, page->entries); - - /* For each entry in the page */ - size = total_size = 0; - upd_array = page->modify->mod_row_update; - WT_ROW_FOREACH (page, rip, i) { - /* Unpack the on-page value cell. */ - __wt_row_leaf_value_cell(session, page, rip, NULL, &unpack); - if (unpack.tw.prepare) { - /* Take the value from the original page cell. */ - if (value == NULL) - WT_ERR(__wt_scr_alloc(session, 0, &value)); - WT_ERR(__wt_page_cell_data_ref(session, page, &unpack, value)); - - WT_ERR(__wt_upd_alloc(session, value, WT_UPDATE_STANDARD, &upd, &size)); - total_size += size; - upd->durable_ts = unpack.tw.durable_start_ts; - upd->start_ts = unpack.tw.start_ts; - upd->txnid = unpack.tw.start_txn; - - /* - * Instantiating both update and tombstone if the prepared update is a tombstone. - * This is required to ensure that written prepared delete operation must be removed - * from the data store, when the prepared transaction gets rollback. - */ - if (WT_TIME_WINDOW_HAS_STOP(&unpack.tw)) { - WT_ERR(__wt_upd_alloc_tombstone(session, &tombstone, &size)); - total_size += size; - tombstone->durable_ts = WT_TS_NONE; - tombstone->start_ts = unpack.tw.stop_ts; - tombstone->txnid = unpack.tw.stop_txn; - tombstone->prepare_state = WT_PREPARE_INPROGRESS; - F_SET(tombstone, WT_UPDATE_PREPARE_RESTORED_FROM_DS); - F_SET(upd, WT_UPDATE_RESTORED_FROM_DS); - tombstone->next = upd; - } else { - upd->durable_ts = WT_TS_NONE; - upd->prepare_state = WT_PREPARE_INPROGRESS; - F_SET(upd, WT_UPDATE_PREPARE_RESTORED_FROM_DS); - tombstone = upd; - } - - upd_array[WT_ROW_SLOT(page, rip)] = tombstone; - tombstone = upd = NULL; - } + if (!unpack.tw.prepare || prepare == PREPARE_IGNORE) + continue; + + /* First prepared transaction setup. */ + if (prepare == PREPARE_INSTANTIATE) { + WT_ERR(__wt_page_modify_init(session, page)); + if (!F_ISSET(btree, WT_BTREE_READONLY)) + __wt_page_modify_set(session, page); + + /* Allocate the per-page update array. */ + WT_ERR(__wt_calloc_def(session, page->entries, &page->modify->mod_row_update)); + total_size += page->entries * sizeof(*page->modify->mod_row_update); + + WT_ERR(__wt_scr_alloc(session, 0, &value)); + + prepare = PREPARE_INITIALIZED; + } + + /* Take the value from the page cell. */ + WT_ERR(__wt_page_cell_data_ref(session, page, &unpack, value)); + + WT_ERR(__wt_upd_alloc(session, value, WT_UPDATE_STANDARD, &upd, &size)); + total_size += size; + upd->durable_ts = unpack.tw.durable_start_ts; + upd->start_ts = unpack.tw.start_ts; + upd->txnid = unpack.tw.start_txn; + + /* + * Instantiate both update and tombstone if the prepared update is a tombstone. This is + * required to ensure that written prepared delete operation must be removed from the data + * store, when the prepared transaction gets rollback. + */ + if (WT_TIME_WINDOW_HAS_STOP(&unpack.tw)) { + WT_ERR(__wt_upd_alloc_tombstone(session, &tombstone, &size)); + total_size += size; + tombstone->durable_ts = WT_TS_NONE; + tombstone->start_ts = unpack.tw.stop_ts; + tombstone->txnid = unpack.tw.stop_txn; + tombstone->prepare_state = WT_PREPARE_INPROGRESS; + F_SET(tombstone, WT_UPDATE_PREPARE_RESTORED_FROM_DS); + F_SET(upd, WT_UPDATE_RESTORED_FROM_DS); + tombstone->next = upd; + } else { + upd->durable_ts = WT_TS_NONE; + upd->prepare_state = WT_PREPARE_INPROGRESS; + F_SET(upd, WT_UPDATE_PREPARE_RESTORED_FROM_DS); + tombstone = upd; } - __wt_cache_page_inmem_incr(session, page, total_size); + page->modify->mod_row_update[WT_ROW_SLOT(page, rip - 1)] = tombstone; + tombstone = upd = NULL; } + WT_CELL_FOREACH_END; + + __wt_cache_page_inmem_incr(session, page, total_size); err: __wt_free(session, tombstone); diff --git a/src/third_party/wiredtiger/src/txn/txn_rollback_to_stable.c b/src/third_party/wiredtiger/src/txn/txn_rollback_to_stable.c index 2c97ddf48c7..764afc23f00 100644 --- a/src/third_party/wiredtiger/src/txn/txn_rollback_to_stable.c +++ b/src/third_party/wiredtiger/src/txn/txn_rollback_to_stable.c @@ -421,7 +421,12 @@ __rollback_abort_row_ondisk_kv( __wt_row_leaf_value_cell(session, page, rip, NULL, vpack); prepared = vpack->tw.prepare; if (WT_IS_HS(S2BT(session))) { - if (vpack->tw.durable_stop_ts > rollback_timestamp) { + /* + * Abort the history store update with stop durable timestamp greater than the stable + * timestamp or the updates with max stop timestamp which implies that they are associated + * with prepared transactions. + */ + if (vpack->tw.durable_stop_ts > rollback_timestamp || vpack->tw.stop_ts == WT_TS_MAX) { __wt_verbose(session, WT_VERB_RTS, "hs update aborted with start durable/commit timestamp: %s, %s, " "stop durable/commit timestamp: %s, %s and stable timestamp: %s", @@ -696,6 +701,24 @@ __rollback_abort_newer_row_leaf( } /* + * __rollback_get_ref_max_durable_timestamp -- + * Returns the ref aggregated max durable timestamp. The max durable timestamp is calculated + * between both start and stop durable timestamps except for history store, because most of the + * history store updates have stop timestamp either greater or equal to the start timestamp + * except for the updates written for the prepared updates on the data store. To abort the + * updates with no stop timestamp, we must include the newest stop timestamp also into the + * calculation of maximum durable timestamp of the history store. + */ +static wt_timestamp_t +__rollback_get_ref_max_durable_timestamp(WT_SESSION_IMPL *session, WT_TIME_AGGREGATE *ta) +{ + if (WT_IS_HS(S2BT(session))) + return WT_MAX(ta->newest_stop_durable_ts, ta->newest_stop_ts); + else + return WT_MAX(ta->newest_start_durable_ts, ta->newest_stop_durable_ts); +} + +/* * __rollback_page_needs_abort -- * Check whether the page needs rollback. Return true if the page has modifications newer than * the given timestamp Otherwise return false. @@ -730,16 +753,15 @@ __rollback_page_needs_abort( */ if (mod != NULL && mod->rec_result == WT_PM_REC_REPLACE) { tag = "reconciled replace block"; - durable_ts = WT_MAX( - mod->mod_replace.ta.newest_start_durable_ts, mod->mod_replace.ta.newest_stop_durable_ts); + durable_ts = __rollback_get_ref_max_durable_timestamp(session, &mod->mod_replace.ta); prepared = mod->mod_replace.ta.prepare; result = (durable_ts > rollback_timestamp) || prepared; } else if (mod != NULL && mod->rec_result == WT_PM_REC_MULTIBLOCK) { tag = "reconciled multi block"; /* Calculate the max durable timestamp by traversing all multi addresses. */ for (multi = mod->mod_multi, i = 0; i < mod->mod_multi_entries; ++multi, ++i) { - durable_ts = WT_MAX(durable_ts, multi->addr.ta.newest_start_durable_ts); - durable_ts = WT_MAX(durable_ts, multi->addr.ta.newest_stop_durable_ts); + durable_ts = WT_MAX( + durable_ts, __rollback_get_ref_max_durable_timestamp(session, &multi->addr.ta)); if (multi->addr.ta.prepare) prepared = true; } @@ -748,12 +770,12 @@ __rollback_page_needs_abort( tag = "on page cell"; /* Check if the page is obsolete using the page disk address. */ __wt_cell_unpack_addr(session, ref->home->dsk, (WT_CELL *)addr, &vpack); - durable_ts = WT_MAX(vpack.ta.newest_start_durable_ts, vpack.ta.newest_stop_durable_ts); + durable_ts = __rollback_get_ref_max_durable_timestamp(session, &vpack.ta); prepared = vpack.ta.prepare; result = (durable_ts > rollback_timestamp) || prepared; } else if (addr != NULL) { tag = "address"; - durable_ts = WT_MAX(addr->ta.newest_start_durable_ts, addr->ta.newest_stop_durable_ts); + durable_ts = __rollback_get_ref_max_durable_timestamp(session, &addr->ta); prepared = addr->ta.prepare; result = (durable_ts > rollback_timestamp) || prepared; } @@ -1064,7 +1086,7 @@ __rollback_to_stable_hs_final_pass(WT_SESSION_IMPL *session, wt_timestamp_t roll WT_CONFIG ckptconf; WT_CONFIG_ITEM cval, durableval, key; WT_DECL_RET; - wt_timestamp_t max_durable_ts, newest_start_durable_ts, newest_stop_durable_ts; + wt_timestamp_t max_durable_ts, newest_stop_durable_ts, newest_stop_ts; char *config; char ts_string[2][WT_TS_INT_STRING_SIZE]; @@ -1072,22 +1094,27 @@ __rollback_to_stable_hs_final_pass(WT_SESSION_IMPL *session, wt_timestamp_t roll WT_RET(__wt_metadata_search(session, WT_HS_URI, &config)); - /* Find out the max durable timestamp of the object from checkpoint. */ - newest_start_durable_ts = newest_stop_durable_ts = WT_TS_NONE; + /* + * Find out the max durable timestamp of the history store from checkpoint. Most of the history + * store updates have stop timestamp either greater or equal to the start timestamp except for + * the updates written for the prepared updates on the data store. To abort the updates with no + * stop timestamp, we must include the newest stop timestamp also into the calculation of + * maximum timestamp of the history store. + */ + newest_stop_durable_ts = newest_stop_ts = WT_TS_NONE; WT_ERR(__wt_config_getones(session, config, "checkpoint", &cval)); __wt_config_subinit(session, &ckptconf, &cval); for (; __wt_config_next(&ckptconf, &key, &cval) == 0;) { - ret = __wt_config_subgets(session, &cval, "newest_start_durable_ts", &durableval); - if (ret == 0) - newest_start_durable_ts = - WT_MAX(newest_start_durable_ts, (wt_timestamp_t)durableval.val); - WT_ERR_NOTFOUND_OK(ret, false); ret = __wt_config_subgets(session, &cval, "newest_stop_durable_ts", &durableval); if (ret == 0) newest_stop_durable_ts = WT_MAX(newest_stop_durable_ts, (wt_timestamp_t)durableval.val); WT_ERR_NOTFOUND_OK(ret, false); + ret = __wt_config_subgets(session, &cval, "newest_stop_ts", &durableval); + if (ret == 0) + newest_stop_ts = WT_MAX(newest_stop_ts, (wt_timestamp_t)durableval.val); + WT_ERR_NOTFOUND_OK(ret, false); } - max_durable_ts = WT_MAX(newest_start_durable_ts, newest_stop_durable_ts); + max_durable_ts = WT_MAX(newest_stop_ts, newest_stop_durable_ts); WT_ERR(__wt_session_get_dhandle(session, WT_HS_URI, NULL, NULL, 0)); /* diff --git a/src/third_party/wiredtiger/test/suite/test_rollback_to_stable10.py b/src/third_party/wiredtiger/test/suite/test_rollback_to_stable10.py index e79c55203bf..567e51c8a65 100755 --- a/src/third_party/wiredtiger/test/suite/test_rollback_to_stable10.py +++ b/src/third_party/wiredtiger/test/suite/test_rollback_to_stable10.py @@ -155,6 +155,135 @@ class test_rollback_to_stable10(test_rollback_to_stable_base): self.check(value_d, uri_1, nrows, 20) # Check that the correct data is seen at and after the stable timestamp. + self.check(value_c, uri_2, nrows, 30) + self.check(value_a, uri_2, nrows, 50) + self.check(value_a, uri_2, nrows, 80) + self.check(value_b, uri_2, nrows, 40) + self.check(value_d, uri_2, nrows, 20) + + stat_cursor = self.session.open_cursor('statistics:', None, None) + calls = stat_cursor[stat.conn.txn_rts][2] + hs_removed = stat_cursor[stat.conn.txn_rts_hs_removed][2] + hs_sweep = stat_cursor[stat.conn.txn_rts_sweep_hs_keys][2] + keys_removed = stat_cursor[stat.conn.txn_rts_keys_removed][2] + keys_restored = stat_cursor[stat.conn.txn_rts_keys_restored][2] + pages_visited = stat_cursor[stat.conn.txn_rts_pages_visited][2] + upd_aborted = stat_cursor[stat.conn.txn_rts_upd_aborted][2] + stat_cursor.close() + + self.assertEqual(calls, 0) + self.assertEqual(keys_removed, 0) + self.assertEqual(keys_restored, 0) + self.assertGreaterEqual(upd_aborted, 0) + self.assertGreater(pages_visited, 0) + self.assertGreaterEqual(hs_removed, 0) + self.assertGreater(hs_sweep, 0) + + def test_rollback_to_stable_prepare(self): + nrows = 1000 + + # Create a table without logging. + uri_1 = "table:rollback_to_stable10_1" + ds_1 = SimpleDataSet( + self, uri_1, 0, key_format="i", value_format="S", config='log=(enabled=false)') + ds_1.populate() + + # Create another table without logging. + uri_2 = "table:rollback_to_stable10_2" + ds_2 = SimpleDataSet( + self, uri_2, 0, key_format="i", value_format="S", config='log=(enabled=false)') + ds_2.populate() + + # Pin oldest and stable to timestamp 10. + self.conn.set_timestamp('oldest_timestamp=' + timestamp_str(10) + + ',stable_timestamp=' + timestamp_str(10)) + + value_a = "aaaaa" * 100 + value_b = "bbbbb" * 100 + value_c = "ccccc" * 100 + value_d = "ddddd" * 100 + value_e = "eeeee" * 100 + value_f = "fffff" * 100 + + # Perform several updates. + self.large_updates(uri_1, value_d, ds_1, nrows, 20) + self.large_updates(uri_1, value_c, ds_1, nrows, 30) + self.large_updates(uri_1, value_b, ds_1, nrows, 40) + self.large_updates(uri_1, value_a, ds_1, nrows, 50) + + self.large_updates(uri_2, value_d, ds_2, nrows, 20) + self.large_updates(uri_2, value_c, ds_2, nrows, 30) + self.large_updates(uri_2, value_b, ds_2, nrows, 40) + self.large_updates(uri_2, value_a, ds_2, nrows, 50) + + # Verify data is visible and correct. + self.check(value_d, uri_1, nrows, 20) + self.check(value_c, uri_1, nrows, 30) + self.session.breakpoint() + self.check(value_b, uri_1, nrows, 40) + self.check(value_a, uri_1, nrows, 50) + + self.check(value_d, uri_2, nrows, 20) + self.check(value_c, uri_2, nrows, 30) + self.session.breakpoint() + self.check(value_b, uri_2, nrows, 40) + self.check(value_a, uri_2, nrows, 50) + + # Pin stable to timestamp 60 if prepare otherwise 50. + if self.prepare: + self.conn.set_timestamp('stable_timestamp=' + timestamp_str(60)) + else: + self.conn.set_timestamp('stable_timestamp=' + timestamp_str(50)) + + # Create a checkpoint thread + done = threading.Event() + ckpt = checkpoint_thread(self.conn, done) + ckpt.start() + + # Perform several updates in parallel with checkpoint. + session_p1 = self.conn.open_session() + cursor_p1 = session_p1.open_cursor(uri_1) + session_p1.begin_transaction('isolation=snapshot') + for i in range(1, nrows): + cursor_p1.set_key(ds_1.key(i)) + cursor_p1.set_value(value_e) + self.assertEquals(cursor_p1.update(), 0) + session_p1.prepare_transaction('prepare_timestamp=' + timestamp_str(69)) + + # Perform several updates in parallel with checkpoint. + session_p2 = self.conn.open_session() + cursor_p2 = session_p2.open_cursor(uri_2) + session_p2.begin_transaction('isolation=snapshot') + for i in range(1, nrows): + cursor_p2.set_key(ds_2.key(i)) + cursor_p2.set_value(value_e) + self.assertEquals(cursor_p2.update(), 0) + session_p2.prepare_transaction('prepare_timestamp=' + timestamp_str(69)) + + done.set() + ckpt.join() + + # Simulate a crash by copying to a new directory(RESTART). + copy_wiredtiger_home(".", "RESTART") + + # Commit the prepared transaction. + session_p1.commit_transaction('commit_timestamp=' + timestamp_str(70) + ',durable_timestamp=' + timestamp_str(71)) + session_p2.commit_transaction('commit_timestamp=' + timestamp_str(70) + ',durable_timestamp=' + timestamp_str(71)) + session_p1.close() + session_p2.close() + + # Open the new directory. + self.conn = self.setUpConnectionOpen("RESTART") + self.session = self.setUpSessionOpen(self.conn) + + # Check that the correct data is seen at and after the stable timestamp. + self.check(value_a, uri_1, nrows, 50) + self.check(value_a, uri_1, nrows, 80) + self.check(value_b, uri_1, nrows, 40) + self.check(value_c, uri_1, nrows, 30) + self.check(value_d, uri_1, nrows, 20) + + # Check that the correct data is seen at and after the stable timestamp. self.check(value_a, uri_2, nrows, 50) self.check(value_a, uri_2, nrows, 80) self.check(value_b, uri_2, nrows, 40) |