summaryrefslogtreecommitdiff
path: root/src/third_party/wiredtiger/test/suite/test_util18.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/third_party/wiredtiger/test/suite/test_util18.py')
-rw-r--r--src/third_party/wiredtiger/test/suite/test_util18.py194
1 files changed, 194 insertions, 0 deletions
diff --git a/src/third_party/wiredtiger/test/suite/test_util18.py b/src/third_party/wiredtiger/test/suite/test_util18.py
new file mode 100644
index 00000000000..c66bfca13b0
--- /dev/null
+++ b/src/third_party/wiredtiger/test/suite/test_util18.py
@@ -0,0 +1,194 @@
+#!/usr/bin/env python
+#
+# Public Domain 2014-2021 MongoDB, Inc.
+# Public Domain 2008-2014 WiredTiger, Inc.
+#
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+
+import codecs, filecmp
+from suite_subprocess import suite_subprocess
+import wiredtiger, wttest
+from wtscenario import make_scenarios
+
+# test_util18.py
+# Utilities: wt printlog
+class test_util18(wttest.WiredTigerTestCase, suite_subprocess):
+ tablename = 'test_util18.a'
+ uri = 'table:' + tablename
+ logmax = 100
+ nentries = 5
+ create_params = 'key_format=S,value_format=S'
+ key_prefix = 'KEY'
+ val_prefix = 'VAL'
+
+ # Whether user data is redacted or printed.
+ print_user_data = [
+ ('show_user_data', dict(print_user_data=True)),
+ ('no_user_data', dict(print_user_data=False)),
+ ]
+
+ scenarios = make_scenarios(print_user_data)
+
+ def conn_config(self):
+ return 'log=(archive=false,enabled,file_max=%dK)' % self.logmax
+
+ # Populate our test table with data we can check against in the printlog output.
+ def populate(self):
+ cursor = self.session.open_cursor(self.uri, None)
+ for i in range(0, self.nentries):
+ key = self.key_prefix + str(i)
+ val = self.val_prefix + str(i)
+ cursor[key] = val
+ cursor.close()
+
+ # Check the given printlog file reflects the data written by 'populate'.
+ def check_populated_printlog(self, log_file, expect_keyval, expect_keyval_hex):
+ for i in range(0, self.nentries):
+ key = self.key_prefix + str(i)
+ val = self.val_prefix + str(i)
+ # Check if the KEY/VAL commits exist in the log file.
+ if expect_keyval:
+ self.check_file_contains(log_file, '"key": "%s\\u0000"' % key)
+ self.check_file_contains(log_file, '"value": "%s\\u0000"' % val)
+ else:
+ self.check_file_not_contains(log_file, '"key": "%s\\u0000"' % key)
+ self.check_file_not_contains(log_file, '"value": "%s\\u0000"' % val)
+
+ # Convert our KEY/VAL strings to their expected hex value.
+ hex_key = codecs.encode(key.encode(), 'hex')
+ val_key = codecs.encode(val.encode(), 'hex')
+ # Check if the KEY/VAL commits exist in the log file (in hex form).
+ if expect_keyval_hex:
+ self.check_file_contains(log_file, '"key-hex": "%s00"' % str(hex_key, 'ascii'))
+ self.check_file_contains(log_file, '"value-hex": "%s00"' % str(val_key, 'ascii'))
+ else:
+ self.check_file_not_contains(log_file, '"key-hex": "%s00"' % str(hex_key, 'ascii'))
+ self.check_file_not_contains(log_file, '"value-hex": "%s00"' % str(val_key, 'ascii'))
+
+ def test_printlog_file(self):
+ """
+ Run printlog on a populated table.
+ """
+ self.session.create('table:' + self.tablename, self.create_params)
+ self.populate()
+ wt_args = ["printlog"]
+ # Append "-u" if we expect printlog to print user data.
+ if self.print_user_data:
+ wt_args.append("-u")
+ self.runWt(wt_args, outfilename='printlog.out')
+ self.check_non_empty_file('printlog.out')
+ self.check_populated_printlog('printlog.out', self.print_user_data, False)
+
+ def test_printlog_hex_file(self):
+ """
+ Run printlog with hexadecimal formatting on a populated table.
+ """
+ self.session.create('table:' + self.tablename, self.create_params)
+ self.populate()
+ wt_args = ["printlog", "-x"]
+ # Append "-u" if we expect printlog to print user data.
+ if self.print_user_data:
+ wt_args.append("-u")
+ self.runWt(wt_args, outfilename='printlog-hex.out')
+ self.check_non_empty_file('printlog-hex.out')
+ self.check_populated_printlog('printlog-hex.out', self.print_user_data, self.print_user_data)
+
+ def test_printlog_message(self):
+ """
+ Run printlog with messages-only formatting on a populated table.
+ """
+ self.session.create('table:' + self.tablename, self.create_params)
+ self.populate()
+ # Write a log message that we can specifically test the presence of.
+ log_message = "Test Message: %s" % self.tablename
+ self.session.log_printf(log_message)
+ wt_args = ["printlog", "-m"]
+ # Append "-u" if we expect printlog to print user data.
+ if self.print_user_data:
+ wt_args.append("-u")
+ self.runWt(wt_args, outfilename='printlog-message.out')
+ self.check_non_empty_file('printlog-message.out')
+ self.check_file_contains('printlog-message.out', log_message)
+ self.check_populated_printlog('printlog-message.out', False, False)
+
+ def test_printlog_lsn_offset(self):
+ """
+ Run printlog with an LSN offset provided.
+ """
+ self.session.create('table:' + self.tablename, self.create_params)
+ self.populate()
+
+ # Open a log cursor to accurately extract the first, second and last LSN from our
+ # log.
+ c = self.session.open_cursor("log:", None, None)
+ # Moving the cursor to the beginning of the file, extract our first LSN.
+ c.next()
+ first_lsn_keys = c.get_key()
+ # Moving the cursor, extract our second LSN.
+ c.next()
+ second_lsn_keys = c.get_key()
+ last_lsn_keys = []
+ # Moving the cursor to the last available key, extract the last LSN value.
+ while c.next() == 0:
+ last_lsn_keys = c.get_key()
+ c.next()
+ c.close()
+
+ # Construct the first, second and last LSN values, assuming the
+ # key elements follow the following sequence: [lsn.file, lsn.offset, opcount].
+ first_lsn = '%s,%s' % (first_lsn_keys[0], first_lsn_keys[1])
+ second_lsn = '%s,%s' % (second_lsn_keys[0], second_lsn_keys[1])
+ last_lsn = '%s,%s' % (last_lsn_keys[0], last_lsn_keys[1])
+
+ # Test printlog on a bounded range that starts and ends on our first LSN record. In doing so we want
+ # to assert that other log records won't be printed e.g. the second LSN record.
+ wt_args = ["printlog", '-l %s,%s' % (first_lsn, first_lsn)]
+ self.runWt(wt_args, outfilename='printlog-lsn-offset.out')
+ self.check_file_contains('printlog-lsn-offset.out', '"lsn" : [%s]' % first_lsn)
+ self.check_file_not_contains('printlog-lsn-offset.out', '"lsn" : [%s]' % second_lsn)
+ self.check_populated_printlog('printlog-lsn-offset.out', False, False)
+
+ # Test printlog from the starting LSN value to the end of the log. We expect to find the logs relating
+ # to the population of our table.
+ wt_args = ["printlog", '-l %s' % first_lsn]
+ # Append "-u" if we expect printlog to print user data.
+ if self.print_user_data:
+ wt_args.append("-u")
+ self.runWt(wt_args, outfilename='printlog-lsn-offset.out')
+ self.check_populated_printlog('printlog-lsn-offset.out', self.print_user_data, False)
+
+ # Test that using LSN '1,0' and our first LSN value produce the same output when passed to printlog.
+ # We expect printing from LSN '1,0' (which should denote to the beginning of the first log file)
+ # is equivalent to printing from our first extracted LSN value to the last LSN value.
+ wt_args_beginning = ["printlog", '-l 1,0,%s' % last_lsn]
+ wt_args_first = ["printlog", '-l %s,%s' % (first_lsn, last_lsn)]
+ if self.print_user_data:
+ wt_args_beginning.append("-u")
+ wt_args_first.append("-u")
+ self.runWt(wt_args_beginning, outfilename='printlog-lsn-offset-beginning.out')
+ self.runWt(wt_args_first, outfilename='printlog-lsn-offset-first.out')
+ self.assertTrue(filecmp.cmp('printlog-lsn-offset-beginning.out', 'printlog-lsn-offset-first.out'))
+
+if __name__ == '__main__':
+ wttest.run()