summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDon Anderson <dda@mongodb.com>2017-06-08 11:26:34 -0400
committerAlex Gorrod <alexander.gorrod@mongodb.com>2017-06-08 11:26:34 -0400
commit03e6b4f73c2f06aeb57b04bf1063986b2c8ad4d0 (patch)
treef204834de0237c49524306808c012dcc3b54ba51
parent3fcbbb370ba09e6a95bd303d1984c48446efd2c2 (diff)
downloadmongo-03e6b4f73c2f06aeb57b04bf1063986b2c8ad4d0.tar.gz
WT-3326 Enhance workload generator to support wtperf config files (#3433)
Added wtperf.py script to run wtperf files. Added some supporting functions in the runner module to used by programs created by this tool.
-rw-r--r--bench/workgen/runner/runner/__init__.py2
-rw-r--r--bench/workgen/runner/runner/core.py104
-rw-r--r--bench/workgen/wtperf.py440
3 files changed, 543 insertions, 3 deletions
diff --git a/bench/workgen/runner/runner/__init__.py b/bench/workgen/runner/runner/__init__.py
index 67b547bc51b..ed21fffe8dc 100644
--- a/bench/workgen/runner/runner/__init__.py
+++ b/bench/workgen/runner/runner/__init__.py
@@ -88,5 +88,5 @@ except:
shutil.rmtree('WT_TEST', True)
os.mkdir('WT_TEST')
-from .core import txn, extensions_config
+from .core import txn, extensions_config, op_group_transaction, op_log_like, op_multi_table
from .latency import workload_latency
diff --git a/bench/workgen/runner/runner/core.py b/bench/workgen/runner/runner/core.py
index a0f0d4d77cd..2c8311c4ca7 100644
--- a/bench/workgen/runner/runner/core.py
+++ b/bench/workgen/runner/runner/core.py
@@ -29,12 +29,12 @@
# runner/core.py
# Core functions available to all runners
import glob, os
-import workgen
+from workgen import Key, Operation, OpList, Table, Transaction, Value
# txn --
# Put the operation (and any suboperations) within a transaction.
def txn(op, config=None):
- t = workgen.Transaction(config)
+ t = Transaction(config)
op._transaction = t
return op
@@ -99,3 +99,103 @@ def extensions_config(exts):
if len(extfiles) != 0:
result = ',extensions=[' + ','.join(extfiles.values()) + ']'
return result
+
+def _op_multi_table_as_list(ops_arg, tables):
+ result = []
+ if ops_arg._optype != Operation.OP_NONE:
+ for table in tables:
+ result.append(Operation(ops_arg._optype, table, ops_arg._key, ops_arg._value))
+ else:
+ for op in ops._group:
+ result.extend(_op_multi_table_as_list(op, tables))
+ return result
+
+# A convenient way to build a list of operations
+def op_append(op1, op2):
+ if op1 == None:
+ op1 = op2
+ else:
+ op1 += op2
+ return op1
+
+# Emulate wtperf's table_count option. Spread the given operations over
+# a set of tables.
+def op_multi_table(ops_arg, tables):
+ ops = None
+ for op in _op_multi_table_as_list(ops_arg, tables):
+ ops = op_append(ops, op)
+ return ops
+
+# should be 8 bytes format 'Q'
+_logkey = Key(Key.KEYGEN_APPEND, 8)
+def _op_log_op(op, log_table):
+ keysize = op._key._size
+ if keysize == 0:
+ keysize = op._table.options.key_size
+ valuesize = op._value._size
+ if valuesize == 0:
+ valuesize = op._table.options.value_size
+ v = Value(keysize + valuesize)
+ return Operation(Operation.OP_INSERT, log_table, _logkey, v)
+
+def _optype_is_write(optype):
+ return optype == Operation.OP_INSERT or optype == Operation.OP_UPDATE or \
+ optype == Operation.OP_REMOVE
+
+# Emulate wtperf's log_like option. For all operations, add a second
+# insert operation going to a log table.
+def op_log_like(op, log_table, ops_per_txn):
+ if op._optype != Operation.OP_NONE:
+ if _optype_is_write(op._optype):
+ op += _op_log_op(op, log_table)
+ if ops_per_txn == 0:
+ op = txn(op) # txn for each action.
+ else:
+ oplist = []
+ for op2 in op._group:
+ if op2._optype == Operation.OP_NONE:
+ oplist.append(op_log_like(op2, log_table))
+ elif ops_per_txn == 0 and _optype_is_write(op2._optype):
+ op2 += _op_log_op(op2, log_table)
+ oplist.append(txn(op2)) # txn for each action.
+ else:
+ oplist.append(op2)
+ if _optype_is_write(op2._optype):
+ oplist.append(_op_log_op(op2, log_table))
+ op._group = OpList(oplist)
+ return op
+
+def _op_transaction_list(oplist, txn_config):
+ result = None
+ for op in oplist:
+ result = op_append(result, op)
+ return txn(result, txn_config)
+
+# Emulate wtperf's ops_per_txn option. Create transactions around
+# groups of operations of the indicated size.
+def op_group_transaction(ops_arg, ops_per_txn, txn_config):
+ if ops_arg != Operation.OP_NONE:
+ return txn(ops_arg, txn_config)
+ if ops_arg._transaction != None:
+ raise Exception('nested transactions not supported')
+ if ops_arg._repeatgroup != None:
+ raise Exception('grouping transactions with multipliers not supported')
+
+ oplist = []
+ ops = None
+ nops = 0
+ txgroup = []
+ for op in ops_arg._group:
+ if op.optype == Operation.OP_NONE:
+ oplist.append(_op_transaction_list(txgroup, txn_config))
+ txgroup = []
+ oplist.append(op)
+ else:
+ txgroup.append(op)
+ if len(txgroup) >= ops_per_txn:
+ oplist.append(_op_transaction_list(txgroup, txn_config))
+ txgroup = []
+ if len(txgroup) > 0:
+ oplist.append(_op_transaction_list(txgroup, txn_config))
+ ops_arg._group = OpList(oplist)
+ return ops_arg
diff --git a/bench/workgen/wtperf.py b/bench/workgen/wtperf.py
new file mode 100644
index 00000000000..3a196fe7b57
--- /dev/null
+++ b/bench/workgen/wtperf.py
@@ -0,0 +1,440 @@
+#!/usr/bin/env python
+#
+# Public Domain 2014-2017 MongoDB, Inc.
+# Public Domain 2008-2014 WiredTiger, Inc.
+#
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+#
+
+# wtperf.py
+# A partial emulation of wtperf. Translates a .wtperf file into a Python
+# script that uses the workgen module, and runs the script. Errors are
+# issued for any .wtperf directives that are not known.
+# See also the usage() function.
+#
+from __future__ import print_function
+import os, sys, tempfile
+
+def eprint(*args, **kwargs):
+ print(*args, file=sys.stderr, **kwargs)
+
+class OptionValue:
+ def __init__(self, value, filename, linenum):
+ self.value = value
+ self.filename = filename
+ self.linenum = linenum
+
+class TranslateException(Exception):
+ pass
+
+class Options(object):
+ pass
+
+class Translator:
+ def __init__(self, filename, prefix, verbose):
+ self.filename = filename
+ self.prefix = prefix
+ self.verbose = verbose
+ self.linenum = 0
+ self.opts = {}
+ self.used_opts = {}
+ self.has_error = False
+
+ def error_file_line(self, fname, linenum, msg):
+ self.has_error = True
+ eprint(fname + ':' + str(linenum) + ': error: ' + msg)
+
+ # Report an error and continue
+ def error(self, msg):
+ self.error_file_line(self.filename, self.linenum, msg)
+
+ # Report an error and unwind the stack
+ def fatal_error(self, msg, errtype):
+ self.error(msg)
+ raise TranslateException(errtype)
+
+ supported_opt_list = [ 'compression', 'conn_config', 'icount',
+ 'key_sz', 'log_like_table',
+ 'populate_ops_per_txn', 'populate_threads',
+ 'reopen_connection',
+ 'table_config', 'table_count',
+ 'threads', 'transaction_config', 'value_sz' ]
+
+ def set_opt(self, optname, val):
+ if optname not in self.supported_opt_list:
+ self.error("unknown option: " + optname)
+ return
+ elif val[0] == '"' and val[-1] == '"':
+ v = val[1:-1]
+ elif val == 'true':
+ v = True
+ elif val == 'false':
+ v = False
+ elif val[0] == '(':
+ v = val # config string stored as is
+ else:
+ try:
+ v = int(val) # it might be an integer
+ except ValueError:
+ v = val # it's a string after all
+ self.opts[optname] = OptionValue(v, self.filename, self.linenum)
+
+ def get_opt(self, optname, dfault):
+ if optname in self.opts:
+ ret = self.opts[optname]
+ self.filename = ret.filename
+ self.linenum = ret.linenum
+ self.used_opts[optname] = 1
+ return ret.value
+ else:
+ return dfault
+
+ def get_int_opt(self, optname, dfault):
+ return self.get_opt(optname, dfault) + 0
+
+ def get_boolean_opt(self, optname, dfault):
+ return not not self.get_opt(optname, dfault)
+
+ # Split a string 'left_side=right_side' into two parts
+ def split_assign(self, s):
+ equalpos = s.find('=')
+ if equalpos < 0:
+ self.error("missing '=' for line: " + line)
+ return (None, None)
+ else:
+ return s.split('=', 1)
+
+ # Split a config string honoring nesting e.g.
+ # "(abc=123,def=234,ghi=(hi=1,bye=2))" would return 3 items.
+ def split_config_parens(self, s):
+ if s[0:1] != '(':
+ import pdb
+ pdb.set_trace()
+ self.fatal_error('missing left paren', 'config parse error')
+ if s[-1:] != ')':
+ self.fatal_error('missing right paren', 'config parse error')
+ s = s[1:-1]
+ result = []
+ level = 0
+ cur = ''
+ for ch in s:
+ if ch == ',' and level == 0:
+ result.append(cur)
+ cur = ''
+ else:
+ cur += ch
+ if ch == '(':
+ level += 1
+ elif ch == ')':
+ level -= 1
+ if level < 0:
+ self.fatal_error('unbalanced paren', 'config parse error')
+ if level != 0:
+ self.fatal_error('unbalanced paren', 'config parse error')
+ if len(cur) != 0:
+ result.append(cur)
+ return result
+
+ def assign_str(self, left, right):
+ return left + '=' + str(right) + '\n'
+
+ def add_operation_str(self, count, opname, multi):
+ result = ''
+ tablename = 'tables[0]' if multi else 'table'
+ if count > 1:
+ result += str(count) + ' * '
+ if count > 0:
+ result += 'Operation(Operation.' + opname + ', ' + \
+ tablename + ') + \\\n'
+ result += ' '
+ return result
+
+ # Wtperf's throttle is based on the number of regular operations,
+ # not including log_like operations. Workgen counts all operations,
+ # it doesn't treat log operations any differently. Adjust the throttle
+ # number to account for the difference.
+ def calc_throttle(self, thread_opts, log_like_table):
+ throttle = thread_opts.throttle
+ if not log_like_table:
+ return (throttle, '')
+ modify = thread_opts.inserts + thread_opts.updates
+ regular = modify + thread_opts.reads
+ total = regular + modify
+ factor = (total + 0.0) / regular
+ new_throttle = int(throttle * factor)
+ if new_throttle == throttle:
+ comment = ''
+ else:
+ comment = '# wtperf throttle=' + str(throttle) + ' adjusted by ' + \
+ str(factor) + ' to compensate for log_like operations.\n'
+ return (new_throttle, comment)
+
+ def parse_threads(self, threads_config):
+ tdecls = ''
+ tlist = self.split_config_parens(threads_config)
+ table_count = self.get_int_opt('table_count', 1)
+ log_like_table = self.get_boolean_opt('log_like_table', False)
+ txn_config = self.get_opt('transaction_config', '')
+ if log_like_table:
+ tdecls += 'log_name = "table:log"\n'
+ tdecls += 's.create(log_name, "key_format=S,value_format=S," +' + \
+ ' compress_table_config)\n'
+ tdecls += 'log_table = Table(log_name)\n\n'
+ thread_count = 0
+ tnames = ''
+ multi = (table_count > 1)
+ for t in tlist:
+ thread_name = 'thread' + str(thread_count)
+ thread_count += 1
+
+ # For wtperf compatibility, we allow both 'insert/inserts' etc.
+ topts = Options()
+ topts.count = 1
+ topts.insert = 0
+ topts.inserts = 0
+ topts.ops_per_txn = 0
+ topts.read = 0
+ topts.reads = 0
+ topts.throttle = 0
+ topts.update = 0
+ topts.updates = 0
+
+ for o in self.split_config_parens(t):
+ (k, v) = self.split_assign(o)
+ if hasattr(topts, k):
+ try:
+ setattr(topts, k, int(v))
+ except ValueError:
+ self.error('thread option ' + k + ': integer expected')
+ else:
+ self.error('unknown thread option: ' + k)
+
+ topts.inserts += topts.insert; topts.insert = 0
+ topts.updates += topts.update; topts.update = 0
+ topts.reads += topts.read; topts.read = 0
+ if topts.count == 0:
+ continue
+
+ if topts.inserts + topts.reads + topts.updates == 0:
+ self.fatal_error('need read/insert/update/...',
+ 'thread config error')
+ tdecls += 'ops = '
+ tdecls += self.add_operation_str(topts.inserts, 'OP_INSERT', multi)
+ tdecls += self.add_operation_str(topts.reads, 'OP_SEARCH', multi)
+ tdecls += self.add_operation_str(topts.updates, 'OP_UPDATE', multi)
+ tdecls = tdecls.rstrip(' \n\\+') + '\n'
+ if multi:
+ tdecls += 'ops = op_multi_table(ops, tables)\n'
+ if topts.ops_per_txn > 0:
+ tdecls += 'ops = op_group_transaction(ops, ' + \
+ str(topts.ops_per_txn) + ', "' + txn_config + '")\n'
+ if log_like_table:
+ tdecls += 'ops = op_log_like(ops, log_table, ' + \
+ str(topts.ops_per_txn) + ')\n'
+ tdecls += thread_name + ' = Thread(ops)\n'
+ if topts.throttle > 0:
+ (throttle, comment) = self.calc_throttle(topts, log_like_table)
+ tdecls += comment
+ tdecls += self.assign_str(thread_name + '.options.throttle',
+ throttle)
+ tdecls += '\n'
+ if topts.count > 1:
+ tnames += str(topts.count) + ' * '
+ tnames += thread_name + ' + '
+
+ tnames = tnames.rstrip(' +')
+ return (tdecls, tnames)
+
+ def translate(self):
+ try:
+ return self.translate_inner()
+ except TranslateException:
+ # An error has already been reported
+ return None
+
+ def translate_inner(self):
+ workloadopts = ''
+ with open(self.filename) as fin:
+ for line in fin:
+ self.linenum += 1
+ commentpos = line.find('#')
+ if commentpos >= 0:
+ line = line[0:commentpos]
+ line = line.strip()
+ if len(line) == 0:
+ continue
+ (key, val) = self.split_assign(line)
+ if key in [ 'max_latency', 'report_file', 'report_interval',
+ 'run_time', 'sample_interval', 'sample_rate' ]:
+ workloadopts += 'workload.options.' + key + '=' + val + '\n'
+ else:
+ self.set_opt(key, val)
+
+ table_count = self.get_int_opt('table_count', 1)
+ conn_config = self.get_opt('conn_config', '')
+ table_config = self.get_opt('table_config', '')
+ key_sz = self.get_int_opt('key_sz', 20)
+ value_sz = self.get_int_opt('value_sz', 100)
+ reopen = self.get_boolean_opt('reopen_connection', False)
+ compression = self.get_opt('compression', '')
+ txn_config = self.get_opt('transaction_config', '')
+
+ s = '#/usr/bin/env python\n'
+ s += '# generated from ' + self.filename + '\n'
+ s += self.prefix
+ s += 'from runner import *\n'
+ s += 'from wiredtiger import *\n'
+ s += 'from workgen import *\n'
+ s += '\n'
+ s += 'context = Context()\n'
+ s += 'conn_config = "' + conn_config + '"\n'
+ if compression != '':
+ s += 'conn_config += extensions_config(["compressors/' + \
+ compression + '"])\n'
+ compression = 'block_compressor=' + compression + ','
+ s += 'conn = wiredtiger_open("WT_TEST", "create," + conn_config)\n'
+ s += 's = conn.open_session()\n'
+ s += '\n'
+ s += 'wtperf_table_config = "key_format=S,value_format=S,type=lsm," +\\\n'
+ s += ' "exclusive=true,allocation_size=4kb," +\\\n'
+ s += ' "internal_page_max=64kb,leaf_page_max=4kb,split_pct=100,"\n'
+ s += 'compress_table_config = "' + compression + '"\n'
+ s += 'table_config = "' + table_config + '"\n'
+ if table_count == 1:
+ s += 'tname = "file:test.wt"\n'
+ s += 's.create(tname, wtperf_table_config +\\\n'
+ s += ' compress_table_config + table_config)\n'
+ s += 'table = Table(tname)\n'
+ s += 'table.options.key_size = ' + str(key_sz) + '\n'
+ s += 'table.options.value_size = ' + str(value_sz) + '\n'
+ else:
+ s += 'table_count = ' + str(table_count) + '\n'
+ s += 'tables = []\n'
+ s += 'for i in range(0, table_count):\n'
+ s += ' tname = "file:test" + str(i) + ".wt"\n'
+ s += ' s.create(tname, ' + \
+ 'wtperf_table_config + ' + \
+ 'compress_table_config + table_config)\n'
+ s += ' t = Table(tname)\n'
+ s += ' t.options.key_size = ' + str(key_sz) + '\n'
+ s += ' t.options.value_size = ' + str(value_sz) + '\n'
+ s += ' tables.append(t)\n'
+ s += '\n'
+
+ icount = self.get_int_opt('icount', 0)
+ pop_thread = self.get_int_opt('populate_threads', 1)
+ pop_per_txn = self.get_int_opt('populate_ops_per_txn', 0)
+ if icount != 0:
+ if pop_thread == 0:
+ self.fatal_error('icount != 0 and populate_threads == 0: ' +\
+ 'cannot populate entries with no threads')
+ elif pop_thread == 1:
+ mult = ''
+ else:
+ mult = str(pop_thread) + ' * '
+
+ # if there are multiple tables to be filled during populate,
+ # the icount is split between them all.
+ nops_per_thread = icount / (pop_thread * table_count)
+ if table_count == 1:
+ s += 'pop_ops = Operation(Operation.OP_INSERT, table)\n'
+ else:
+ s += 'pop_ops = Operation(Operation.OP_INSERT, tables[0])\n'
+ s += 'pop_ops = op_multi_table(pop_ops, tables)\n'
+ if pop_per_txn > 0:
+ s += 'pop_ops = op_group_transaction(pop_ops, ' + \
+ str(pop_per_txn) + ', "' + txn_config + '")\n'
+ s += 'pop_thread = Thread(pop_ops * ' + str(nops_per_thread) + ')\n'
+ s += 'pop_workload = Workload(context, ' + mult + 'pop_thread)\n'
+ if self.verbose > 0:
+ s += 'print("populate:")\n'
+ s += 'pop_workload.run(conn)\n'
+ else:
+ if self.get_int_opt('populate_threads', 0) != 0:
+ self.error("populate_threads > 0, icount == 0")
+
+ thread_config = self.get_opt('threads', '')
+ if thread_config != '':
+ (t_create, t_var) = self.parse_threads(thread_config)
+ s += '\n' + t_create
+ if reopen:
+ s += '\n# reopen the connection\n'
+ s += 'conn.close()\n'
+ s += 'conn = wiredtiger_open(' + \
+ '"WT_TEST", "create," + conn_config)\n'
+ s += '\n'
+ s += 'workload = Workload(context, ' + t_var + ')\n'
+ s += workloadopts
+ if self.verbose > 0:
+ s += 'print("workload:")\n'
+ s += 'workload.run(conn)\n'
+
+ for o in self.used_opts:
+ del self.opts[o]
+ if len(self.opts) != 0:
+ self.error('internal error, options not handled: ' + str(self.opts))
+ return s
+
+def usage():
+ eprint((
+ 'Usage: python wtperf.py [ options ] file.wtperf ...\n'
+ '\n'
+ 'Options:\n'
+ ' --python Python output generated on stdout\n'
+ ' -v --verbose Verbose output\n'
+ '\n'
+ 'If --python is not specified, the resulting workload is run.'))
+
+verbose = 0
+py_out = False
+workgen_dir = os.path.dirname(os.path.abspath(__file__))
+runner_dir = os.path.join(workgen_dir, 'runner')
+prefix = (
+ '# The next lines are unneeded if this script is in the runner directory.\n'
+ 'import sys\n'
+ 'sys.path.append("' + runner_dir + '")\n\n')
+
+exit_status = 0
+for arg in sys.argv[1:]:
+ if arg == '--python':
+ py_out = True
+ elif arg == '--verbose' or arg == '-v':
+ verbose += 1
+ elif arg.endswith('.wtperf'):
+ translator = Translator(arg, prefix, verbose)
+ pysrc = translator.translate()
+ if translator.has_error:
+ exit_status = 1
+ elif py_out:
+ print(pysrc)
+ else:
+ (outfd, tmpfile) = tempfile.mkstemp(suffix='.py')
+ os.write(outfd, pysrc)
+ os.close(outfd)
+ execfile(tmpfile)
+ os.remove(tmpfile)
+ else:
+ usage()
+ sys.exit(1)
+sys.exit(exit_status)