From ec541cdf59f7dd96ee36082516ead214a23c7954 Mon Sep 17 00:00:00 2001 From: Andrew Morton Date: Tue, 16 May 2023 22:21:28 +0000 Subject: Import wiredtiger: 9318c8f5aed6dd7422f3c2a9c05a89ee40f804f8 from branch mongodb-master ref: 8dc1c0b0f1..9318c8f5ae for: 7.1.0-rc0 WT-10820 Checkpoint to mark the tree dirty when the tree has more data --- src/third_party/wiredtiger/import.data | 2 +- src/third_party/wiredtiger/src/btree/bt_sync.c | 16 ++ .../wiredtiger/src/reconcile/rec_visibility.c | 15 +- .../wiredtiger/src/reconcile/rec_write.c | 37 ++- .../wiredtiger/test/suite/test_alter05.py | 2 + .../test/suite/test_checkpoint_snapshot06.py | 254 +++++++++++++++++++++ src/third_party/wiredtiger/test/suite/test_hs21.py | 3 + 7 files changed, 299 insertions(+), 30 deletions(-) create mode 100644 src/third_party/wiredtiger/test/suite/test_checkpoint_snapshot06.py diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data index d6619336b55..eda98191606 100644 --- a/src/third_party/wiredtiger/import.data +++ b/src/third_party/wiredtiger/import.data @@ -2,5 +2,5 @@ "vendor": "wiredtiger", "github": "wiredtiger/wiredtiger.git", "branch": "mongodb-master", - "commit": "8dc1c0b0f1381773a0d13d5751ec724f23f77709" + "commit": "9318c8f5aed6dd7422f3c2a9c05a89ee40f804f8" } diff --git a/src/third_party/wiredtiger/src/btree/bt_sync.c b/src/third_party/wiredtiger/src/btree/bt_sync.c index f052547b4a1..1e237009e5d 100644 --- a/src/third_party/wiredtiger/src/btree/bt_sync.c +++ b/src/third_party/wiredtiger/src/btree/bt_sync.c @@ -440,6 +440,22 @@ __wt_sync_file(WT_SESSION_IMPL *session, WT_CACHE_OP syncop) __wt_checkpoint_progress(session, false); } } + + /* + * During normal checkpoints, mark the tree dirty if the btree has modifications that are + * not visible to the checkpoint. There is a drawback in this approach as we compare the + * btree's maximum transaction id with the checkpoint snap_min and it is possible that this + * transaction may be visible to the checkpoint, but still, we mark the tree as dirty if + * there is a long-running transaction in the database. + * + * Do not mark the tree dirty if there is no change to stable timestamp compared to the last + * checkpoint. + */ + if (!btree->modified && !F_ISSET(conn, WT_CONN_RECOVERING | WT_CONN_CLOSING_CHECKPOINT) && + (btree->rec_max_txn >= txn->snap_min || + (conn->txn_global.checkpoint_timestamp != conn->txn_global.last_ckpt_timestamp && + btree->rec_max_timestamp > conn->txn_global.checkpoint_timestamp))) + __wt_tree_modify_set(session); break; case WT_SYNC_CLOSE: case WT_SYNC_DISCARD: diff --git a/src/third_party/wiredtiger/src/reconcile/rec_visibility.c b/src/third_party/wiredtiger/src/reconcile/rec_visibility.c index b2ae3a86048..129e286d43c 100644 --- a/src/third_party/wiredtiger/src/reconcile/rec_visibility.c +++ b/src/third_party/wiredtiger/src/reconcile/rec_visibility.c @@ -536,8 +536,6 @@ __rec_upd_select(WT_SESSION_IMPL *session, WT_RECONCILE *r, WT_UPDATE *first_upd */ if (*first_txn_updp == NULL) *first_txn_updp = upd; - if (WT_TXNID_LT(max_txn, txnid)) - max_txn = txnid; /* * Special handling for application threads evicting their own updates. 
@@ -596,8 +594,6 @@ __rec_upd_select(WT_SESSION_IMPL *session, WT_RECONCILE *r, WT_UPDATE *first_upd if (F_ISSET(r, WT_REC_CHECKPOINT)) { *upd_memsizep += WT_UPDATE_MEMSIZE(upd); *has_newer_updatesp = true; - if (upd->start_ts > max_ts) - max_ts = upd->start_ts; seen_prepare = true; continue; } else { @@ -617,14 +613,17 @@ __rec_upd_select(WT_SESSION_IMPL *session, WT_RECONCILE *r, WT_UPDATE *first_upd } } - /* Track the first update with non-zero timestamp. */ - if (upd->start_ts > max_ts) - max_ts = upd->start_ts; - /* Always select the newest committed update to write to disk */ if (upd_select->upd == NULL) upd_select->upd = upd; + /* Track the selected update transaction id and timestamp. */ + if (WT_TXNID_LT(max_txn, txnid)) + max_txn = txnid; + + if (upd->start_ts > max_ts) + max_ts = upd->start_ts; + /* * We only need to walk the whole update chain if we are evicting metadata as it is written * with read uncommitted isolation and we may see a committed update followed by uncommitted diff --git a/src/third_party/wiredtiger/src/reconcile/rec_write.c b/src/third_party/wiredtiger/src/reconcile/rec_write.c index 5bad5df9d9b..f82b4d2cdaf 100644 --- a/src/third_party/wiredtiger/src/reconcile/rec_write.c +++ b/src/third_party/wiredtiger/src/reconcile/rec_write.c @@ -380,6 +380,22 @@ __rec_write_page_status(WT_SESSION_IMPL *session, WT_RECONCILE *r) page = r->page; mod = page->modify; + /* + * Track the page's maximum transaction ID (used to decide if we can evict a clean page and + * discard its history). + */ + mod->rec_max_txn = r->max_txn; + mod->rec_max_timestamp = r->max_ts; + + /* + * Track the tree's maximum transaction ID (used to decide if it's safe to discard the tree) and + * maximum timestamp. + */ + if (WT_TXNID_LT(btree->rec_max_txn, r->max_txn)) + btree->rec_max_txn = r->max_txn; + if (btree->rec_max_timestamp < r->max_ts) + btree->rec_max_timestamp = r->max_ts; + /* * Set the page's status based on whether or not we cleaned the page. */ @@ -406,27 +422,6 @@ __rec_write_page_status(WT_SESSION_IMPL *session, WT_RECONCILE *r) !F_ISSET(r, WT_REC_EVICT) || (F_ISSET(r, WT_REC_HS | WT_REC_IN_MEMORY) || WT_IS_METADATA(btree->dhandle))); } else { - /* - * Track the page's maximum transaction ID (used to decide if we can evict a clean page and - * discard its history). - */ - mod->rec_max_txn = r->max_txn; - mod->rec_max_timestamp = r->max_ts; - - /* - * Track the tree's maximum transaction ID (used to decide if it's safe to discard the - * tree). Reconciliation for eviction is multi-threaded, only update the tree's maximum - * transaction ID when doing a checkpoint. That's sufficient, we only care about the maximum - * transaction ID of current updates in the tree, and checkpoint visits every dirty page in - * the tree. - */ - if (!F_ISSET(r, WT_REC_EVICT)) { - if (WT_TXNID_LT(btree->rec_max_txn, r->max_txn)) - btree->rec_max_txn = r->max_txn; - if (btree->rec_max_timestamp < r->max_ts) - btree->rec_max_timestamp = r->max_ts; - } - /* * We set the page state to mark it as having been dirtied for the first time prior to * reconciliation. 
A failed atomic cas indicates that an update has taken place during diff --git a/src/third_party/wiredtiger/test/suite/test_alter05.py b/src/third_party/wiredtiger/test/suite/test_alter05.py index 47436361e62..dad5ac8ea81 100644 --- a/src/third_party/wiredtiger/test/suite/test_alter05.py +++ b/src/third_party/wiredtiger/test/suite/test_alter05.py @@ -86,6 +86,7 @@ class test_alter05(TieredConfigMixin, wttest.WiredTigerTestCase): self.session.commit_transaction('commit_timestamp=' + self.timestamp_str(2)) c.close() + self.conn.set_timestamp('stable_timestamp=' + self.timestamp_str(2)) prev_alter_checkpoints = self.get_stat(wiredtiger.stat.conn.session_table_alter_trigger_checkpoint) # Alter the table and verify. @@ -103,6 +104,7 @@ class test_alter05(TieredConfigMixin, wttest.WiredTigerTestCase): c[k+1] = 2 self.session.commit_transaction('commit_timestamp=' + self.timestamp_str(3)) + self.conn.set_timestamp('stable_timestamp=' + self.timestamp_str(3)) self.assertRaisesException(wiredtiger.WiredTigerError, lambda: self.session.alter(uri, 'log=(enabled=true)')) self.verify_metadata('log=(enabled=false)') diff --git a/src/third_party/wiredtiger/test/suite/test_checkpoint_snapshot06.py b/src/third_party/wiredtiger/test/suite/test_checkpoint_snapshot06.py new file mode 100644 index 00000000000..69c7ad483d9 --- /dev/null +++ b/src/third_party/wiredtiger/test/suite/test_checkpoint_snapshot06.py @@ -0,0 +1,254 @@ +#!/usr/bin/env python +# +# Public Domain 2014-present MongoDB, Inc. +# Public Domain 2008-2014 WiredTiger, Inc. +# +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. +# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. + +import os, shutil, threading, time +from wtthread import checkpoint_thread +from helper import simulate_crash_restart +import wiredtiger, wttest +from wtdataset import SimpleDataSet +from wtscenario import make_scenarios +from helper import copy_wiredtiger_home +from wiredtiger import stat + +# test_checkpoint_snapshot06.py +# This test is to run checkpoint and truncate and insert followed by eviction +# for one table in parallel with timing stress for checkpoint and let eviction +# write more data than checkpoint. +class test_checkpoint_snapshot06(wttest.WiredTigerTestCase): + + # Create two tables. 
+ uri_1 = "table:test_checkpoint_snapshot06_1" + uri_2 = "table:test_checkpoint_snapshot06_2" + backup_dir = "BACKUP" + + format_values = [ + ('column', dict(key_format='r', value_format='S')), + ('column_fix', dict(key_format='r', value_format='8t')), + ('row_integer', dict(key_format='i', value_format='S')), + ] + + restart_values = [ + ("crash_restart", dict(restart=True)), + ("backup", dict(restart=False)), + ] + + scenarios = make_scenarios(format_values, restart_values) + + def conn_config(self): + config = 'cache_size=10MB,statistics=(all),statistics_log=(json,on_close,wait=1),log=(enabled=true),debug_mode=(log_retention=10),timing_stress_for_test=[checkpoint_slow]' + return config + + def moresetup(self): + if self.value_format == '8t': + # Rig to use more than one page; otherwise the inconsistent checkpoint assertions fail. + self.extraconfig = ',leaf_page_max=4096' + self.nrows = 5000 + self.valuea = 97 + self.valueb = 98 + else: + self.extraconfig = '' + self.nrows = 1000 + self.valuea = "aaaaa" * 100 + self.valueb = "bbbbb" * 100 + + def take_full_backup(self, fromdir, todir): + # Open up the backup cursor, and copy the files. Do a full backup. + cursor = self.session.open_cursor('backup:', None, None) + self.pr('Full backup from '+ fromdir + ' to ' + todir + ': ') + os.mkdir(todir) + while True: + ret = cursor.next() + if ret != 0: + break + bkup_file = cursor.get_key() + copy_file = os.path.join(fromdir, bkup_file) + sz = os.path.getsize(copy_file) + self.pr('Copy from: ' + bkup_file + ' (' + str(sz) + ') to ' + todir) + shutil.copy(copy_file, todir) + self.assertEqual(ret, wiredtiger.WT_NOTFOUND) + cursor.close() + + def evict_cursor(self, uri, ds, nrows): + # Configure debug behavior on a cursor to evict the page positioned on when the reset API is used. + evict_cursor = self.session.open_cursor(uri, None, "debug=(release_evict)") + self.session.begin_transaction("ignore_prepare=true") + for i in range (1, nrows + 1): + evict_cursor.set_key(ds.key(i)) + evict_cursor.search() + if i % 10 == 0: + evict_cursor.reset() + evict_cursor.close() + self.session.rollback_transaction() + + def large_updates(self, uri, value, ds, nrows, commit_ts): + # Update a large number of records. + session = self.session + cursor = session.open_cursor(uri) + for i in range(1, nrows+1): + session.begin_transaction() + cursor[ds.key(i)] = value + if commit_ts == 0: + session.commit_transaction() + else: + session.commit_transaction('commit_timestamp=' + self.timestamp_str(commit_ts)) + cursor.close() + + def check(self, check_value, uri, nrows, read_ts, more_invisible_rows_exist): + # In FLCS the existence of the invisible extra set of rows causes the table to + # extend under them. Until that's fixed, expect (not just allow) those rows to + # exist and demand that they read back as zero and not as check_value. When it + # is fixed (so the end of the table updates transactionally) the special-case + # logic can just be removed. 
+ flcs_tolerance = more_invisible_rows_exist and self.value_format == '8t' + + session = self.session + if read_ts == 0: + session.begin_transaction() + else: + session.begin_transaction('read_timestamp=' + self.timestamp_str(read_ts)) + cursor = session.open_cursor(uri) + count = 0 + for k, v in cursor: + if flcs_tolerance and count >= nrows: + self.assertEqual(v, 0) + else: + self.assertEqual(v, check_value) + count += 1 + session.commit_transaction() + self.assertEqual(count, nrows * 2 if flcs_tolerance else nrows) + + def perform_backup_or_crash_restart(self, fromdir, todir): + if self.restart == True: + #Simulate a crash by copying to a new directory(RESTART). + copy_wiredtiger_home(self, fromdir, todir + ".copy", True) + simulate_crash_restart(self, fromdir, todir) + else: + #Take a backup and restore it. + self.take_full_backup(fromdir, todir) + self.take_full_backup(fromdir, todir + ".copy") + self.reopen_conn(todir) + + def test_checkpoint_snapshot(self): + self.moresetup() + + ds_1 = SimpleDataSet(self, self.uri_1, 0, \ + key_format=self.key_format, value_format=self.value_format, \ + config="log=(enabled=true)" +self.extraconfig) + ds_1.populate() + + ds_2 = SimpleDataSet(self, self.uri_2, 0, \ + key_format=self.key_format, value_format=self.value_format, \ + config="log=(enabled=true)" + self.extraconfig) + ds_2.populate() + + # Insert number of records into both tables. + self.large_updates(self.uri_1, self.valuea, ds_1, self.nrows, 0) + self.check(self.valuea, self.uri_1, self.nrows, 0, False) + + self.large_updates(self.uri_2, self.valuea, ds_2, self.nrows, 0) + self.check(self.valuea, self.uri_2, self.nrows, 0, False) + + # Remove one key from both the tables. + cursor1 = self.session.open_cursor(self.uri_1) + cursor2 = self.session.open_cursor(self.uri_2) + + cursor1.set_key(ds_1.key(50)) + cursor2.set_key(ds_2.key(50)) + self.assertEqual(cursor1.remove(), 0) + self.assertEqual(cursor2.remove(), 0) + + # Truncate the range from 1-100 in both tables where key 50 doesn't exist. + # We only set a stop cursor for both tables and send in an empty start + # cursor to truncate from the beginning of the table. + session1 = self.conn.open_session() + session1.begin_transaction() + cursor11 = session1.open_cursor(self.uri_1) + cursor12 = session1.open_cursor(self.uri_2) + + cursor11.set_key(ds_1.key(100)) + cursor12.set_key(ds_2.key(100)) + session1.truncate(None, None, cursor11, None) + session1.truncate(None, None, cursor12, None) + + # Insert the key 50 that was already removed. + session2 = self.conn.open_session() + session2.begin_transaction() + cursor21 = session2.open_cursor(self.uri_1) + cursor22 = session2.open_cursor(self.uri_2) + + cursor21.set_key(ds_1.key(50)) + cursor21.set_value(self.valueb) + cursor22.set_key(ds_2.key(50)) + cursor22.set_value(self.valueb) + + self.assertEqual(cursor21.insert(), 0) + self.assertEqual(cursor22.insert(), 0) + + # Create a checkpoint thread + done = threading.Event() + ckpt = checkpoint_thread(self.conn, done) + try: + ckpt.start() + + # Wait for checkpoint to start and acquire its snapshot before committing. + ckpt_snapshot = 0 + while not ckpt_snapshot: + time.sleep(1) + stat_cursor = self.session.open_cursor('statistics:', None, None) + ckpt_snapshot = stat_cursor[stat.conn.txn_checkpoint_snapshot_acquired][2] + stat_cursor.close() + + # commit the operations in out of order. Insert followed by truncate. 
+ session2.commit_transaction() + session1.commit_transaction() + + # Evict all the modifications of the table1 before checkpoint gets into the table. + self.evict_cursor(self.uri_1, ds_1, self.nrows) + + finally: + done.set() + ckpt.join() + + # Perform an additional checkpoint to ensure table2 also has the latest data. + self.session.checkpoint() + self.perform_backup_or_crash_restart(".", self.backup_dir) + + # Check that the both tables contains the last re-inserted value. + cursor11 = self.session.open_cursor(self.uri_1) + cursor12 = self.session.open_cursor(self.uri_2) + + cursor11.set_key(ds_1.key(50)) + self.assertEqual(cursor11.search(), 0) + self.assertEqual(cursor11.get_value(), self.valueb) + + cursor12.set_key(ds_2.key(50)) + self.assertEqual(cursor12.search(), 0) + self.assertEqual(cursor12.get_value(), self.valueb) + +if __name__ == '__main__': + wttest.run() diff --git a/src/third_party/wiredtiger/test/suite/test_hs21.py b/src/third_party/wiredtiger/test/suite/test_hs21.py index 638823d87d0..6479a6d64b0 100644 --- a/src/third_party/wiredtiger/test/suite/test_hs21.py +++ b/src/third_party/wiredtiger/test/suite/test_hs21.py @@ -154,6 +154,9 @@ class test_hs21(wttest.WiredTigerTestCase): self.check(self.session, value1, ds.uri, self.nrows // 2, 2, flcs_nrows=self.nrows) self.check(self.session, value2, ds.uri, self.nrows, 100) + # Set the stable timestamp to 100 to let checkpoint write all the stable data. + self.conn.set_timestamp('stable_timestamp=' + self.timestamp_str(100)) + # Our sweep scan interval is every 1 second and the amount of idle time needed for a handle to be closed is 2 seconds. # It should take roughly 3 seconds for the sweep server to close our file handles. Lets wait at least double # that to be safe. -- cgit v1.2.1
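
Note on the bt_sync.c hunk above: the added block decides, during a normal checkpoint, whether a tree that currently looks clean still carries reconciled updates the checkpoint's snapshot may not see, and if so marks the tree dirty so that data is not skipped. What follows is a minimal standalone sketch of that decision, not code from the tree: the helper name is hypothetical and the plain scalar parameters stand in for the WiredTiger fields referenced in the diff (btree->modified, btree->rec_max_txn, btree->rec_max_timestamp, txn->snap_min, checkpoint_timestamp, last_ckpt_timestamp).

#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical distillation of the condition added to __wt_sync_file(); the
 * real code operates on WT_BTREE/WT_TXN/WT_CONNECTION fields, this version
 * takes the same values as plain scalars.
 */
static bool
checkpoint_should_mark_tree_dirty(bool tree_modified, bool recovering_or_closing,
  uint64_t rec_max_txn, uint64_t ckpt_snap_min, uint64_t rec_max_timestamp,
  uint64_t checkpoint_timestamp, uint64_t last_ckpt_timestamp)
{
    /* Trees that are already dirty, and recovery/shutdown checkpoints, are left alone. */
    if (tree_modified || recovering_or_closing)
        return (false);

    /*
     * Updates reconciled with a transaction id at or above the checkpoint's
     * snap_min may be invisible to this checkpoint; be conservative and mark
     * the tree dirty (as the comment in the diff notes, this can fire even
     * when the updates are in fact visible, e.g. under a long-running
     * transaction).
     */
    if (rec_max_txn >= ckpt_snap_min)
        return (true);

    /*
     * Otherwise only act when the stable timestamp has moved since the last
     * checkpoint and the tree holds updates beyond the checkpoint timestamp.
     */
    return (checkpoint_timestamp != last_ckpt_timestamp &&
      rec_max_timestamp > checkpoint_timestamp);
}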
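
The rec_write.c hunk is what makes that check observe data written by eviction: page and tree maxima are now recorded for every reconciliation, before the clean/dirty decision, instead of only for checkpoint reconciliations of pages that come out clean, while the companion rec_visibility.c hunk takes r->max_txn / r->max_ts from updates that reach the selection point rather than from every update on the chain. Below is a hedged sketch of the tracking step only, again with stand-in types rather than the real WT_PAGE_MODIFY/WT_BTREE structures.

#include <stdint.h>

/* Stand-ins for the page-modify and btree bookkeeping the diff touches. */
struct page_maxima { uint64_t rec_max_txn, rec_max_timestamp; };
struct tree_maxima { uint64_t rec_max_txn, rec_max_timestamp; };

/*
 * Hypothetical sketch of the tracking now done unconditionally near the top of
 * __rec_write_page_status(): record the page's maxima and ratchet the tree's
 * maxima upward, whether the reconciliation is for eviction or checkpoint and
 * whether or not the page ends up clean.
 */
static void
record_reconciliation_maxima(struct page_maxima *page, struct tree_maxima *tree,
  uint64_t r_max_txn, uint64_t r_max_ts)
{
    /*
     * Page-level maxima: used to decide if a clean page can be evicted and its
     * history discarded.
     */
    page->rec_max_txn = r_max_txn;
    page->rec_max_timestamp = r_max_ts;

    /*
     * Tree-level maxima: only ever move forward, and are what the checkpoint
     * dirty-marking check compares against snap_min and the checkpoint
     * timestamp.
     */
    if (tree->rec_max_txn < r_max_txn)
        tree->rec_max_txn = r_max_txn;
    if (tree->rec_max_timestamp < r_max_ts)
        tree->rec_max_timestamp = r_max_ts;
}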