summaryrefslogtreecommitdiff
path: root/src/third_party/wiredtiger/test/suite/test_prepare20.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/third_party/wiredtiger/test/suite/test_prepare20.py')
-rw-r--r--src/third_party/wiredtiger/test/suite/test_prepare20.py303
1 files changed, 303 insertions, 0 deletions
diff --git a/src/third_party/wiredtiger/test/suite/test_prepare20.py b/src/third_party/wiredtiger/test/suite/test_prepare20.py
new file mode 100644
index 00000000000..0b541d40730
--- /dev/null
+++ b/src/third_party/wiredtiger/test/suite/test_prepare20.py
@@ -0,0 +1,303 @@
+#!/usr/bin/env python
+#
+# Public Domain 2014-present MongoDB, Inc.
+# Public Domain 2008-2014 WiredTiger, Inc.
+#
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+
+
+# test_prepare19.py
+# Check that we can use an application-level log to replay unstable transactions.
+
+import wiredtiger, wttest
+from wtscenario import make_scenarios
+from helper import simulate_crash_restart
+
+class test_prepare19(wttest.WiredTigerTestCase):
+ # Do write the logs immediately, but don't waste time fsyncing them.
+ conn_config = 'log=(enabled),transaction_sync=(enabled=true,method=none)'
+
+ format_values = [
+ ('integer-row', dict(key_format='i', usestrings=True, value_format='S')),
+ ('column', dict(key_format='r', usestrings=False, value_format='S')),
+ ('column-fix', dict(key_format='r', usestrings=False, value_format='8t')),
+ ]
+ ckpt_values = [
+ ('none', dict(first_ckpt=None, second_ckpt=None)),
+ ('1-21', dict(first_ckpt=21, second_ckpt=None)),
+ ('1-23', dict(first_ckpt=23, second_ckpt=None)),
+ ('1-26', dict(first_ckpt=26, second_ckpt=None)),
+ ('2-21', dict(first_ckpt=None, second_ckpt=21)),
+ ('2-23', dict(first_ckpt=None, second_ckpt=23)),
+ ('2-26', dict(first_ckpt=None, second_ckpt=26)),
+ ('2-31', dict(first_ckpt=None, second_ckpt=31)),
+ ('2-33', dict(first_ckpt=None, second_ckpt=33)),
+ ('2-36', dict(first_ckpt=None, second_ckpt=36)),
+ ]
+ commit_values = [
+ ('commit', dict(commit_before_crash=True)),
+ ('nocommit', dict(commit_before_crash=False)),
+ ]
+ scenarios = make_scenarios(format_values, ckpt_values, commit_values)
+
+ # Application-level log infrastructure. This is real simpleminded and not
+ # expected to do very much other than make sure the basic premise works.
+
+ nullvalue = None
+
+ # Some operation codes for the application log.
+ BEGIN=1
+ WRITE=2
+ PREPARETIME=3
+ PREPARE=4
+ COMMITTIME=5
+ DURABLETIME=6
+ COMMIT=7
+
+ lsession = None
+ lcursor = None
+ lsn = 1
+ txnid = 1
+
+ log_replays = 0
+
+ def log_open(self, log_uri):
+ self.lsession = self.conn.open_session()
+ self.lcursor = self.lsession.open_cursor(log_uri)
+
+ def log_begin(self):
+ self.session.begin_transaction()
+ self.lsession.begin_transaction()
+ self.lcursor[self.lsn] = (self.BEGIN, self.txnid, self.nullvalue, self.nullvalue)
+ self.lsn += 1
+
+ def log_write(self, dcursor, k, newv):
+ oldv = dcursor[k]
+ self.lcursor[self.lsn] = (self.WRITE, k, oldv, newv)
+ self.lsn += 1
+ dcursor[k] = newv
+
+ def log_prepare(self, prepare_ts):
+ self.lcursor[self.lsn] = (self.PREPARETIME, prepare_ts, self.nullvalue, self.nullvalue)
+ self.lsn += 1
+ self.lsession.commit_transaction()
+ self.session.prepare_transaction('prepare_timestamp=' + self.timestamp_str(prepare_ts))
+ self.lsession.begin_transaction()
+ self.lcursor[self.lsn] = (self.PREPARE, self.txnid, self.nullvalue, self.nullvalue)
+ self.lsn += 1
+ self.lsession.commit_transaction()
+
+ def log_precommit(self, commit_ts, durable_ts):
+ self.lsession.begin_transaction()
+ self.lcursor[self.lsn] = (self.COMMITTIME, commit_ts, self.nullvalue, self.nullvalue)
+ self.lsn += 1
+ self.lcursor[self.lsn] = (self.DURABLETIME, durable_ts, self.nullvalue, self.nullvalue)
+ self.lsn += 1
+ self.lsession.commit_transaction()
+
+ def log_commit(self, commit_ts, durable_ts):
+ self.session.commit_transaction('commit_timestamp=' + self.timestamp_str(commit_ts) +
+ ',durable_timestamp=' + self.timestamp_str(durable_ts))
+ self.lsession.begin_transaction()
+ self.lcursor[self.lsn] = (self.COMMIT, self.txnid, self.nullvalue, self.nullvalue)
+ self.lsn += 1
+ self.lsession.commit_transaction()
+ self.txnid += 1
+
+ def log_replay(self, dcursor):
+ stabletime = int('0x' + self.conn.query_timestamp('get=stable_timestamp'), 16)
+ #self.prout("stable {}".format(stabletime))
+ # First pass: find prepared txns. Ignore (thus abort) any that didn't even prepare.
+ txns = {}
+ self.lcursor.reset()
+ for lsn, op, k, oldv, newv in self.lcursor:
+ if op == self.BEGIN:
+ # "key" is the txnid
+ txns[k] = False
+ elif op == self.PREPARE or op == self.COMMIT:
+ # "key" is the txnid
+ txns[k] = True
+ # Second pass: replay values from non-aborted txns.
+ self.lcursor.reset()
+ writing = False
+ committime = None
+ durabletime = None
+ for lsn, op, k, oldv, newv in self.lcursor:
+ if op == self.BEGIN:
+ # "key" is the txnid
+ if txns[k]:
+ self.session.begin_transaction('roundup_timestamps=(prepared=true)')
+ writing = True
+ #self.prout("begin {}".format(k))
+ else:
+ writing = False
+ elif op == self.WRITE:
+ if writing and dcursor[k] == oldv:
+ dcursor[k] = newv
+ self.log_replays += 1
+ #self.prout("write {} {}".format(k, str(newv)[0:2]))
+ elif op == self.PREPARETIME:
+ if writing:
+ preparetime = k
+ #self.prout("preparetime {}".format(k))
+ elif op == self.PREPARE:
+ if writing:
+ self.session.prepare_transaction(
+ 'prepare_timestamp=' + self.timestamp_str(preparetime))
+ #self.prout("prepare {}".format(k))
+ elif op == self.COMMITTIME:
+ if writing:
+ committime = k
+ #self.prout("committime {}".format(k))
+ elif op == self.DURABLETIME:
+ if writing:
+ durabletime = k
+ if durabletime <= stabletime:
+ durabletime = stabletime + 1
+ #self.prout("durabletime {} -> {}".format(k, durabletime))
+ elif op == self.COMMIT:
+ if writing:
+ self.session.commit_transaction(
+ 'commit_timestamp=' + self.timestamp_str(committime) +
+ ',durable_timestamp=' + self.timestamp_str(durabletime))
+ #self.prout("commit {}".format(k))
+ writing = False
+ committime = None
+ durabletime = None
+ else:
+ self.assertTrue(False)
+ # If we have a prepared and uncommitted transaction, commit it.
+ # (Note that we don't handle the case where we write values and don't finish preparing.)
+ if committime is not None and durabletime is not None:
+ self.assertTrue(writing)
+ self.session.commit_transaction(
+ 'commit_timestamp=' + self.timestamp_str(committime) +
+ ',durable_timestamp=' + self.timestamp_str(durabletime))
+ #self.prout("final commit")
+
+ # Now the test.
+
+ def test_prepare19(self):
+ data_uri = 'file:prepare19data'
+ log_uri = 'file:prepare19log'
+
+ # Create one table for data and another to be an application-level log.
+ # The log's format is application-lsn -> operation, key, oldvalue, newvalue
+
+ self.session.create(data_uri, 'log=(enabled=false),key_format={},value_format={}'.format(
+ self.key_format, self.value_format))
+
+ self.session.create(log_uri, 'key_format=r,value_format=ii{}{}'.format(
+ self.value_format, self.value_format))
+
+ nrows = 1000
+ if self.value_format == '8t':
+ value_a = 97
+ value_b = 98
+ value_c = 99
+ self.nullvalue = 255
+ else:
+ value_a = 'aaaaa' * 100
+ value_b = 'bbbbb' * 100
+ value_c = 'ccccc' * 100
+ self.nullvalue = ''
+
+ dcursor = self.session.open_cursor(data_uri)
+
+ # Write some baseline data first and make it stable.
+ self.session.begin_transaction()
+ for i in range(1, nrows + 1):
+ dcursor[i] = value_a
+ self.session.commit_transaction('commit_timestamp=' + self.timestamp_str(10))
+ self.conn.set_timestamp('stable_timestamp=' + self.timestamp_str(10))
+ self.conn.set_timestamp('oldest_timestamp=' + self.timestamp_str(10))
+ self.session.checkpoint()
+
+ self.log_open(log_uri)
+
+ # Write and log a transaction.
+ self.log_begin()
+ for i in range(1, nrows + 1):
+ self.log_write(dcursor, i, value_b)
+ self.log_prepare(20)
+ self.log_precommit(22, 25)
+ self.log_commit(22, 25)
+
+ # Optionally checkpoint here.
+ if self.first_ckpt is not None:
+ self.conn.set_timestamp('stable_timestamp=' + self.timestamp_str(self.first_ckpt))
+ self.session.checkpoint()
+
+ # Write and log another transaction.
+ self.log_begin()
+ for i in range(1, nrows + 1):
+ self.log_write(dcursor, i, value_c)
+ self.log_prepare(30)
+
+ # Optionally commit now.
+ self.log_precommit(32, 35)
+ if self.commit_before_crash:
+ self.log_commit(32, 35)
+
+ # Optionally checkpoint here.
+ if self.second_ckpt is not None:
+ self.conn.set_timestamp('stable_timestamp=' + self.timestamp_str(self.second_ckpt))
+ # Use the log session to checkpoint: checkpointing from the main session
+ # will fail if we just prepared without committing.
+ self.lsession.checkpoint()
+
+ # Now crash.
+ simulate_crash_restart(self, ".", "RESTART")
+ dcursor = self.session.open_cursor(data_uri)
+ self.log_open(log_uri)
+
+ # Replay the application log.
+ self.log_replay(dcursor)
+
+ # Make everything stable.
+ self.conn.set_timestamp('stable_timestamp=' + self.timestamp_str(100))
+ self.session.checkpoint()
+
+ # We should now see all the data.
+ self.session.begin_transaction('read_timestamp=' + self.timestamp_str(15))
+ for i in range(1, nrows + 1):
+ self.assertEqual(dcursor[i], value_a)
+ self.session.rollback_transaction()
+ self.session.begin_transaction('read_timestamp=' + self.timestamp_str(25))
+ for i in range(1, nrows + 1):
+ self.assertEqual(dcursor[i], value_b)
+ self.session.rollback_transaction()
+ self.session.begin_transaction('read_timestamp=' + self.timestamp_str(35))
+ for i in range(1, nrows + 1):
+ self.assertEqual(dcursor[i], value_c)
+ self.session.rollback_transaction()
+
+ # Assert we replayed the right number of writes from the application log.
+ ckpt = self.first_ckpt if self.second_ckpt is None else self.second_ckpt
+ if ckpt is None or ckpt < 25:
+ self.assertEqual(self.log_replays, nrows * 2)
+ elif ckpt < 35 or not self.commit_before_crash:
+ self.assertEqual(self.log_replays, nrows)
+ else:
+ self.assertEqual(self.log_replays, 0)