diff options
author | Luke Chen <luke.chen@mongodb.com> | 2022-09-05 13:32:57 +1000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-09-05 04:37:06 +0000 |
commit | 22450cd55f88f8d9e9b5f6b31553b7019f6ee892 (patch) | |
tree | 359f86221986630f87f7f10f9e9ac6f8af56cc1a | |
parent | 0eac00554b33039ec5ce7f62c7ec53ba6d2ba13b (diff) | |
download | mongo-22450cd55f88f8d9e9b5f6b31553b7019f6ee892.tar.gz |
Import wiredtiger: f45ebf82dba6e0d88edccb6c3047a717669b4562 from branch mongodb-6.1
ref: 2861a11653..f45ebf82db
for: 6.1.0-rc1
WT-9763 Return EBUSY if the time window of inserting record does not match with history store time window (#8218)
4 files changed, 227 insertions, 5 deletions
diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data index bc27075eab8..fbb2c69b28e 100644 --- a/src/third_party/wiredtiger/import.data +++ b/src/third_party/wiredtiger/import.data @@ -2,5 +2,5 @@ "vendor": "wiredtiger", "github": "wiredtiger/wiredtiger.git", "branch": "mongodb-6.1", - "commit": "2861a1165302d7a2ddf7b384922b2a022cc23897" + "commit": "f45ebf82dba6e0d88edccb6c3047a717669b4562" } diff --git a/src/third_party/wiredtiger/src/history/hs_rec.c b/src/third_party/wiredtiger/src/history/hs_rec.c index 15afe37d31f..236f8856257 100644 --- a/src/third_party/wiredtiger/src/history/hs_rec.c +++ b/src/third_party/wiredtiger/src/history/hs_rec.c @@ -10,7 +10,7 @@ static int __hs_delete_reinsert_from_pos(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, uint32_t btree_id, const WT_ITEM *key, wt_timestamp_t ts, bool reinsert, bool no_ts_tombstone, - bool error_on_ts_ordering, uint64_t *hs_counter); + bool error_on_ts_ordering, uint64_t *hs_counter, WT_TIME_WINDOW *upd_tw); /* * __hs_verbose_cache_stats -- @@ -276,7 +276,7 @@ __hs_insert_record(WT_SESSION_IMPL *session, WT_CURSOR *cursor, WT_BTREE *btree, if (ret == 0) WT_ERR(__hs_delete_reinsert_from_pos(session, cursor, btree->id, key, tw->start_ts + 1, - true, false, error_on_ts_ordering, &counter)); + true, false, error_on_ts_ordering, &counter, tw)); #ifdef HAVE_DIAGNOSTIC /* @@ -855,7 +855,7 @@ __wt_hs_delete_key(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, uint32_t btre } WT_ERR(__hs_delete_reinsert_from_pos(session, hs_cursor, btree_id, key, WT_TS_NONE, reinsert, - true, error_on_ts_ordering, &hs_counter)); + true, error_on_ts_ordering, &hs_counter, NULL)); done: err: @@ -874,7 +874,7 @@ err: static int __hs_delete_reinsert_from_pos(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, uint32_t btree_id, const WT_ITEM *key, wt_timestamp_t ts, bool reinsert, bool no_ts_tombstone, - bool error_on_ts_ordering, uint64_t *counter) + bool error_on_ts_ordering, uint64_t *counter, WT_TIME_WINDOW *upd_tw) { WT_CURSOR *hs_insert_cursor; WT_CURSOR_BTREE *hs_cbt; @@ -912,6 +912,51 @@ __hs_delete_reinsert_from_pos(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, ui if (__wt_txn_tw_stop_visible_all(session, twp)) continue; + /* + * The below example illustrates a case that the data store and the history + * store may contain the same value. In this case, skip inserting the same + * value to the history store again. + * + * Suppose there is one table table1 and the below operations are performed. + * + * 1. Insert a=1 in table1 at timestamp 10 + * 2. Delete a from table1 at timestamp 20 + * 3. Set stable timestamp = 20, oldest timestamp=1 + * 4. Checkpoint table1 + * 5. Insert a=2 in table1 at timestamp 30 + * 6. Evict a=2 from table1 and move the content to history store. + * 7. Checkpoint is still running and before it finishes checkpointing the history store the + * above steps 5 and 6 will happen. + * + * After all this operations the checkpoint content will be + * Data store -- + * table1 --> a=1 at start_ts=10, stop_ts=20 + * + * History store -- + * table1 --> a=1 at start_ts=10, stop_ts=20 + * + * WiredTiger takes a backup of the checkpoint and use this backup to restore. + * Note: In table1 of both data store and history store has the same content. + * + * Now the backup is used to restore. + * + * 1. Insert a=3 in table1 + * 2. Checkpoint started, eviction started and sees the same content in data store and + * history store while reconciling. + * + * The start timestamp and transaction ids are checked to ensure for the global + * visibility because globally visible timestamps and transaction ids may be cleared to 0. + * The time window of the inserting record and the history store record are + * compared to make sure that the same record are not being inserted again. + */ + + if (upd_tw != NULL && + (__wt_txn_tw_start_visible_all(session, upd_tw) && + __wt_txn_tw_start_visible_all(session, twp) ? + WT_TIME_WINDOWS_STOP_EQUAL(upd_tw, twp) : + WT_TIME_WINDOWS_EQUAL(upd_tw, twp))) + continue; + /* We shouldn't have crossed the btree and user key search space. */ WT_ERR(hs_cursor->get_key(hs_cursor, &hs_btree_id, &hs_key, &hs_ts, &hs_counter)); WT_ASSERT(session, hs_btree_id == btree_id); diff --git a/src/third_party/wiredtiger/src/include/timestamp_inline.h b/src/third_party/wiredtiger/src/include/timestamp_inline.h index 82c5716ac37..aeef44b6669 100644 --- a/src/third_party/wiredtiger/src/include/timestamp_inline.h +++ b/src/third_party/wiredtiger/src/include/timestamp_inline.h @@ -41,6 +41,11 @@ (tw1)->stop_ts == (tw2)->stop_ts && (tw1)->stop_txn == (tw2)->stop_txn && \ (tw1)->prepare == (tw2)->prepare) +/* Return true if the stop time windows are the same. */ +#define WT_TIME_WINDOWS_STOP_EQUAL(tw1, tw2) \ + ((tw1)->durable_stop_ts == (tw2)->durable_stop_ts && (tw1)->stop_ts == (tw2)->stop_ts && \ + (tw1)->stop_txn == (tw2)->stop_txn && (tw1)->prepare == (tw2)->prepare) + /* * Set the start values of a time window from those in an update structure. Durable timestamp can be * 0 for prepared updates, in those cases use the prepared timestamp as durable timestamp. diff --git a/src/third_party/wiredtiger/test/suite/test_rollback_to_stable39.py b/src/third_party/wiredtiger/test/suite/test_rollback_to_stable39.py new file mode 100644 index 00000000000..90bf63935da --- /dev/null +++ b/src/third_party/wiredtiger/test/suite/test_rollback_to_stable39.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python +# +# Public Domain 2014-present MongoDB, Inc. +# Public Domain 2008-2014 WiredTiger, Inc. +# +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. +# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +import threading, time +from helper import simulate_crash_restart +from test_rollback_to_stable01 import test_rollback_to_stable_base +from wiredtiger import stat +from wtdataset import SimpleDataSet +from wtscenario import make_scenarios +from wtthread import checkpoint_thread + +# test_rollback_to_stable39.py +# Test to delay checkpoint and perform eviction in parallel to ensure eviction moves the content from data store to history store +# and then checkpoint history store to see the same content in data store and history store. Later use the checkpoint to restore +# the database which will trigger eviction to insert the same record from data store to history store. +class test_rollback_to_stable39(test_rollback_to_stable_base): + restart_config = False + + format_values = [ + ('column', dict(key_format='r', value_format='S', prepare_extraconfig='')), + ('column_fix', dict(key_format='r', value_format='8t', + prepare_extraconfig=',allocation_size=512,leaf_page_max=512')), + ('row_integer', dict(key_format='i', value_format='S', prepare_extraconfig='')), + ] + + prepare_values = [ + ('no_prepare', dict(prepare=False)), + ('prepare', dict(prepare=True)) + ] + + scenarios = make_scenarios(format_values, prepare_values) + + def conn_config(self): + config = 'cache_size=25MB,statistics=(all),statistics_log=(json,on_close,wait=1)' + if self.restart_config: + config += ',timing_stress_for_test=[checkpoint_slow]' + else: + config += ',timing_stress_for_test=[history_store_checkpoint_delay]' + return config + + def test_rollback_to_stable(self): + nrows = 1000 + + # Create a table. + uri = "table:rollback_to_stable39" + ds = SimpleDataSet( + self, uri, 0, key_format=self.key_format, value_format=self.value_format) + ds.populate() + + if self.value_format == '8t': + value_a = 97 + value_b = 98 + value_c = 99 + else: + value_a = "aaaaa" * 100 + value_b = "bbbbb" * 100 + value_c = "ccccc" * 100 + + # Pin oldest and stable to timestamp 10. + self.conn.set_timestamp('oldest_timestamp=' + self.timestamp_str(10) + + ',stable_timestamp=' + self.timestamp_str(10)) + + # Perform several updates. + self.large_updates(uri, value_a, ds, nrows, self.prepare, 20) + # Verify data is visible and correct. + self.check(value_a, uri, nrows, None, 21 if self.prepare else 20) + + self.large_removes(uri, ds, nrows, self.prepare, 30) + # Verify no data is visible. + self.check(value_a, uri, 0, nrows, 31 if self.prepare else 30) + + # Pin stable to timestamp 40 if prepare otherwise 30. + self.conn.set_timestamp('stable_timestamp=' + self.timestamp_str(40 if self.prepare else 30)) + + # Create a checkpoint thread + done = threading.Event() + ckpt = checkpoint_thread(self.conn, done) + try: + ckpt.start() + + # Wait for checkpoint to start before committing. + ckpt_started = 0 + while not ckpt_started: + stat_cursor = self.session.open_cursor('statistics:', None, None) + ckpt_started = stat_cursor[stat.conn.txn_checkpoint_running][2] + stat_cursor.close() + time.sleep(1) + + # Perform several updates in parallel with checkpoint. + # Rollbacks may occur when checkpoint is running, so retry as needed. + self.retry_rollback('update ds, e', None, + lambda: self.large_updates(uri, value_b, ds, nrows, self.prepare, 50)) + self.evict_cursor(uri, nrows, value_b) + finally: + done.set() + ckpt.join() + + # Simulate a crash by copying to a new directory(RESTART). + self.restart_config = True + simulate_crash_restart(self, ".", "RESTART") + + # Check that the correct data is seen at and after the stable timestamp. + self.check(value_a, uri, nrows, None, 21 if self.prepare else 20) + self.check(value_a, uri, 0, nrows, 40) + + stat_cursor = self.session.open_cursor('statistics:', None, None) + hs_removed = stat_cursor[stat.conn.txn_rts_hs_removed][2] + hs_sweep = stat_cursor[stat.conn.txn_rts_sweep_hs_keys][2] + keys_removed = stat_cursor[stat.conn.txn_rts_keys_removed][2] + keys_restored = stat_cursor[stat.conn.txn_rts_keys_restored][2] + upd_aborted = stat_cursor[stat.conn.txn_rts_upd_aborted][2] + stat_cursor.close() + + self.assertEqual(keys_removed, 0) + self.assertEqual(keys_restored, 0) + self.assertEqual(upd_aborted, 0) + self.assertEqual(hs_removed, 0) + self.assertEqual(hs_sweep, 0) + + # Perform several updates. + self.large_updates(uri, value_c, ds, nrows, self.prepare, 60) + + # Verify data is visible and correct. + self.check(value_c, uri, nrows, None, 61 if self.prepare else 60) + + # Create a checkpoint thread + done = threading.Event() + ckpt = checkpoint_thread(self.conn, done) + try: + ckpt.start() + + # Wait for checkpoint to start before committing. + ckpt_started = 0 + while not ckpt_started: + stat_cursor = self.session.open_cursor('statistics:', None, None) + ckpt_started = stat_cursor[stat.conn.txn_checkpoint_running][2] + stat_cursor.close() + time.sleep(1) + + self.evict_cursor(uri, nrows, value_c) + finally: + done.set() + ckpt.join() + + +if __name__ == '__main__': + wttest.run() + |