diff options
author | Luke Chen <luke.chen@mongodb.com> | 2021-05-03 14:14:00 +1000 |
---|---|---|
committer | Luke Chen <luke.chen@mongodb.com> | 2021-05-03 14:14:00 +1000 |
commit | 461629be57503f8dd30e3845dcdab7d6ba4e2abf (patch) | |
tree | b5f1ea1004c7c9ed92cb749d11933bca58f19505 /src/third_party/wiredtiger | |
parent | a83b7f8120c16b346e45a47a39cdf2543f94838a (diff) | |
download | mongo-461629be57503f8dd30e3845dcdab7d6ba4e2abf.tar.gz |
Import wiredtiger: 3454ee4a6238c22bfc2f16cc2e8722e3a49cfc72 from branch mongodb-4.4
ref: 38e948b9eb..3454ee4a62
for: 4.4.6
WT-7460 RTS to abort all the updates from a prepared transaction
Diffstat (limited to 'src/third_party/wiredtiger')
3 files changed, 256 insertions, 25 deletions
diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data index d952484a5f3..e20a7d71218 100644 --- a/src/third_party/wiredtiger/import.data +++ b/src/third_party/wiredtiger/import.data @@ -2,5 +2,5 @@ "vendor": "wiredtiger", "github": "wiredtiger/wiredtiger.git", "branch": "mongodb-4.4", - "commit": "38e948b9eb4ab532372ea62d0bc18b927007b079" + "commit": "3454ee4a6238c22bfc2f16cc2e8722e3a49cfc72" } diff --git a/src/third_party/wiredtiger/src/txn/txn_rollback_to_stable.c b/src/third_party/wiredtiger/src/txn/txn_rollback_to_stable.c index 239eeeb9c01..f629c96c357 100644 --- a/src/third_party/wiredtiger/src/txn/txn_rollback_to_stable.c +++ b/src/third_party/wiredtiger/src/txn/txn_rollback_to_stable.c @@ -642,32 +642,55 @@ __rollback_abort_ondisk_kv(WT_SESSION_IMPL *session, WT_REF *ref, WT_COL *cip, W */ WT_RET(__wt_page_cell_data_ref(session, page, vpack, &buf)); - WT_ERR(__wt_upd_alloc(session, &buf, WT_UPDATE_STANDARD, &upd, NULL)); /* - * Set the transaction id of updates to WT_TXN_NONE when called from recovery, because the - * connections write generation will be initialized after rollback to stable and the updates - * in the cache will be problematic. The transaction id of pages which are in disk will be - * automatically reset as part of unpacking cell when loaded to cache. + * For prepared transactions, it is possible that both the on-disk key start and stop time + * windows can be the same. To abort these updates, check for any stable update from history + * store or remove the key. */ - if (F_ISSET(S2C(session), WT_CONN_RECOVERING)) - upd->txnid = WT_TXN_NONE; - else - upd->txnid = vpack->tw.start_txn; - upd->durable_ts = vpack->tw.durable_start_ts; - upd->start_ts = vpack->tw.start_ts; - F_SET(upd, WT_UPDATE_RESTORED_FROM_DS); - WT_STAT_CONN_DATA_INCR(session, txn_rts_keys_restored); - __wt_verbose(session, WT_VERB_RECOVERY_RTS(session), - "key restored with commit timestamp: %s, durable timestamp: %s, stable timestamp: %s, " - "txnid: %" PRIu64 - " and removed commit timestamp: %s, durable timestamp: %s, txnid: %" PRIu64 - ", prepared: %s", - __wt_timestamp_to_string(upd->start_ts, ts_string[0]), - __wt_timestamp_to_string(upd->durable_ts, ts_string[1]), - __wt_timestamp_to_string(rollback_timestamp, ts_string[2]), upd->txnid, - __wt_timestamp_to_string(vpack->tw.stop_ts, ts_string[3]), - __wt_timestamp_to_string(vpack->tw.durable_stop_ts, ts_string[4]), vpack->tw.stop_txn, - prepared ? "true" : "false"); + if (vpack->tw.start_ts == vpack->tw.stop_ts && + vpack->tw.durable_start_ts == vpack->tw.durable_stop_ts && + vpack->tw.start_txn == vpack->tw.stop_txn) { + WT_ASSERT(session, prepared == true); + if (!F_ISSET(S2C(session), WT_CONN_IN_MEMORY)) + return (__rollback_ondisk_fixup_key( + session, ref, NULL, cip, rip, rollback_timestamp, true, recno)); + else { + /* + * In-memory database don't have a history store to provide a stable update, so + * remove the key. + */ + WT_RET(__wt_upd_alloc_tombstone(session, &upd, NULL)); + WT_STAT_CONN_DATA_INCR(session, txn_rts_keys_removed); + } + } else { + WT_ERR(__wt_upd_alloc(session, &buf, WT_UPDATE_STANDARD, &upd, NULL)); + /* + * Set the transaction id of updates to WT_TXN_NONE when called from recovery, because + * the connections write generation will be initialized after rollback to stable and the + * updates in the cache will be problematic. The transaction id of pages which are in + * disk will be automatically reset as part of unpacking cell when loaded to cache. + */ + if (F_ISSET(S2C(session), WT_CONN_RECOVERING)) + upd->txnid = WT_TXN_NONE; + else + upd->txnid = vpack->tw.start_txn; + upd->durable_ts = vpack->tw.durable_start_ts; + upd->start_ts = vpack->tw.start_ts; + F_SET(upd, WT_UPDATE_RESTORED_FROM_DS); + WT_STAT_CONN_DATA_INCR(session, txn_rts_keys_restored); + __wt_verbose(session, WT_VERB_RECOVERY_RTS(session), + "key restored with commit timestamp: %s, durable timestamp: %s, stable timestamp: " + "%s, " + "txnid: %" PRIu64 + " and removed commit timestamp: %s, durable timestamp: %s, txnid: %" PRIu64 + ", prepared: %s", + __wt_timestamp_to_string(upd->start_ts, ts_string[0]), + __wt_timestamp_to_string(upd->durable_ts, ts_string[1]), + __wt_timestamp_to_string(rollback_timestamp, ts_string[2]), upd->txnid, + __wt_timestamp_to_string(vpack->tw.stop_ts, ts_string[3]), + __wt_timestamp_to_string(vpack->tw.durable_stop_ts, ts_string[4]), vpack->tw.stop_txn, + prepared ? "true" : "false"); + } } else /* Stable version according to the timestamp. */ return (0); diff --git a/src/third_party/wiredtiger/test/suite/test_rollback_to_stable19.py b/src/third_party/wiredtiger/test/suite/test_rollback_to_stable19.py new file mode 100644 index 00000000000..3a24113fa32 --- /dev/null +++ b/src/third_party/wiredtiger/test/suite/test_rollback_to_stable19.py @@ -0,0 +1,208 @@ +#!/usr/bin/env python +# +# Public Domain 2014-present MongoDB, Inc. +# Public Domain 2008-2014 WiredTiger, Inc. +# +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. +# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. + +import fnmatch, os, shutil, time +from helper import simulate_crash_restart +from test_rollback_to_stable01 import test_rollback_to_stable_base +from wiredtiger import stat +from wtdataset import SimpleDataSet +from wtscenario import make_scenarios + +def timestamp_str(t): + return '%x' % t + +# test_rollback_to_stable19.py +# Test that rollback to stable aborts both insert and remove updates from a single prepared transaction +class test_rollback_to_stable19(test_rollback_to_stable_base): + session_config = 'isolation=snapshot' + + in_memory_values = [ + ('no_inmem', dict(in_memory=False)), + ('inmem', dict(in_memory=True)) + ] + + key_format_values = [ + ('column', dict(key_format='r')), + ('integer_row', dict(key_format='i')), + ] + + restart_options = [ + ('shutdown', dict(crash='false')), + ('crash', dict(crash='true')), + ] + + scenarios = make_scenarios(in_memory_values, key_format_values, restart_options) + + def conn_config(self): + config = 'cache_size=50MB,statistics=(all),log=(enabled=false),eviction_dirty_trigger=5,eviction_updates_trigger=5' + if self.in_memory: + config += ',in_memory=true' + else: + config += ',in_memory=false' + return config + + def test_rollback_to_stable_no_history(self): + nrows = 1000 + + # Prepare transactions for column store table is not yet supported. + if self.key_format == 'r': + self.skipTest('Prepare transactions for column store table is not yet supported') + + # Create a table without logging. + uri = "table:rollback_to_stable19" + ds = SimpleDataSet( + self, uri, 0, key_format=self.key_format, value_format="S", config='log=(enabled=false)') + ds.populate() + + # Pin oldest and stable timestamps to 10. + self.conn.set_timestamp('oldest_timestamp=' + timestamp_str(10) + + ',stable_timestamp=' + timestamp_str(10)) + + valuea = "aaaaa" * 100 + + # Perform several updates and removes. + s = self.conn.open_session() + cursor = s.open_cursor(uri) + s.begin_transaction() + for i in range(1, nrows + 1): + cursor[ds.key(i)] = valuea + cursor.set_key(i) + cursor.remove() + cursor.close() + s.prepare_transaction('prepare_timestamp=' + timestamp_str(20)) + + # Configure debug behavior on a cursor to evict the page positioned on when the reset API is used. + evict_cursor = self.session.open_cursor(uri, None, "debug=(release_evict)") + + # Search for the key so we position our cursor on the page that we want to evict. + self.session.begin_transaction("ignore_prepare = true") + evict_cursor.set_key(1) + evict_cursor.search() + evict_cursor.reset() + evict_cursor.close() + self.session.commit_transaction() + + # Pin stable timestamp to 20. + self.conn.set_timestamp('stable_timestamp=' + timestamp_str(20)) + if not self.in_memory: + self.session.checkpoint() + + if not self.in_memory: + if self.crash: + simulate_crash_restart(self, ".", "RESTART") + else: + # Close and reopen the connection + self.reopen_conn() + else: + self.conn.rollback_to_stable() + s.rollback_transaction() + + # Verify data is not visible. + self.check(valuea, uri, 0, 20) + self.check(valuea, uri, 0, 30) + + stat_cursor = self.session.open_cursor('statistics:', None, None) + upd_aborted = stat_cursor[stat.conn.txn_rts_upd_aborted][2] + keys_removed = stat_cursor[stat.conn.txn_rts_keys_removed][2] + self.assertGreater(upd_aborted, 0) + self.assertGreater(keys_removed, 0) + + def test_rollback_to_stable_with_history(self): + nrows = 1000 + + # Prepare transactions for column store table is not yet supported. + if self.key_format == 'r': + self.skipTest('Prepare transactions for column store table is not yet supported') + + # Create a table without logging. + uri = "table:rollback_to_stable19" + ds = SimpleDataSet( + self, uri, 0, key_format=self.key_format, value_format="S", config='log=(enabled=false)') + ds.populate() + + # Pin oldest and stable timestamps to 10. + self.conn.set_timestamp('oldest_timestamp=' + timestamp_str(10) + + ',stable_timestamp=' + timestamp_str(10)) + + valuea = "aaaaa" * 100 + valueb = "bbbbb" * 100 + + # Perform several updates. + self.large_updates(uri, valuea, ds, nrows, 0, 20) + + # Perform several removes. + self.large_removes(uri, ds, nrows, 0, 30) + + # Perform several updates and removes. + s = self.conn.open_session() + cursor = s.open_cursor(uri) + s.begin_transaction() + for i in range(1, nrows + 1): + cursor[ds.key(i)] = valueb + cursor.set_key(i) + cursor.remove() + cursor.close() + s.prepare_transaction('prepare_timestamp=' + timestamp_str(40)) + + # Configure debug behavior on a cursor to evict the page positioned on when the reset API is used. + evict_cursor = self.session.open_cursor(uri, None, "debug=(release_evict)") + + # Search for the key so we position our cursor on the page that we want to evict. + self.session.begin_transaction("ignore_prepare = true") + evict_cursor.set_key(1) + evict_cursor.search() + evict_cursor.reset() + evict_cursor.close() + self.session.commit_transaction() + + # Pin stable timestamp to 40. + self.conn.set_timestamp('stable_timestamp=' + timestamp_str(40)) + if not self.in_memory: + self.session.checkpoint() + + if not self.in_memory: + if self.crash: + simulate_crash_restart(self, ".", "RESTART") + else: + # Close and reopen the connection + self.reopen_conn() + else: + self.conn.rollback_to_stable() + s.rollback_transaction() + + # Verify data. + self.check(valuea, uri, nrows, 20) + self.check(valuea, uri, 0, 30) + self.check(valuea, uri, 0, 40) + + stat_cursor = self.session.open_cursor('statistics:', None, None) + upd_aborted = stat_cursor[stat.conn.txn_rts_upd_aborted][2] + hs_removed = stat_cursor[stat.conn.txn_rts_hs_removed][2] + self.assertGreater(upd_aborted, 0) + if not self.in_memory: + self.assertGreater(hs_removed, 0) |