summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/third_party/wiredtiger/import.data2
-rw-r--r--src/third_party/wiredtiger/src/history/hs_rec.c68
-rw-r--r--src/third_party/wiredtiger/src/include/timestamp_inline.h5
-rw-r--r--src/third_party/wiredtiger/test/suite/test_rollback_to_stable39.py170
4 files changed, 233 insertions, 12 deletions
diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data
index 3134840e789..9390334b313 100644
--- a/src/third_party/wiredtiger/import.data
+++ b/src/third_party/wiredtiger/import.data
@@ -2,5 +2,5 @@
"vendor": "wiredtiger",
"github": "wiredtiger/wiredtiger.git",
"branch": "mongodb-4.4",
- "commit": "c2114ce2b1c40599892dc244e0fe1b5c35f9e296"
+ "commit": "c1099d90937b0beb39e78d31b5674e25f5dd04ca"
}
diff --git a/src/third_party/wiredtiger/src/history/hs_rec.c b/src/third_party/wiredtiger/src/history/hs_rec.c
index a98bbc7c7a2..f343e3bf879 100644
--- a/src/third_party/wiredtiger/src/history/hs_rec.c
+++ b/src/third_party/wiredtiger/src/history/hs_rec.c
@@ -9,8 +9,8 @@
#include "wt_internal.h"
static int __hs_delete_reinsert_from_pos(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor,
- uint32_t btree_id, const WT_ITEM *key, wt_timestamp_t ts, bool reinsert, bool ooo_tombstone,
- bool error_on_ooo_ts, uint64_t *hs_counter);
+ uint32_t btree_id, const WT_ITEM *key, wt_timestamp_t ts, bool reinsert, bool no_ts_tombstone,
+ bool error_on_ts_ordering, uint64_t *hs_counter, WT_TIME_WINDOW *upd_tw);
/*
* __hs_verbose_cache_stats --
@@ -209,7 +209,7 @@ __hs_insert_record(WT_SESSION_IMPL *session, WT_CURSOR *cursor, WT_BTREE *btree,
if (ret == 0)
WT_ERR(__hs_delete_reinsert_from_pos(session, cursor, btree->id, key, tw->start_ts + 1,
- true, false, error_on_ooo_ts, &counter));
+ true, false, error_on_ooo_ts, &counter, tw));
#ifdef HAVE_DIAGNOSTIC
/*
@@ -816,7 +816,7 @@ __wt_hs_delete_key_from_ts(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, uint3
}
WT_ERR(__hs_delete_reinsert_from_pos(session, hs_cursor, btree_id, key, ts, reinsert,
- ooo_tombstone, error_on_ooo_ts, &hs_counter));
+ ooo_tombstone, error_on_ooo_ts, &hs_counter, NULL));
done:
err:
@@ -834,8 +834,8 @@ err:
*/
static int
__hs_delete_reinsert_from_pos(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, uint32_t btree_id,
- const WT_ITEM *key, wt_timestamp_t ts, bool reinsert, bool ooo_tombstone, bool error_on_ooo_ts,
- uint64_t *counter)
+ const WT_ITEM *key, wt_timestamp_t ts, bool reinsert, bool no_ts_tombstone,
+ bool error_on_ts_ordering, uint64_t *counter, WT_TIME_WINDOW *upd_tw)
{
WT_CURSOR *hs_insert_cursor;
WT_CURSOR_BTREE *hs_cbt;
@@ -863,7 +863,7 @@ __hs_delete_reinsert_from_pos(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, ui
* If we delete all the updates of the key from the history store, we should not reinsert any
* update except when an out-of-order tombstone is not globally visible yet.
*/
- WT_ASSERT(session, ooo_tombstone || ts > WT_TS_NONE || !reinsert);
+ WT_ASSERT(session, no_ts_tombstone || ts > WT_TS_NONE || !reinsert);
for (; ret == 0; ret = hs_cursor->next(hs_cursor)) {
/* Ignore records that are obsolete. */
@@ -871,6 +871,51 @@ __hs_delete_reinsert_from_pos(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, ui
if (__wt_txn_tw_stop_visible_all(session, twp))
continue;
+ /*
+ * The below example illustrates a case that the data store and the history
+ * store may contain the same value. In this case, skip inserting the same
+ * value to the history store again.
+ *
+ * Suppose there is one table table1 and the below operations are performed.
+ *
+ * 1. Insert a=1 in table1 at timestamp 10
+ * 2. Delete a from table1 at timestamp 20
+ * 3. Set stable timestamp = 20, oldest timestamp=1
+ * 4. Checkpoint table1
+ * 5. Insert a=2 in table1 at timestamp 30
+ * 6. Evict a=2 from table1 and move the content to history store.
+ * 7. Checkpoint is still running and before it finishes checkpointing the history store the
+ * above steps 5 and 6 will happen.
+ *
+ * After all this operations the checkpoint content will be
+ * Data store --
+ * table1 --> a=1 at start_ts=10, stop_ts=20
+ *
+ * History store --
+ * table1 --> a=1 at start_ts=10, stop_ts=20
+ *
+ * WiredTiger takes a backup of the checkpoint and use this backup to restore.
+ * Note: In table1 of both data store and history store has the same content.
+ *
+ * Now the backup is used to restore.
+ *
+ * 1. Insert a=3 in table1
+ * 2. Checkpoint started, eviction started and sees the same content in data store and
+ * history store while reconciling.
+ *
+ * The start timestamp and transaction ids are checked to ensure for the global
+ * visibility because globally visible timestamps and transaction ids may be cleared to 0.
+ * The time window of the inserting record and the history store record are
+ * compared to make sure that the same record are not being inserted again.
+ */
+
+ if (upd_tw != NULL &&
+ (__wt_txn_tw_start_visible_all(session, upd_tw) &&
+ __wt_txn_tw_start_visible_all(session, twp) ?
+ WT_TIME_WINDOWS_STOP_EQUAL(upd_tw, twp) :
+ WT_TIME_WINDOWS_EQUAL(upd_tw, twp)))
+ continue;
+
/* We shouldn't have crossed the btree and user key search space. */
WT_ERR(hs_cursor->get_key(hs_cursor, &hs_btree_id, &hs_key, &hs_ts, &hs_counter));
WT_ASSERT(session, hs_btree_id == btree_id);
@@ -891,7 +936,7 @@ __hs_delete_reinsert_from_pos(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, ui
* flag. We cannot modify the history store to fix the out of order timestamp updates as it may
* make the history store checkpoint inconsistent.
*/
- if (error_on_ooo_ts) {
+ if (error_on_ts_ordering) {
ret = EBUSY;
WT_STAT_CONN_INCR(session, cache_eviction_fail_checkpoint_out_of_order_ts);
goto err;
@@ -979,7 +1024,8 @@ __hs_delete_reinsert_from_pos(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, ui
* to the new update.
*/
if (hs_cbt->upd_value->tw.start_ts >= ts)
- hs_insert_tw.start_ts = hs_insert_tw.durable_start_ts = ooo_tombstone ? ts : ts - 1;
+ hs_insert_tw.start_ts = hs_insert_tw.durable_start_ts =
+ no_ts_tombstone ? ts : ts - 1;
else {
hs_insert_tw.start_ts = hs_cbt->upd_value->tw.start_ts;
hs_insert_tw.durable_start_ts = hs_cbt->upd_value->tw.durable_start_ts;
@@ -991,7 +1037,7 @@ __hs_delete_reinsert_from_pos(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, ui
* another moved update OR the update itself triggered the correction. In either case,
* we should preserve the stop transaction id.
*/
- hs_insert_tw.stop_ts = hs_insert_tw.durable_stop_ts = ooo_tombstone ? ts : ts - 1;
+ hs_insert_tw.stop_ts = hs_insert_tw.durable_stop_ts = no_ts_tombstone ? ts : ts - 1;
hs_insert_tw.stop_txn = hs_cbt->upd_value->tw.stop_txn;
/* Extract the underlying value for reinsertion. */
@@ -999,7 +1045,7 @@ __hs_delete_reinsert_from_pos(WT_SESSION_IMPL *session, WT_CURSOR *hs_cursor, ui
hs_cursor, &tw.durable_stop_ts, &tw.durable_start_ts, &hs_upd_type, &hs_value));
/* Reinsert the update with corrected timestamps. */
- if (ooo_tombstone && hs_ts == ts)
+ if (no_ts_tombstone && hs_ts == ts)
*counter = hs_counter;
/* Insert the value back with different timestamps. */
diff --git a/src/third_party/wiredtiger/src/include/timestamp_inline.h b/src/third_party/wiredtiger/src/include/timestamp_inline.h
index fd27a1f8a90..a00b4325dc0 100644
--- a/src/third_party/wiredtiger/src/include/timestamp_inline.h
+++ b/src/third_party/wiredtiger/src/include/timestamp_inline.h
@@ -37,6 +37,11 @@
(tw1)->stop_ts == (tw2)->stop_ts && (tw1)->stop_txn == (tw2)->stop_txn && \
(tw1)->prepare == (tw2)->prepare)
+/* Return true if the stop time windows are the same. */
+#define WT_TIME_WINDOWS_STOP_EQUAL(tw1, tw2) \
+ ((tw1)->durable_stop_ts == (tw2)->durable_stop_ts && (tw1)->stop_ts == (tw2)->stop_ts && \
+ (tw1)->stop_txn == (tw2)->stop_txn && (tw1)->prepare == (tw2)->prepare)
+
/*
* Set the start values of a time window from those in an update structure. Durable timestamp can be
* 0 for prepared updates, in those cases use the prepared timestamp as durable timestamp.
diff --git a/src/third_party/wiredtiger/test/suite/test_rollback_to_stable39.py b/src/third_party/wiredtiger/test/suite/test_rollback_to_stable39.py
new file mode 100644
index 00000000000..bf6225d7213
--- /dev/null
+++ b/src/third_party/wiredtiger/test/suite/test_rollback_to_stable39.py
@@ -0,0 +1,170 @@
+#!/usr/bin/env python
+#
+# Public Domain 2014-present MongoDB, Inc.
+# Public Domain 2008-2014 WiredTiger, Inc.
+#
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+import threading, time
+from helper import simulate_crash_restart
+from test_rollback_to_stable01 import test_rollback_to_stable_base
+from wiredtiger import stat
+from wtdataset import SimpleDataSet
+from wtscenario import make_scenarios
+from wtthread import checkpoint_thread
+
+# test_rollback_to_stable39.py
+# Test to delay checkpoint and perform eviction in parallel to ensure eviction moves the content from data store to history store
+# and then checkpoint history store to see the same content in data store and history store. Later use the checkpoint to restore
+# the database which will trigger eviction to insert the same record from data store to history store.
+class test_rollback_to_stable39(test_rollback_to_stable_base):
+ restart_config = False
+
+ format_values = [
+ ('column', dict(key_format='r', value_format='S', prepare_extraconfig='')),
+ ('row_integer', dict(key_format='i', value_format='S', prepare_extraconfig='')),
+ ]
+
+ prepare_values = [
+ ('no_prepare', dict(prepare=False)),
+ ('prepare', dict(prepare=True))
+ ]
+
+ scenarios = make_scenarios(format_values, prepare_values)
+
+ def conn_config(self):
+ config = 'cache_size=25MB,statistics=(all),statistics_log=(json,on_close,wait=1)'
+ if self.restart_config:
+ config += ',timing_stress_for_test=[checkpoint_slow]'
+ else:
+ config += ',timing_stress_for_test=[history_store_checkpoint_delay]'
+ return config
+
+ def test_rollback_to_stable(self):
+ nrows = 1000
+
+ # Create a table.
+ uri = "table:rollback_to_stable39"
+ ds = SimpleDataSet(
+ self, uri, 0, key_format=self.key_format, value_format=self.value_format)
+ ds.populate()
+
+ if self.value_format == '8t':
+ value_a = 97
+ value_b = 98
+ value_c = 99
+ else:
+ value_a = "aaaaa" * 100
+ value_b = "bbbbb" * 100
+ value_c = "ccccc" * 100
+
+ # Pin oldest and stable to timestamp 10.
+ self.conn.set_timestamp('oldest_timestamp=' + self.timestamp_str(10) +
+ ',stable_timestamp=' + self.timestamp_str(10))
+
+ # Perform several updates.
+ self.large_updates(uri, value_a, ds, nrows, self.prepare, 20)
+ # Verify data is visible and correct.
+ self.check(value_a, uri, nrows, 21 if self.prepare else 20)
+
+ self.large_removes(uri, ds, nrows, self.prepare, 30)
+ # Verify no data is visible.
+ self.check(value_a, uri, 0, 31 if self.prepare else 30)
+
+ # Pin stable to timestamp 40 if prepare otherwise 30.
+ self.conn.set_timestamp('stable_timestamp=' + self.timestamp_str(40 if self.prepare else 30))
+
+ # Create a checkpoint thread
+ done = threading.Event()
+ ckpt = checkpoint_thread(self.conn, done)
+ try:
+ ckpt.start()
+
+ # Wait for checkpoint to start before committing.
+ ckpt_started = 0
+ while not ckpt_started:
+ stat_cursor = self.session.open_cursor('statistics:', None, None)
+ ckpt_started = stat_cursor[stat.conn.txn_checkpoint_running][2]
+ stat_cursor.close()
+ time.sleep(1)
+
+ # Perform several updates in parallel with checkpoint.
+ # Rollbacks may occur when checkpoint is running, so retry as needed.
+ self.retry_rollback('update ds, e', None,
+ lambda: self.large_updates(uri, value_b, ds, nrows, self.prepare, 50))
+ self.evict_cursor(uri, nrows, value_b)
+ finally:
+ done.set()
+ ckpt.join()
+
+ # Simulate a crash by copying to a new directory(RESTART).
+ self.restart_config = True
+ simulate_crash_restart(self, ".", "RESTART")
+
+ # Check that the correct data is seen at and after the stable timestamp.
+ self.check(value_a, uri, nrows, 21 if self.prepare else 20)
+ self.check(value_a, uri, 0, 40)
+
+ stat_cursor = self.session.open_cursor('statistics:', None, None)
+ hs_removed = stat_cursor[stat.conn.txn_rts_hs_removed][2]
+ hs_sweep = stat_cursor[stat.conn.txn_rts_sweep_hs_keys][2]
+ keys_removed = stat_cursor[stat.conn.txn_rts_keys_removed][2]
+ keys_restored = stat_cursor[stat.conn.txn_rts_keys_restored][2]
+ upd_aborted = stat_cursor[stat.conn.txn_rts_upd_aborted][2]
+ stat_cursor.close()
+
+ self.assertEqual(keys_removed, 0)
+ self.assertEqual(keys_restored, 0)
+ self.assertEqual(upd_aborted, 0)
+ self.assertEqual(hs_removed, 0)
+ self.assertEqual(hs_sweep, 0)
+
+ # Perform several updates.
+ self.large_updates(uri, value_c, ds, nrows, self.prepare, 60)
+
+ # Verify data is visible and correct.
+ self.check(value_c, uri, nrows, 61 if self.prepare else 60)
+
+ # Create a checkpoint thread
+ done = threading.Event()
+ ckpt = checkpoint_thread(self.conn, done)
+ try:
+ ckpt.start()
+
+ # Wait for checkpoint to start before committing.
+ ckpt_started = 0
+ while not ckpt_started:
+ stat_cursor = self.session.open_cursor('statistics:', None, None)
+ ckpt_started = stat_cursor[stat.conn.txn_checkpoint_running][2]
+ stat_cursor.close()
+ time.sleep(1)
+
+ self.evict_cursor(uri, nrows, value_c)
+ finally:
+ done.set()
+ ckpt.join()
+
+
+if __name__ == '__main__':
+ wttest.run()
+