4 files changed, 233 insertions, 12 deletions
diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data
index 3134840e789..9390334b313 100644
--- a/src/third_party/wiredtiger/import.data
+++ b/src/third_party/wiredtiger/import.data
@@ -2,5 +2,5 @@
     "vendor": "wiredtiger",
     "github": "wiredtiger/wiredtiger.git",
     "branch": "mongodb-4.4",
-    "commit": "c2114ce2b1c40599892dc244e0fe1b5c35f9e296"
+    "commit": "c1099d90937b0beb39e78d31b5674e25f5dd04ca"
 }
diff --git a/src/third_party/wiredtiger/src/history/hs_rec.c b/src/third_party/wiredtiger/src/history/hs_rec.c
index a98bbc7c7a2..f343e3bf879 100644
--- a/src/third_party/wiredtiger/src/history/hs_rec.c
+++ b/src/third_party/wiredtiger/src/history/hs_rec.c
@@ -9,8 +9,8 @@
 #include "wt_internal.h"
 
 static int __hs_delete_reinsert_from_pos(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor,
-  uint32_t btree_id, const WT_ITEM *key, wt_timestamp_t ts, bool reinsert, bool ooo_tombstone,
-  bool error_on_ooo_ts, uint64_t *hs_counter);
+  uint32_t btree_id, const WT_ITEM *key, wt_timestamp_t ts, bool reinsert, bool no_ts_tombstone,
+  bool error_on_ts_ordering, uint64_t *hs_counter, WT_TIME_WINDOW *upd_tw);
 
 /*
  * __hs_verbose_cache_stats --
@@ -209,7 +209,7 @@ __hs_insert_record(WT_SESSION_IMPL *session, WT_CURSOR *cursor, WT_BTREE *btree,
 
     if (ret == 0)
         WT_ERR(__hs_delete_reinsert_from_pos(session, cursor, btree->id, key, tw->start_ts + 1,
-          true, false, error_on_ooo_ts, &counter));
+          true, false, error_on_ooo_ts, &counter, tw));
 
 #ifdef HAVE_DIAGNOSTIC
     /*
@@ -816,7 +816,7 @@ __wt_hs_delete_key_from_ts(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, uint3
     }
 
     WT_ERR(__hs_delete_reinsert_from_pos(session, hs_cursor, btree_id, key, ts, reinsert,
-      ooo_tombstone, error_on_ooo_ts, &hs_counter));
+      ooo_tombstone, error_on_ooo_ts, &hs_counter, NULL));
 
 done:
 err:
@@ -834,8 +834,8 @@ err:
  */
 static int
 __hs_delete_reinsert_from_pos(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, uint32_t btree_id,
-  const WT_ITEM *key, wt_timestamp_t ts, bool reinsert, bool ooo_tombstone, bool error_on_ooo_ts,
-  uint64_t *counter)
+  const WT_ITEM *key, wt_timestamp_t ts, bool reinsert, bool no_ts_tombstone,
+  bool error_on_ts_ordering, uint64_t *counter, WT_TIME_WINDOW *upd_tw)
 {
     WT_CURSOR *hs_insert_cursor;
     WT_CURSOR_BTREE *hs_cbt;
@@ -863,7 +863,7 @@ __hs_delete_reinsert_from_pos(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, ui
      * If we delete all the updates of the key from the history store, we should not reinsert any
      * update except when an out-of-order tombstone is not globally visible yet.
      */
-    WT_ASSERT(session, ooo_tombstone || ts > WT_TS_NONE || !reinsert);
+    WT_ASSERT(session, no_ts_tombstone || ts > WT_TS_NONE || !reinsert);
 
     for (; ret == 0; ret = hs_cursor->next(hs_cursor)) {
         /* Ignore records that are obsolete. */
@@ -871,6 +871,51 @@ __hs_delete_reinsert_from_pos(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, ui
         if (__wt_txn_tw_stop_visible_all(session, twp))
             continue;
 
+        /*
+         * The example below illustrates a case where the data store and the history
+         * store may contain the same value. In that case, skip inserting the same
+         * value into the history store again.
+         *
+         * Suppose there is one table, table1, and the following operations are performed:
+         *
+         * 1. Insert a=1 into table1 at timestamp 10.
+         * 2. Delete a from table1 at timestamp 20.
+         * 3. Set stable timestamp = 20, oldest timestamp = 1.
+         * 4. Checkpoint table1.
+         * 5. Insert a=2 into table1 at timestamp 30.
+         * 6. Evict a=2 from table1, moving the older content to the history store.
+         * 7. Steps 5 and 6 happen while the checkpoint is still running, before it has
+         *    finished checkpointing the history store.
+         *
+         * After all these operations the checkpoint content is:
+         * Data store --
+         * table1 --> a=1 at start_ts=10, stop_ts=20
+         *
+         * History store --
+         * table1 --> a=1 at start_ts=10, stop_ts=20
+         *
+         * A backup is taken of this checkpoint and later used to restore the database.
+         * Note: table1 has the same content in both the data store and the history store.
+         *
+         * Now the backup is used to restore:
+         *
+         * 1. Insert a=3 into table1.
+         * 2. A checkpoint starts, eviction starts and sees the same content in the data
+         *    store and the history store while reconciling.
+         *
+         * The start timestamps and transaction IDs are checked for global visibility
+         * because globally visible timestamps and transaction IDs may have been cleared
+         * to 0. The time window of the record being inserted is compared with that of
+         * the history store record to make sure the same record is not inserted again.
+         */
+
+        if (upd_tw != NULL &&
+          (__wt_txn_tw_start_visible_all(session, upd_tw) &&
+                __wt_txn_tw_start_visible_all(session, twp) ?
+              WT_TIME_WINDOWS_STOP_EQUAL(upd_tw, twp) :
+              WT_TIME_WINDOWS_EQUAL(upd_tw, twp)))
+            continue;
+
         /* We shouldn't have crossed the btree and user key search space. */
         WT_ERR(hs_cursor->get_key(hs_cursor, &hs_btree_id, &hs_key, &hs_ts, &hs_counter));
         WT_ASSERT(session, hs_btree_id == btree_id);
@@ -891,7 +936,7 @@ __hs_delete_reinsert_from_pos(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, ui
          * flag. We cannot modify the history store to fix the out of order timestamp updates as it may
          * make the history store checkpoint inconsistent.
          */
-        if (error_on_ooo_ts) {
+        if (error_on_ts_ordering) {
            ret = EBUSY;
            WT_STAT_CONN_INCR(session, cache_eviction_fail_checkpoint_out_of_order_ts);
            goto err;
@@ -979,7 +1024,8 @@ __hs_delete_reinsert_from_pos(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, ui
          * to the new update.
          */
         if (hs_cbt->upd_value->tw.start_ts >= ts)
-            hs_insert_tw.start_ts = hs_insert_tw.durable_start_ts = ooo_tombstone ? ts : ts - 1;
+            hs_insert_tw.start_ts = hs_insert_tw.durable_start_ts =
+              no_ts_tombstone ? ts : ts - 1;
         else {
             hs_insert_tw.start_ts = hs_cbt->upd_value->tw.start_ts;
             hs_insert_tw.durable_start_ts = hs_cbt->upd_value->tw.durable_start_ts;
@@ -991,7 +1037,7 @@ __hs_delete_reinsert_from_pos(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, ui
          * another moved update OR the update itself triggered the correction. In either case,
          * we should preserve the stop transaction id.
          */
-        hs_insert_tw.stop_ts = hs_insert_tw.durable_stop_ts = ooo_tombstone ? ts : ts - 1;
+        hs_insert_tw.stop_ts = hs_insert_tw.durable_stop_ts = no_ts_tombstone ? ts : ts - 1;
         hs_insert_tw.stop_txn = hs_cbt->upd_value->tw.stop_txn;
 
         /* Extract the underlying value for reinsertion. */
@@ -999,7 +1045,7 @@ __hs_delete_reinsert_from_pos(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, ui
           hs_cursor, &tw.durable_stop_ts, &tw.durable_start_ts, &hs_upd_type, &hs_value));
 
         /* Reinsert the update with corrected timestamps. */
-        if (ooo_tombstone && hs_ts == ts)
+        if (no_ts_tombstone && hs_ts == ts)
             *counter = hs_counter;
 
         /* Insert the value back with different timestamps. */
diff --git a/src/third_party/wiredtiger/src/include/timestamp_inline.h b/src/third_party/wiredtiger/src/include/timestamp_inline.h
index fd27a1f8a90..a00b4325dc0 100644
--- a/src/third_party/wiredtiger/src/include/timestamp_inline.h
+++ b/src/third_party/wiredtiger/src/include/timestamp_inline.h
@@ -37,6 +37,11 @@
       (tw1)->stop_ts == (tw2)->stop_ts && (tw1)->stop_txn == (tw2)->stop_txn && \
       (tw1)->prepare == (tw2)->prepare)
 
+/* Return true if the stop time windows are the same. */
+#define WT_TIME_WINDOWS_STOP_EQUAL(tw1, tw2)                                                  \
+    ((tw1)->durable_stop_ts == (tw2)->durable_stop_ts && (tw1)->stop_ts == (tw2)->stop_ts && \
+      (tw1)->stop_txn == (tw2)->stop_txn && (tw1)->prepare == (tw2)->prepare)
+
 /*
  * Set the start values of a time window from those in an update structure. Durable timestamp can be
  * 0 for prepared updates, in those cases use the prepared timestamp as durable timestamp.
diff --git a/src/third_party/wiredtiger/test/suite/test_rollback_to_stable39.py b/src/third_party/wiredtiger/test/suite/test_rollback_to_stable39.py
new file mode 100644
index 00000000000..bf6225d7213
--- /dev/null
+++ b/src/third_party/wiredtiger/test/suite/test_rollback_to_stable39.py
@@ -0,0 +1,170 @@
+#!/usr/bin/env python
+#
+# Public Domain 2014-present MongoDB, Inc.
+# Public Domain 2008-2014 WiredTiger, Inc.
+#
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+import threading, time, wttest
+from helper import simulate_crash_restart
+from test_rollback_to_stable01 import test_rollback_to_stable_base
+from wiredtiger import stat
+from wtdataset import SimpleDataSet
+from wtscenario import make_scenarios
+from wtthread import checkpoint_thread
+
+# test_rollback_to_stable39.py
+# Delay the checkpoint and run eviction in parallel so that eviction moves content from the data
+# store to the history store before the history store is checkpointed, leaving the same content in
+# both stores. Later, restore the database from that checkpoint, which triggers eviction to insert
+# the same record from the data store into the history store again.
+class test_rollback_to_stable39(test_rollback_to_stable_base):
+    restart_config = False
+
+    format_values = [
+        ('column', dict(key_format='r', value_format='S', prepare_extraconfig='')),
+        ('row_integer', dict(key_format='i', value_format='S', prepare_extraconfig='')),
+    ]
+
+    prepare_values = [
+        ('no_prepare', dict(prepare=False)),
+        ('prepare', dict(prepare=True))
+    ]
+
+    scenarios = make_scenarios(format_values, prepare_values)
+
+    def conn_config(self):
+        config = 'cache_size=25MB,statistics=(all),statistics_log=(json,on_close,wait=1)'
+        if self.restart_config:
+            config += ',timing_stress_for_test=[checkpoint_slow]'
+        else:
+            config += ',timing_stress_for_test=[history_store_checkpoint_delay]'
+        return config
+
+    def test_rollback_to_stable(self):
+        nrows = 1000
+
+        # Create a table.
+        uri = "table:rollback_to_stable39"
+        ds = SimpleDataSet(
+            self, uri, 0, key_format=self.key_format, value_format=self.value_format)
+        ds.populate()
+
+        if self.value_format == '8t':
+            value_a = 97
+            value_b = 98
+            value_c = 99
+        else:
+            value_a = "aaaaa" * 100
+            value_b = "bbbbb" * 100
+            value_c = "ccccc" * 100
+
+        # Pin oldest and stable to timestamp 10.
+        self.conn.set_timestamp('oldest_timestamp=' + self.timestamp_str(10) +
+            ',stable_timestamp=' + self.timestamp_str(10))
+
+        # Perform several updates.
+        self.large_updates(uri, value_a, ds, nrows, self.prepare, 20)
+        # Verify data is visible and correct.
+        self.check(value_a, uri, nrows, 21 if self.prepare else 20)
+
+        self.large_removes(uri, ds, nrows, self.prepare, 30)
+        # Verify no data is visible.
+        self.check(value_a, uri, 0, 31 if self.prepare else 30)
+
+        # Pin stable to timestamp 40 if prepare, otherwise 30.
+        self.conn.set_timestamp('stable_timestamp=' + self.timestamp_str(40 if self.prepare else 30))
+
+        # Create a checkpoint thread.
+        done = threading.Event()
+        ckpt = checkpoint_thread(self.conn, done)
+        try:
+            ckpt.start()
+
+            # Wait for checkpoint to start before committing.
+            ckpt_started = 0
+            while not ckpt_started:
+                stat_cursor = self.session.open_cursor('statistics:', None, None)
+                ckpt_started = stat_cursor[stat.conn.txn_checkpoint_running][2]
+                stat_cursor.close()
+                time.sleep(1)
+
+            # Perform several updates in parallel with the checkpoint.
+            # Rollbacks may occur when the checkpoint is running, so retry as needed.
+            self.retry_rollback('update ds, e', None,
+                lambda: self.large_updates(uri, value_b, ds, nrows, self.prepare, 50))
+            self.evict_cursor(uri, nrows, value_b)
+        finally:
+            done.set()
+            ckpt.join()
+
+        # Simulate a crash by copying to a new directory (RESTART).
+        self.restart_config = True
+        simulate_crash_restart(self, ".", "RESTART")
+
+        # Check that the correct data is seen at and after the stable timestamp.
+        self.check(value_a, uri, nrows, 21 if self.prepare else 20)
+        self.check(value_a, uri, 0, 40)
+
+        stat_cursor = self.session.open_cursor('statistics:', None, None)
+        hs_removed = stat_cursor[stat.conn.txn_rts_hs_removed][2]
+        hs_sweep = stat_cursor[stat.conn.txn_rts_sweep_hs_keys][2]
+        keys_removed = stat_cursor[stat.conn.txn_rts_keys_removed][2]
+        keys_restored = stat_cursor[stat.conn.txn_rts_keys_restored][2]
+        upd_aborted = stat_cursor[stat.conn.txn_rts_upd_aborted][2]
+        stat_cursor.close()
+
+        self.assertEqual(keys_removed, 0)
+        self.assertEqual(keys_restored, 0)
+        self.assertEqual(upd_aborted, 0)
+        self.assertEqual(hs_removed, 0)
+        self.assertEqual(hs_sweep, 0)
+
+        # Perform several updates.
+        self.large_updates(uri, value_c, ds, nrows, self.prepare, 60)
+
+        # Verify data is visible and correct.
+        self.check(value_c, uri, nrows, 61 if self.prepare else 60)
+
+        # Create a checkpoint thread.
+        done = threading.Event()
+        ckpt = checkpoint_thread(self.conn, done)
+        try:
+            ckpt.start()
+
+            # Wait for checkpoint to start before evicting.
+            ckpt_started = 0
+            while not ckpt_started:
+                stat_cursor = self.session.open_cursor('statistics:', None, None)
+                ckpt_started = stat_cursor[stat.conn.txn_checkpoint_running][2]
+                stat_cursor.close()
+                time.sleep(1)
+
+            self.evict_cursor(uri, nrows, value_c)
+        finally:
+            done.set()
+            ckpt.join()
+
+
+if __name__ == '__main__':
+    wttest.run()
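The duplicate check added to __hs_delete_reinsert_from_pos compares the incoming update's time window (upd_tw) with the time window of the record already in the history store: when the start of both windows is globally visible, only the stop portion is compared (the new WT_TIME_WINDOWS_STOP_EQUAL), because globally visible start timestamps and transaction IDs may already have been cleared to 0; otherwise the full windows must match (WT_TIME_WINDOWS_EQUAL). The sketch below is a minimal, self-contained illustration of that decision only; the tw struct, the tw_equal/tw_stop_equal helpers, the start-visibility flags and the sample values are simplified stand-ins, not WiredTiger's WT_TIME_WINDOW or visibility code.

/*
 * Minimal sketch (not WiredTiger code): decide whether a candidate update is a
 * duplicate of an existing history store record by comparing time windows.
 */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Simplified stand-in for WT_TIME_WINDOW. */
struct tw {
    uint64_t durable_start_ts, start_ts, start_txn;
    uint64_t durable_stop_ts, stop_ts, stop_txn;
    bool prepare;
};

/* Full time-window equality, mirroring WT_TIME_WINDOWS_EQUAL. */
static bool
tw_equal(const struct tw *a, const struct tw *b)
{
    return (a->durable_start_ts == b->durable_start_ts && a->start_ts == b->start_ts &&
      a->start_txn == b->start_txn && a->durable_stop_ts == b->durable_stop_ts &&
      a->stop_ts == b->stop_ts && a->stop_txn == b->stop_txn && a->prepare == b->prepare);
}

/* Stop-only equality, mirroring the new WT_TIME_WINDOWS_STOP_EQUAL. */
static bool
tw_stop_equal(const struct tw *a, const struct tw *b)
{
    return (a->durable_stop_ts == b->durable_stop_ts && a->stop_ts == b->stop_ts &&
      a->stop_txn == b->stop_txn && a->prepare == b->prepare);
}

/*
 * Return true if the candidate update duplicates the history store record. The
 * start-visibility flags are hypothetical stand-ins for __wt_txn_tw_start_visible_all:
 * when both starts are globally visible, the start fields may have been cleared to 0,
 * so only the stop portion is compared; otherwise the whole window must match.
 */
static bool
is_duplicate(
  const struct tw *upd, bool upd_start_visible, const struct tw *hs, bool hs_start_visible)
{
    return (upd_start_visible && hs_start_visible) ? tw_stop_equal(upd, hs) : tw_equal(upd, hs);
}

int
main(void)
{
    /* History store copy: start fields cleared to 0 because they are globally visible. */
    struct tw hs = {0, 0, 0, 20, 20, 0, false};
    /* The same logical update arriving from the data store, start fields still populated. */
    struct tw upd = {10, 10, 5, 20, 20, 0, false};

    printf("duplicate (both starts globally visible): %d\n", is_duplicate(&upd, true, &hs, true));
    printf("duplicate (start not globally visible):   %d\n", is_duplicate(&upd, false, &hs, true));
    return (0);
}

Note that in the diff only __hs_insert_record passes a time window (tw) for this comparison; __wt_hs_delete_key_from_ts passes NULL, so the upd_tw != NULL guard skips the check on that path.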