summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuke Chen <luke.chen@mongodb.com>2022-09-05 13:32:57 +1000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-05 04:37:06 +0000
commit22450cd55f88f8d9e9b5f6b31553b7019f6ee892 (patch)
tree359f86221986630f87f7f10f9e9ac6f8af56cc1a
parent0eac00554b33039ec5ce7f62c7ec53ba6d2ba13b (diff)
downloadmongo-22450cd55f88f8d9e9b5f6b31553b7019f6ee892.tar.gz
Import wiredtiger: f45ebf82dba6e0d88edccb6c3047a717669b4562 from branch mongodb-6.1
ref: 2861a11653..f45ebf82db for: 6.1.0-rc1 WT-9763 Return EBUSY if the time window of inserting record does not match with history store time window (#8218)
-rw-r--r--src/third_party/wiredtiger/import.data2
-rw-r--r--src/third_party/wiredtiger/src/history/hs_rec.c53
-rw-r--r--src/third_party/wiredtiger/src/include/timestamp_inline.h5
-rw-r--r--src/third_party/wiredtiger/test/suite/test_rollback_to_stable39.py172
4 files changed, 227 insertions, 5 deletions
diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data
index bc27075eab8..fbb2c69b28e 100644
--- a/src/third_party/wiredtiger/import.data
+++ b/src/third_party/wiredtiger/import.data
@@ -2,5 +2,5 @@
"vendor": "wiredtiger",
"github": "wiredtiger/wiredtiger.git",
"branch": "mongodb-6.1",
- "commit": "2861a1165302d7a2ddf7b384922b2a022cc23897"
+ "commit": "f45ebf82dba6e0d88edccb6c3047a717669b4562"
}
diff --git a/src/third_party/wiredtiger/src/history/hs_rec.c b/src/third_party/wiredtiger/src/history/hs_rec.c
index 15afe37d31f..236f8856257 100644
--- a/src/third_party/wiredtiger/src/history/hs_rec.c
+++ b/src/third_party/wiredtiger/src/history/hs_rec.c
@@ -10,7 +10,7 @@
static int __hs_delete_reinsert_from_pos(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor,
uint32_t btree_id, const WT_ITEM *key, wt_timestamp_t ts, bool reinsert, bool no_ts_tombstone,
- bool error_on_ts_ordering, uint64_t *hs_counter);
+ bool error_on_ts_ordering, uint64_t *hs_counter, WT_TIME_WINDOW *upd_tw);
/*
* __hs_verbose_cache_stats --
@@ -276,7 +276,7 @@ __hs_insert_record(WT_SESSION_IMPL *session, WT_CURSOR *cursor, WT_BTREE *btree,
if (ret == 0)
WT_ERR(__hs_delete_reinsert_from_pos(session, cursor, btree->id, key, tw->start_ts + 1,
- true, false, error_on_ts_ordering, &counter));
+ true, false, error_on_ts_ordering, &counter, tw));
#ifdef HAVE_DIAGNOSTIC
/*
@@ -855,7 +855,7 @@ __wt_hs_delete_key(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, uint32_t btre
}
WT_ERR(__hs_delete_reinsert_from_pos(session, hs_cursor, btree_id, key, WT_TS_NONE, reinsert,
- true, error_on_ts_ordering, &hs_counter));
+ true, error_on_ts_ordering, &hs_counter, NULL));
done:
err:
@@ -874,7 +874,7 @@ err:
static int
__hs_delete_reinsert_from_pos(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, uint32_t btree_id,
const WT_ITEM *key, wt_timestamp_t ts, bool reinsert, bool no_ts_tombstone,
- bool error_on_ts_ordering, uint64_t *counter)
+ bool error_on_ts_ordering, uint64_t *counter, WT_TIME_WINDOW *upd_tw)
{
WT_CURSOR *hs_insert_cursor;
WT_CURSOR_BTREE *hs_cbt;
@@ -912,6 +912,51 @@ __hs_delete_reinsert_from_pos(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, ui
if (__wt_txn_tw_stop_visible_all(session, twp))
continue;
+ /*
+ * The below example illustrates a case that the data store and the history
+ * store may contain the same value. In this case, skip inserting the same
+ * value to the history store again.
+ *
+ * Suppose there is one table table1 and the below operations are performed.
+ *
+ * 1. Insert a=1 in table1 at timestamp 10
+ * 2. Delete a from table1 at timestamp 20
+ * 3. Set stable timestamp = 20, oldest timestamp=1
+ * 4. Checkpoint table1
+ * 5. Insert a=2 in table1 at timestamp 30
+ * 6. Evict a=2 from table1 and move the content to history store.
+ * 7. Checkpoint is still running and before it finishes checkpointing the history store the
+ * above steps 5 and 6 will happen.
+ *
+ * After all this operations the checkpoint content will be
+ * Data store --
+ * table1 --> a=1 at start_ts=10, stop_ts=20
+ *
+ * History store --
+ * table1 --> a=1 at start_ts=10, stop_ts=20
+ *
+ * WiredTiger takes a backup of the checkpoint and use this backup to restore.
+ * Note: In table1 of both data store and history store has the same content.
+ *
+ * Now the backup is used to restore.
+ *
+ * 1. Insert a=3 in table1
+ * 2. Checkpoint started, eviction started and sees the same content in data store and
+ * history store while reconciling.
+ *
+ * The start timestamp and transaction ids are checked to ensure for the global
+ * visibility because globally visible timestamps and transaction ids may be cleared to 0.
+ * The time window of the inserting record and the history store record are
+ * compared to make sure that the same record are not being inserted again.
+ */
+
+ if (upd_tw != NULL &&
+ (__wt_txn_tw_start_visible_all(session, upd_tw) &&
+ __wt_txn_tw_start_visible_all(session, twp) ?
+ WT_TIME_WINDOWS_STOP_EQUAL(upd_tw, twp) :
+ WT_TIME_WINDOWS_EQUAL(upd_tw, twp)))
+ continue;
+
/* We shouldn't have crossed the btree and user key search space. */
WT_ERR(hs_cursor->get_key(hs_cursor, &hs_btree_id, &hs_key, &hs_ts, &hs_counter));
WT_ASSERT(session, hs_btree_id == btree_id);
diff --git a/src/third_party/wiredtiger/src/include/timestamp_inline.h b/src/third_party/wiredtiger/src/include/timestamp_inline.h
index 82c5716ac37..aeef44b6669 100644
--- a/src/third_party/wiredtiger/src/include/timestamp_inline.h
+++ b/src/third_party/wiredtiger/src/include/timestamp_inline.h
@@ -41,6 +41,11 @@
(tw1)->stop_ts == (tw2)->stop_ts && (tw1)->stop_txn == (tw2)->stop_txn && \
(tw1)->prepare == (tw2)->prepare)
+/* Return true if the stop time windows are the same. */
+#define WT_TIME_WINDOWS_STOP_EQUAL(tw1, tw2) \
+ ((tw1)->durable_stop_ts == (tw2)->durable_stop_ts && (tw1)->stop_ts == (tw2)->stop_ts && \
+ (tw1)->stop_txn == (tw2)->stop_txn && (tw1)->prepare == (tw2)->prepare)
+
/*
* Set the start values of a time window from those in an update structure. Durable timestamp can be
* 0 for prepared updates, in those cases use the prepared timestamp as durable timestamp.
diff --git a/src/third_party/wiredtiger/test/suite/test_rollback_to_stable39.py b/src/third_party/wiredtiger/test/suite/test_rollback_to_stable39.py
new file mode 100644
index 00000000000..90bf63935da
--- /dev/null
+++ b/src/third_party/wiredtiger/test/suite/test_rollback_to_stable39.py
@@ -0,0 +1,172 @@
+#!/usr/bin/env python
+#
+# Public Domain 2014-present MongoDB, Inc.
+# Public Domain 2008-2014 WiredTiger, Inc.
+#
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+import threading, time
+from helper import simulate_crash_restart
+from test_rollback_to_stable01 import test_rollback_to_stable_base
+from wiredtiger import stat
+from wtdataset import SimpleDataSet
+from wtscenario import make_scenarios
+from wtthread import checkpoint_thread
+
+# test_rollback_to_stable39.py
+# Test to delay checkpoint and perform eviction in parallel to ensure eviction moves the content from data store to history store
+# and then checkpoint history store to see the same content in data store and history store. Later use the checkpoint to restore
+# the database which will trigger eviction to insert the same record from data store to history store.
+class test_rollback_to_stable39(test_rollback_to_stable_base):
+ restart_config = False
+
+ format_values = [
+ ('column', dict(key_format='r', value_format='S', prepare_extraconfig='')),
+ ('column_fix', dict(key_format='r', value_format='8t',
+ prepare_extraconfig=',allocation_size=512,leaf_page_max=512')),
+ ('row_integer', dict(key_format='i', value_format='S', prepare_extraconfig='')),
+ ]
+
+ prepare_values = [
+ ('no_prepare', dict(prepare=False)),
+ ('prepare', dict(prepare=True))
+ ]
+
+ scenarios = make_scenarios(format_values, prepare_values)
+
+ def conn_config(self):
+ config = 'cache_size=25MB,statistics=(all),statistics_log=(json,on_close,wait=1)'
+ if self.restart_config:
+ config += ',timing_stress_for_test=[checkpoint_slow]'
+ else:
+ config += ',timing_stress_for_test=[history_store_checkpoint_delay]'
+ return config
+
+ def test_rollback_to_stable(self):
+ nrows = 1000
+
+ # Create a table.
+ uri = "table:rollback_to_stable39"
+ ds = SimpleDataSet(
+ self, uri, 0, key_format=self.key_format, value_format=self.value_format)
+ ds.populate()
+
+ if self.value_format == '8t':
+ value_a = 97
+ value_b = 98
+ value_c = 99
+ else:
+ value_a = "aaaaa" * 100
+ value_b = "bbbbb" * 100
+ value_c = "ccccc" * 100
+
+ # Pin oldest and stable to timestamp 10.
+ self.conn.set_timestamp('oldest_timestamp=' + self.timestamp_str(10) +
+ ',stable_timestamp=' + self.timestamp_str(10))
+
+ # Perform several updates.
+ self.large_updates(uri, value_a, ds, nrows, self.prepare, 20)
+ # Verify data is visible and correct.
+ self.check(value_a, uri, nrows, None, 21 if self.prepare else 20)
+
+ self.large_removes(uri, ds, nrows, self.prepare, 30)
+ # Verify no data is visible.
+ self.check(value_a, uri, 0, nrows, 31 if self.prepare else 30)
+
+ # Pin stable to timestamp 40 if prepare otherwise 30.
+ self.conn.set_timestamp('stable_timestamp=' + self.timestamp_str(40 if self.prepare else 30))
+
+ # Create a checkpoint thread
+ done = threading.Event()
+ ckpt = checkpoint_thread(self.conn, done)
+ try:
+ ckpt.start()
+
+ # Wait for checkpoint to start before committing.
+ ckpt_started = 0
+ while not ckpt_started:
+ stat_cursor = self.session.open_cursor('statistics:', None, None)
+ ckpt_started = stat_cursor[stat.conn.txn_checkpoint_running][2]
+ stat_cursor.close()
+ time.sleep(1)
+
+ # Perform several updates in parallel with checkpoint.
+ # Rollbacks may occur when checkpoint is running, so retry as needed.
+ self.retry_rollback('update ds, e', None,
+ lambda: self.large_updates(uri, value_b, ds, nrows, self.prepare, 50))
+ self.evict_cursor(uri, nrows, value_b)
+ finally:
+ done.set()
+ ckpt.join()
+
+ # Simulate a crash by copying to a new directory(RESTART).
+ self.restart_config = True
+ simulate_crash_restart(self, ".", "RESTART")
+
+ # Check that the correct data is seen at and after the stable timestamp.
+ self.check(value_a, uri, nrows, None, 21 if self.prepare else 20)
+ self.check(value_a, uri, 0, nrows, 40)
+
+ stat_cursor = self.session.open_cursor('statistics:', None, None)
+ hs_removed = stat_cursor[stat.conn.txn_rts_hs_removed][2]
+ hs_sweep = stat_cursor[stat.conn.txn_rts_sweep_hs_keys][2]
+ keys_removed = stat_cursor[stat.conn.txn_rts_keys_removed][2]
+ keys_restored = stat_cursor[stat.conn.txn_rts_keys_restored][2]
+ upd_aborted = stat_cursor[stat.conn.txn_rts_upd_aborted][2]
+ stat_cursor.close()
+
+ self.assertEqual(keys_removed, 0)
+ self.assertEqual(keys_restored, 0)
+ self.assertEqual(upd_aborted, 0)
+ self.assertEqual(hs_removed, 0)
+ self.assertEqual(hs_sweep, 0)
+
+ # Perform several updates.
+ self.large_updates(uri, value_c, ds, nrows, self.prepare, 60)
+
+ # Verify data is visible and correct.
+ self.check(value_c, uri, nrows, None, 61 if self.prepare else 60)
+
+ # Create a checkpoint thread
+ done = threading.Event()
+ ckpt = checkpoint_thread(self.conn, done)
+ try:
+ ckpt.start()
+
+ # Wait for checkpoint to start before committing.
+ ckpt_started = 0
+ while not ckpt_started:
+ stat_cursor = self.session.open_cursor('statistics:', None, None)
+ ckpt_started = stat_cursor[stat.conn.txn_checkpoint_running][2]
+ stat_cursor.close()
+ time.sleep(1)
+
+ self.evict_cursor(uri, nrows, value_c)
+ finally:
+ done.set()
+ ckpt.join()
+
+
+if __name__ == '__main__':
+ wttest.run()
+